summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-06-09 18:19:38 +0200
committerfranck cuny <franck@lumberjaph.net>2010-06-09 18:19:38 +0200
commit5b2042053577cc6381c40c4fb5d5264e79a0312d (patch)
tree643da7415c478ee5e444231690fe8cd91329b75c
parenta simple worker; a role for the REST interface to presque; reg signals (diff)
downloadpresque-worker-5b2042053577cc6381c40c4fb5d5264e79a0312d.tar.gz
add logger; move some code for work and job in roles;
-rw-r--r--eg/simple.pl3
-rw-r--r--lib/presque/worker.pm50
-rw-r--r--lib/presque/worker/Role/Dispatcher.pm44
-rw-r--r--lib/presque/worker/Role/Fork.pm11
-rw-r--r--lib/presque/worker/Role/Job.pm16
-rw-r--r--lib/presque/worker/Role/Logger.pm44
-rw-r--r--lib/presque/worker/Role/Management.pm32
-rw-r--r--lib/presque/worker/Role/RESTClient.pm1
-rw-r--r--t/10_basic.t13
9 files changed, 128 insertions, 86 deletions
diff --git a/eg/simple.pl b/eg/simple.pl
index ab07c51..6f921d6 100644
--- a/eg/simple.pl
+++ b/eg/simple.pl
@@ -9,9 +9,8 @@ with 'presque::worker';
use YAML::Syck;
sub work {
my ($self, $job) = @_;
- warn ">>>je suis $$\n";
warn Dump $job;
- sleep(100);
+ sleep(5);
}
package main;
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index ad8ebf0..264833c 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -11,12 +11,11 @@ requires 'work';
with qw/
presque::worker::Role::Management
- presque::worker::Role::Fork
+ presque::worker::Role::Dispatcher
presque::worker::Role::RESTClient
presque::worker::Role::Logger/;
has queue_name => (is => 'ro', isa => 'Str', required => 1);
-has retries => (is => 'rw', isa => 'Int', default => 5);
has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
has _fail_method => (
is => 'rw',
@@ -36,47 +35,11 @@ has worker_id => (
}
);
-before start => sub {
+after new => sub {
my $self = shift;
-
if ($self->meta->find_method_by_name('fail')) {
$self->fail_method(1);
}
-
- $self->logger->log(
- level => 'info',
- message => "presque worker ["
- . $self->worker_id
- . "] : start to listen for "
- . $self->queue_name
- );
-};
-
-around work => sub {
- my ($orig, $self, $job) = @_;
- $self->logger->log(
- level => 'debug',
- message => $self->worker_id . " start to work"
- );
-
- try {
- if ($self->fork_dispatcher) {
- my $fork = fork();
- if ($fork == 0) {
- $self->$orig($job);
- }elsif($fork > 0){
- return;
- }else{
- }
- }
- }catch{
- my $err = $_;
- $self->logger->log(
- level => 'error',
- message => 'Job failed: ' . $err,
- );
- $self->_job_failure($job, $err);
- };
};
sub start {
@@ -89,15 +52,6 @@ sub start {
}
}
-sub _job_failure {
- my ($self, $job, $err) = @_;
- push @{$job->{fail}}, $err;
- my $retries = ($job->{retries_left} || $self->retries) - 1;
- $job->{retries_left} = $retries;
- $self->rest_retry_job($job) if $retries > 0;
- $self->fail($job, $_) if $self->_has_fail_method;
-}
-
1;
__END__
diff --git a/lib/presque/worker/Role/Dispatcher.pm b/lib/presque/worker/Role/Dispatcher.pm
new file mode 100644
index 0000000..04ac8c3
--- /dev/null
+++ b/lib/presque/worker/Role/Dispatcher.pm
@@ -0,0 +1,44 @@
+package presque::worker::Role::Dispatcher;
+
+use Moose::Role;
+use Try::Tiny;
+
+has fork_dispatcher => (
+ is => 'ro',
+ isa => 'Bool',
+ default => 0,
+);
+
+around work => sub {
+ my ($orig, $self, $job) = @_;
+
+ try {
+ if ($self->fork_dispatcher) {
+ $self->_fork_and_work($orig, $job);
+ }
+ else {
+ $self->$orig($job);
+ }
+ }catch{
+ $self->_job_failure($job, $_);
+ };
+};
+
+
+sub _fork_and_work {
+ my ($self, $orig, $job) = @_;
+
+ my $pid = fork();
+ if ($pid == 0) {
+ $self->$orig($job);
+ exit;
+ }
+ elsif ($pid > 0) {
+ return;
+ }
+ else {
+ # failure
+ }
+}
+
+1;
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm
deleted file mode 100644
index c62ff1b..0000000
--- a/lib/presque/worker/Role/Fork.pm
+++ /dev/null
@@ -1,11 +0,0 @@
-package presque::worker::Role::Fork;
-
-use Moose::Role;
-
-has fork_dispatcher => (
- is => 'ro',
- isa => 'Bool',
- default => 0,
-);
-
-1;
diff --git a/lib/presque/worker/Role/Job.pm b/lib/presque/worker/Role/Job.pm
new file mode 100644
index 0000000..6ce317c
--- /dev/null
+++ b/lib/presque/worker/Role/Job.pm
@@ -0,0 +1,16 @@
+package presque::worker::Role::Job;
+
+use Moose::Role;
+has job_retries => (is => 'rw', isa => 'Int', default => 5);
+
+sub _job_failure {
+ my ($self, $job, $err) = @_;
+
+ push @{$job->{fail}}, $err;
+ my $retries = ($job->{retries_left} || $self->job_retries) - 1;
+ $job->{retries_left} = $retries;
+ $self->rest_retry_job($job) if $retries > 0;
+ $self->fail($job, $_) if $self->_has_fail_method;
+}
+
+1;
diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm
index 3b6b317..4285b55 100644
--- a/lib/presque/worker/Role/Logger.pm
+++ b/lib/presque/worker/Role/Logger.pm
@@ -21,4 +21,48 @@ has logger => (
}
);
+before start => sub {
+ my $self = shift;
+
+ $self->logger->log(
+ level => 'info',
+ message => "presque worker ["
+ . $self->worker_id
+ . "] : start to listen for "
+ . $self->queue_name
+ );
+};
+
+before work => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'debug',
+ message => $self->worker_id . ' start to work',
+ );
+};
+
+before _shutdown => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' shuting down'
+ );
+};
+
+before _graceful_shutdown => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' kill child'
+ );
+};
+
+before _kill_child => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
+ );
+};
+
1;
diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm
index cdfcd6d..cceea4e 100644
--- a/lib/presque/worker/Role/Management.pm
+++ b/lib/presque/worker/Role/Management.pm
@@ -7,47 +7,31 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,);
before start => sub {
my $self = shift;
$self->rest_register_worker;
+ $SIG{INT} = sub { $self->_shutdown };
+ $SIG{TERM} = sub { $self->_shutdown };
+ $SIG{QUIT} = sub { $self->_graceful_shutdown };
+ $SIG{USR1} = sub { $self->_kill_child };
+ $SIG{CHLD} = 'IGNORE';
};
-after start => sub {
- my $self = shift;
- $self->rest_unregister_worker;
-};
-
-before start => sub {
- my $self = shift;
- $SIG{'INT'} = sub { $self->_shutdown };
- $SIG{'TERM'} = sub { $self->_shutdown };
- $SIG{'QUIT'} = sub { $self->_graceful_shutdown };
- $SIG{'USR1'} = sub { $self->_kill_child };
-};
+after start => sub { (shift)->rest_unregister_worker; };
+after _graceful_shutdown => sub { (shift)->rest_unregister_worker; };
+after _shutdown => sub { (shift)->rest_unregister_worker; };
sub _shutdown {
my $self = shift;
- $self->logger->log(
- level => 'info',
- message => 'worker ' . $self->worker_id . ' shuting down'
- );
$self->shut_down(1);
$self->_kill_child();
}
sub _graceful_shutdown {
my $self = shift;
- $self->logger->log(
- level => 'info',
- message => 'worker ' . $self->worker_id . ' kill child'
- );
$self->shut_down(1);
$self->_kill_child();
}
sub _kill_child {
my $self = shift;
- $self->logger->log(
- level => 'info',
- message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
- );
}
1;
diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm
index dd84fda..6961806 100644
--- a/lib/presque/worker/Role/RESTClient.pm
+++ b/lib/presque/worker/Role/RESTClient.pm
@@ -69,7 +69,6 @@ sub rest_retry_job {
$request->content(JSON::encode_json($job));
my $res = $self->ua->request($request);
if (!$res->is_success) {
- use YAML::Syck; warn Dump $res;
$self->logger->log(
level => 'error',
message => 'failed to update job ('
diff --git a/t/10_basic.t b/t/10_basic.t
new file mode 100644
index 0000000..0754f66
--- /dev/null
+++ b/t/10_basic.t
@@ -0,0 +1,13 @@
+use strict;
+use warnings;
+
+use Test::More;
+
+use presque::worker;
+
+my $w = presque::worker->new_with_traits( { traits => [qw/foo/] } );
+my $w2 = presque::worker->new();
+
+ok 1;
+
+done_testing; \ No newline at end of file