summaryrefslogtreecommitdiff
path: root/lib/presque/worker.pm
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-13 18:25:24 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-13 18:25:24 +0200
commit68a3e5a46d04f26af54f0763f482930850422af0 (patch)
tree51b38448158696bdec6102dfff6715dfd8fff262 /lib/presque/worker.pm
parentsome roles to handle dispatch (fork), logging, worker life (handle (diff)
downloadpresque-worker-68a3e5a46d04f26af54f0763f482930850422af0.tar.gz
fetch job from queue, handle job, handle failure, ...
Diffstat (limited to '')
-rw-r--r--lib/presque/worker.pm131
1 files changed, 95 insertions, 36 deletions
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 6e43c52..77115c1 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -1,48 +1,93 @@
package presque::worker;
-use Moose;
our $VERSION = '0.01';
-use AnyEvent;
-use AnyEvent::HTTP;
-
use Carp;
use JSON;
use Try::Tiny;
+use presque::worker::Queue;
-has base_uri => ( is => 'ro', isa => 'Str', required => 1 );
-has queue => ( is => 'ro', isa => 'Str', required => 1 );
-has interval => ( is => 'ro', isa => 'Int', lazy => 1, default => 5 );
+use Moose;
+with qw/
+ presque::worker::Role::Management
+ presque::worker::Role::Fork
+ presque::worker::Role::RESTClient
+ presque::worker::Role::Logger/;
+
+has queue_name => (is => 'ro', isa => 'Str', required => 1);
+has retries => (is => 'rw', isa => 'Int', default => 5);
+has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
+has _fail_method => (
+ is => 'rw',
+ isa => 'Bool',
+ lazy => 1,
+ default => 0,
+ predicate => '_has_fail_method'
+);
+has queue => (
+ is => 'ro',
+ isa => 'Object',
+ lazy => 1,
+ default =>
+ sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); }
+);
+has worker_id => (
+ is => 'ro',
+ isa => 'Str',
+ required => 1,
+ default => sub {
+ my $self = shift;
+ my $name = $self->meta->name . '_' . $$;
+ $name;
+ }
+);
-sub BUILD {
- my ( $self, $args ) = @_;
- my ( $get, $timer );
+before start => sub {
+ my $self = shift;
+ if (!$self->meta->find_method_by_name('work')) {
+ Carp::confess "method 'work' is missing";
+ }
+ if ($self->meta->find_method_by_name('fail')) {
+ $self->fail_method(1);
+ }
+};
+
+sub start {
+ my $self = shift;
+
+ $self->logger->log(
+ level => 'info',
+ message => "presque worker ["
+ . $self->worker_id
+ . "] : start to listen for "
+ . $self->queue_name
+ );
+
+ while (!$self->shut_down) {
+ my $job = $self->rest_fetch_job();
+ $self->work_once($job) if $job;
+ sleep($self->interval);
+ }
+ return $self;
+}
- my $uri = $self->base_uri;
- my $queue = $self->queue;
- my $queue_uri = $uri . '/q/' . $queue;
+sub work_once {
+ my ($self, $job) = @_;
- if ( !$self->meta->find_method_by_name('work') ) {
- Carp::confess "method work is missing";
+ try {
+ $self->work($job);
}
-
- $get = sub {
- http_get $queue_uri, sub {
- my ( $body, $hdr ) = @_;
- return if ( !$body || $hdr->{Status} != 200 );
- my $content = JSON::decode_json($body);
-
- try {
- $self->work($content);
- }
- catch {
- $self->fail($content, $_) if $self->meta->find_method_by_name('fail');
- };
- $timer = AnyEvent->timer( after => $self->interval, cb => $get );
- };
+ catch {
+ my $err = $_;
+ $self->logger->log(
+ level => 'error',
+ message => 'Job failed: ' . $err,
+ );
+ push @{$job->{fail}}, $err;
+ my $retries = ($job->{retries_left} || $self->retries) - 1;
+ $self->rest_retry_job($job) if $retries > 0;
+ $self->fail($job, $_) if $self->_has_fail_method;
};
- $get->();
- return $self;
}
1;
@@ -76,13 +121,27 @@ presque::worker - Worker for the C<presque> message queue system
=head2 work ($job_description)
-Worker must implement the B<work> method. The only argument of this method is a hashref
-containing the job.
+Worker must implement the B<work> method. The only argument of this method is a hashref containing the job.
=head2 fail ($job_description, $error_reason)
-Worker may implement the B<fail> method. This method have two arguments: the job description
-and the reason of the failure.
+Worker may implement the B<fail> method. This method have two arguments: the job description and the reason of the failure.
+
+=head1 ATTRIBUTES
+
+=head2 queue_name
+
+=head2 base_uri
+
+=head2 worker_id
+
+=head2 retries
+
+=head2 interval
+
+=head2
+
+The url of the presque webservices.
=head1 AUTHOR