summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/worker.pm30
1 files changed, 27 insertions, 3 deletions
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 5bf7a51..d3e9a0e 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -7,13 +7,15 @@ use JSON;
use Try::Tiny;
use Moose::Role;
+use Net::Presque;
+
requires 'work';
with qw/
presque::worker::Role::Management
presque::worker::Role::Dispatcher
- presque::worker::Role::RESTClient
presque::worker::Role::Job
+ presque::worker::Role::Context
presque::worker::Role::Logger/;
has queue_name => (is => 'ro', isa => 'Str', required => 1);
@@ -35,6 +37,23 @@ has worker_id => (
$name;
}
);
+has rest_client => (
+ is => 'rw',
+ isa => 'Net::Presque',
+ lazy => 1,
+ default => sub {
+ my $self = shift;
+ my $client =
+ Net::Presque->new(api_base_url => $self->context->{rest}->{url});
+ $client;
+ },
+ handles => {
+ pull => 'fetch_job',
+ retry_job => 'failed_job',
+ register_worker => 'register_worker',
+ unregister_worker => 'unregister_worker'
+ }
+);
after new => sub {
my $self = shift;
@@ -47,9 +66,10 @@ sub start {
my $self = shift;
while (!$self->shut_down) {
- my $job = $self->rest_fetch_job();
+ my $job = try {
+ $self->pull(queue_name => '', worker_id => $self->worker_id);
+ };
$job ? $self->work($job) : $self->idle();
-
}
}
@@ -95,6 +115,10 @@ Worker must implement the B<work> method. The only argument of this method is a
Worker may implement the B<fail> method. This method have two arguments: the job description and the reason of the failure.
+=head2 idle
+
+If no job, the worker execute the method B<idle>. By default, this method will sleep a number of seconds defined in the B<interval> attribute.
+
=head1 ATTRIBUTES
=head2 queue_name