summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/presque/worker.pm131
-rw-r--r--lib/presque/worker/Queue.pm23
2 files changed, 118 insertions, 36 deletions
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 6e43c52..77115c1 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -1,48 +1,93 @@
package presque::worker;
-use Moose;
our $VERSION = '0.01';
-use AnyEvent;
-use AnyEvent::HTTP;
-
use Carp;
use JSON;
use Try::Tiny;
+use presque::worker::Queue;
-has base_uri => ( is => 'ro', isa => 'Str', required => 1 );
-has queue => ( is => 'ro', isa => 'Str', required => 1 );
-has interval => ( is => 'ro', isa => 'Int', lazy => 1, default => 5 );
+use Moose;
+with qw/
+ presque::worker::Role::Management
+ presque::worker::Role::Fork
+ presque::worker::Role::RESTClient
+ presque::worker::Role::Logger/;
+
+has queue_name => (is => 'ro', isa => 'Str', required => 1);
+has retries => (is => 'rw', isa => 'Int', default => 5);
+has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
+has _fail_method => (
+ is => 'rw',
+ isa => 'Bool',
+ lazy => 1,
+ default => 0,
+ predicate => '_has_fail_method'
+);
+has queue => (
+ is => 'ro',
+ isa => 'Object',
+ lazy => 1,
+ default =>
+ sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); }
+);
+has worker_id => (
+ is => 'ro',
+ isa => 'Str',
+ required => 1,
+ default => sub {
+ my $self = shift;
+ my $name = $self->meta->name . '_' . $$;
+ $name;
+ }
+);
-sub BUILD {
- my ( $self, $args ) = @_;
- my ( $get, $timer );
+before start => sub {
+ my $self = shift;
+ if (!$self->meta->find_method_by_name('work')) {
+ Carp::confess "method 'work' is missing";
+ }
+ if ($self->meta->find_method_by_name('fail')) {
+ $self->fail_method(1);
+ }
+};
+
+sub start {
+ my $self = shift;
+
+ $self->logger->log(
+ level => 'info',
+ message => "presque worker ["
+ . $self->worker_id
+ . "] : start to listen for "
+ . $self->queue_name
+ );
+
+ while (!$self->shut_down) {
+ my $job = $self->rest_fetch_job();
+ $self->work_once($job) if $job;
+ sleep($self->interval);
+ }
+ return $self;
+}
- my $uri = $self->base_uri;
- my $queue = $self->queue;
- my $queue_uri = $uri . '/q/' . $queue;
+sub work_once {
+ my ($self, $job) = @_;
- if ( !$self->meta->find_method_by_name('work') ) {
- Carp::confess "method work is missing";
+ try {
+ $self->work($job);
}
-
- $get = sub {
- http_get $queue_uri, sub {
- my ( $body, $hdr ) = @_;
- return if ( !$body || $hdr->{Status} != 200 );
- my $content = JSON::decode_json($body);
-
- try {
- $self->work($content);
- }
- catch {
- $self->fail($content, $_) if $self->meta->find_method_by_name('fail');
- };
- $timer = AnyEvent->timer( after => $self->interval, cb => $get );
- };
+ catch {
+ my $err = $_;
+ $self->logger->log(
+ level => 'error',
+ message => 'Job failed: ' . $err,
+ );
+ push @{$job->{fail}}, $err;
+ my $retries = ($job->{retries_left} || $self->retries) - 1;
+ $self->rest_retry_job($job) if $retries > 0;
+ $self->fail($job, $_) if $self->_has_fail_method;
};
- $get->();
- return $self;
}
1;
@@ -76,13 +121,27 @@ presque::worker - Worker for the C<presque> message queue system
=head2 work ($job_description)
-Worker must implement the B<work> method. The only argument of this method is a hashref
-containing the job.
+Worker must implement the B<work> method. The only argument of this method is a hashref containing the job.
=head2 fail ($job_description, $error_reason)
-Worker may implement the B<fail> method. This method have two arguments: the job description
-and the reason of the failure.
+Worker may implement the B<fail> method. This method have two arguments: the job description and the reason of the failure.
+
+=head1 ATTRIBUTES
+
+=head2 queue_name
+
+=head2 base_uri
+
+=head2 worker_id
+
+=head2 retries
+
+=head2 interval
+
+=head2
+
+The url of the presque webservices.
=head1 AUTHOR
diff --git a/lib/presque/worker/Queue.pm b/lib/presque/worker/Queue.pm
new file mode 100644
index 0000000..9045941
--- /dev/null
+++ b/lib/presque/worker/Queue.pm
@@ -0,0 +1,23 @@
+package presque::worker::Queue;
+
+use Moose;
+
+has base_uri => (
+ is => 'ro',
+ isa => 'Str',
+ required => 1
+);
+
+sub push {
+ my ( $self, ) = @_;
+}
+
+sub pull {
+ my ( $self, ) = @_;
+}
+
+sub delete {
+ my ( $self, ) = @_;
+}
+
+1;