summaryrefslogtreecommitdiff
path: root/lib/presque/worker
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-06-09 18:19:38 +0200
committerfranck cuny <franck@lumberjaph.net>2010-06-09 18:19:38 +0200
commit5b2042053577cc6381c40c4fb5d5264e79a0312d (patch)
tree643da7415c478ee5e444231690fe8cd91329b75c /lib/presque/worker
parenta simple worker; a role for the REST interface to presque; reg signals (diff)
downloadpresque-worker-5b2042053577cc6381c40c4fb5d5264e79a0312d.tar.gz
add logger; move some code for work and job in roles;
Diffstat (limited to 'lib/presque/worker')
-rw-r--r--lib/presque/worker/Role/Dispatcher.pm44
-rw-r--r--lib/presque/worker/Role/Fork.pm11
-rw-r--r--lib/presque/worker/Role/Job.pm16
-rw-r--r--lib/presque/worker/Role/Logger.pm44
-rw-r--r--lib/presque/worker/Role/Management.pm32
-rw-r--r--lib/presque/worker/Role/RESTClient.pm1
6 files changed, 112 insertions, 36 deletions
diff --git a/lib/presque/worker/Role/Dispatcher.pm b/lib/presque/worker/Role/Dispatcher.pm
new file mode 100644
index 0000000..04ac8c3
--- /dev/null
+++ b/lib/presque/worker/Role/Dispatcher.pm
@@ -0,0 +1,44 @@
+package presque::worker::Role::Dispatcher;
+
+use Moose::Role;
+use Try::Tiny;
+
+has fork_dispatcher => (
+ is => 'ro',
+ isa => 'Bool',
+ default => 0,
+);
+
+around work => sub {
+ my ($orig, $self, $job) = @_;
+
+ try {
+ if ($self->fork_dispatcher) {
+ $self->_fork_and_work($orig, $job);
+ }
+ else {
+ $self->$orig($job);
+ }
+ }catch{
+ $self->_job_failure($job, $_);
+ };
+};
+
+
+sub _fork_and_work {
+ my ($self, $orig, $job) = @_;
+
+ my $pid = fork();
+ if ($pid == 0) {
+ $self->$orig($job);
+ exit;
+ }
+ elsif ($pid > 0) {
+ return;
+ }
+ else {
+ # failure
+ }
+}
+
+1;
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm
deleted file mode 100644
index c62ff1b..0000000
--- a/lib/presque/worker/Role/Fork.pm
+++ /dev/null
@@ -1,11 +0,0 @@
-package presque::worker::Role::Fork;
-
-use Moose::Role;
-
-has fork_dispatcher => (
- is => 'ro',
- isa => 'Bool',
- default => 0,
-);
-
-1;
diff --git a/lib/presque/worker/Role/Job.pm b/lib/presque/worker/Role/Job.pm
new file mode 100644
index 0000000..6ce317c
--- /dev/null
+++ b/lib/presque/worker/Role/Job.pm
@@ -0,0 +1,16 @@
+package presque::worker::Role::Job;
+
+use Moose::Role;
+has job_retries => (is => 'rw', isa => 'Int', default => 5);
+
+sub _job_failure {
+ my ($self, $job, $err) = @_;
+
+ push @{$job->{fail}}, $err;
+ my $retries = ($job->{retries_left} || $self->job_retries) - 1;
+ $job->{retries_left} = $retries;
+ $self->rest_retry_job($job) if $retries > 0;
+ $self->fail($job, $_) if $self->_has_fail_method;
+}
+
+1;
diff --git a/lib/presque/worker/Role/Logger.pm b/lib/presque/worker/Role/Logger.pm
index 3b6b317..4285b55 100644
--- a/lib/presque/worker/Role/Logger.pm
+++ b/lib/presque/worker/Role/Logger.pm
@@ -21,4 +21,48 @@ has logger => (
}
);
+before start => sub {
+ my $self = shift;
+
+ $self->logger->log(
+ level => 'info',
+ message => "presque worker ["
+ . $self->worker_id
+ . "] : start to listen for "
+ . $self->queue_name
+ );
+};
+
+before work => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'debug',
+ message => $self->worker_id . ' start to work',
+ );
+};
+
+before _shutdown => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' shuting down'
+ );
+};
+
+before _graceful_shutdown => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' kill child'
+ );
+};
+
+before _kill_child => sub {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
+ );
+};
+
1;
diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm
index cdfcd6d..cceea4e 100644
--- a/lib/presque/worker/Role/Management.pm
+++ b/lib/presque/worker/Role/Management.pm
@@ -7,47 +7,31 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,);
before start => sub {
my $self = shift;
$self->rest_register_worker;
+ $SIG{INT} = sub { $self->_shutdown };
+ $SIG{TERM} = sub { $self->_shutdown };
+ $SIG{QUIT} = sub { $self->_graceful_shutdown };
+ $SIG{USR1} = sub { $self->_kill_child };
+ $SIG{CHLD} = 'IGNORE';
};
-after start => sub {
- my $self = shift;
- $self->rest_unregister_worker;
-};
-
-before start => sub {
- my $self = shift;
- $SIG{'INT'} = sub { $self->_shutdown };
- $SIG{'TERM'} = sub { $self->_shutdown };
- $SIG{'QUIT'} = sub { $self->_graceful_shutdown };
- $SIG{'USR1'} = sub { $self->_kill_child };
-};
+after start => sub { (shift)->rest_unregister_worker; };
+after _graceful_shutdown => sub { (shift)->rest_unregister_worker; };
+after _shutdown => sub { (shift)->rest_unregister_worker; };
sub _shutdown {
my $self = shift;
- $self->logger->log(
- level => 'info',
- message => 'worker ' . $self->worker_id . ' shuting down'
- );
$self->shut_down(1);
$self->_kill_child();
}
sub _graceful_shutdown {
my $self = shift;
- $self->logger->log(
- level => 'info',
- message => 'worker ' . $self->worker_id . ' kill child'
- );
$self->shut_down(1);
$self->_kill_child();
}
sub _kill_child {
my $self = shift;
- $self->logger->log(
- level => 'info',
- message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
- );
}
1;
diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm
index dd84fda..6961806 100644
--- a/lib/presque/worker/Role/RESTClient.pm
+++ b/lib/presque/worker/Role/RESTClient.pm
@@ -69,7 +69,6 @@ sub rest_retry_job {
$request->content(JSON::encode_json($job));
my $res = $self->ua->request($request);
if (!$res->is_success) {
- use YAML::Syck; warn Dump $res;
$self->logger->log(
level => 'error',
message => 'failed to update job ('