summaryrefslogtreecommitdiff
path: root/lib/presque/worker.pm
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-15 14:28:42 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-15 14:28:42 +0200
commitde8d333b8806b4a1ea0a997f6a916d127eadae4b (patch)
treef3d3c0edba570fb937c9390ad19dfb489f05a210 /lib/presque/worker.pm
parentfetch job from queue, handle job, handle failure, ... (diff)
downloadpresque-worker-de8d333b8806b4a1ea0a997f6a916d127eadae4b.tar.gz
a simple worker; a role for the REST interface to presque; reg signals
to shutdown workers; log before starting a task; fork dispatcher (a la resque)
Diffstat (limited to '')
-rw-r--r--lib/presque/worker.pm78
1 files changed, 43 insertions, 35 deletions
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 77115c1..ad8ebf0 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -5,9 +5,10 @@ our $VERSION = '0.01';
use Carp;
use JSON;
use Try::Tiny;
-use presque::worker::Queue;
-use Moose;
+use Moose::Role;
+requires 'work';
+
with qw/
presque::worker::Role::Management
presque::worker::Role::Fork
@@ -16,7 +17,7 @@ with qw/
has queue_name => (is => 'ro', isa => 'Str', required => 1);
has retries => (is => 'rw', isa => 'Int', default => 5);
-has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
+has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
has _fail_method => (
is => 'rw',
isa => 'Bool',
@@ -24,13 +25,6 @@ has _fail_method => (
default => 0,
predicate => '_has_fail_method'
);
-has queue => (
- is => 'ro',
- isa => 'Object',
- lazy => 1,
- default =>
- sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); }
-);
has worker_id => (
is => 'ro',
isa => 'Str',
@@ -44,16 +38,10 @@ has worker_id => (
before start => sub {
my $self = shift;
- if (!$self->meta->find_method_by_name('work')) {
- Carp::confess "method 'work' is missing";
- }
+
if ($self->meta->find_method_by_name('fail')) {
$self->fail_method(1);
}
-};
-
-sub start {
- my $self = shift;
$self->logger->log(
level => 'info',
@@ -62,32 +50,52 @@ sub start {
. "] : start to listen for "
. $self->queue_name
);
+};
- while (!$self->shut_down) {
- my $job = $self->rest_fetch_job();
- $self->work_once($job) if $job;
- sleep($self->interval);
- }
- return $self;
-}
-
-sub work_once {
- my ($self, $job) = @_;
+around work => sub {
+ my ($orig, $self, $job) = @_;
+ $self->logger->log(
+ level => 'debug',
+ message => $self->worker_id . " start to work"
+ );
try {
- $self->work($job);
- }
- catch {
+ if ($self->fork_dispatcher) {
+ my $fork = fork();
+ if ($fork == 0) {
+ $self->$orig($job);
+ }elsif($fork > 0){
+ return;
+ }else{
+ }
+ }
+ }catch{
my $err = $_;
$self->logger->log(
level => 'error',
message => 'Job failed: ' . $err,
);
- push @{$job->{fail}}, $err;
- my $retries = ($job->{retries_left} || $self->retries) - 1;
- $self->rest_retry_job($job) if $retries > 0;
- $self->fail($job, $_) if $self->_has_fail_method;
+ $self->_job_failure($job, $err);
};
+};
+
+sub start {
+ my $self = shift;
+
+ while (!$self->shut_down) {
+ my $job = $self->rest_fetch_job();
+ $self->work($job) if $job;
+ sleep($self->interval);
+ }
+}
+
+sub _job_failure {
+ my ($self, $job, $err) = @_;
+ push @{$job->{fail}}, $err;
+ my $retries = ($job->{retries_left} || $self->retries) - 1;
+ $job->{retries_left} = $retries;
+ $self->rest_retry_job($job) if $retries > 0;
+ $self->fail($job, $_) if $self->_has_fail_method;
}
1;
@@ -101,7 +109,7 @@ presque::worker - a presque worker
package myworker;
use Moose;
- extends 'presque::worker';
+ with 'presque::worker';
sub work {
my ($self, $job) = @_;