summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--eg/simple.pl24
-rw-r--r--lib/presque/worker.pm78
-rw-r--r--lib/presque/worker/Role/Fork.pm3
-rw-r--r--lib/presque/worker/Role/Management.pm39
-rw-r--r--lib/presque/worker/Role/RESTClient.pm12
5 files changed, 113 insertions, 43 deletions
diff --git a/eg/simple.pl b/eg/simple.pl
new file mode 100644
index 0000000..ab07c51
--- /dev/null
+++ b/eg/simple.pl
@@ -0,0 +1,24 @@
+#!/usr/bin/env perl
+use strict;
+use warnings;
+
+package myworker;
+use Moose;
+with 'presque::worker';
+
+use YAML::Syck;
+sub work {
+ my ($self, $job) = @_;
+ warn ">>>je suis $$\n";
+ warn Dump $job;
+ sleep(100);
+}
+
+package main;
+my $w = myworker->new(
+ base_uri => 'http://localhost:5000',
+ queue_name => 'foo',
+ fork_dispatcher => 1,
+);
+
+$w->start;
diff --git a/lib/presque/worker.pm b/lib/presque/worker.pm
index 77115c1..ad8ebf0 100644
--- a/lib/presque/worker.pm
+++ b/lib/presque/worker.pm
@@ -5,9 +5,10 @@ our $VERSION = '0.01';
use Carp;
use JSON;
use Try::Tiny;
-use presque::worker::Queue;
-use Moose;
+use Moose::Role;
+requires 'work';
+
with qw/
presque::worker::Role::Management
presque::worker::Role::Fork
@@ -16,7 +17,7 @@ with qw/
has queue_name => (is => 'ro', isa => 'Str', required => 1);
has retries => (is => 'rw', isa => 'Int', default => 5);
-has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
+has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
has _fail_method => (
is => 'rw',
isa => 'Bool',
@@ -24,13 +25,6 @@ has _fail_method => (
default => 0,
predicate => '_has_fail_method'
);
-has queue => (
- is => 'ro',
- isa => 'Object',
- lazy => 1,
- default =>
- sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); }
-);
has worker_id => (
is => 'ro',
isa => 'Str',
@@ -44,16 +38,10 @@ has worker_id => (
before start => sub {
my $self = shift;
- if (!$self->meta->find_method_by_name('work')) {
- Carp::confess "method 'work' is missing";
- }
+
if ($self->meta->find_method_by_name('fail')) {
$self->fail_method(1);
}
-};
-
-sub start {
- my $self = shift;
$self->logger->log(
level => 'info',
@@ -62,32 +50,52 @@ sub start {
. "] : start to listen for "
. $self->queue_name
);
+};
- while (!$self->shut_down) {
- my $job = $self->rest_fetch_job();
- $self->work_once($job) if $job;
- sleep($self->interval);
- }
- return $self;
-}
-
-sub work_once {
- my ($self, $job) = @_;
+around work => sub {
+ my ($orig, $self, $job) = @_;
+ $self->logger->log(
+ level => 'debug',
+ message => $self->worker_id . " start to work"
+ );
try {
- $self->work($job);
- }
- catch {
+ if ($self->fork_dispatcher) {
+ my $fork = fork();
+ if ($fork == 0) {
+ $self->$orig($job);
+ }elsif($fork > 0){
+ return;
+ }else{
+ }
+ }
+ }catch{
my $err = $_;
$self->logger->log(
level => 'error',
message => 'Job failed: ' . $err,
);
- push @{$job->{fail}}, $err;
- my $retries = ($job->{retries_left} || $self->retries) - 1;
- $self->rest_retry_job($job) if $retries > 0;
- $self->fail($job, $_) if $self->_has_fail_method;
+ $self->_job_failure($job, $err);
};
+};
+
+sub start {
+ my $self = shift;
+
+ while (!$self->shut_down) {
+ my $job = $self->rest_fetch_job();
+ $self->work($job) if $job;
+ sleep($self->interval);
+ }
+}
+
+sub _job_failure {
+ my ($self, $job, $err) = @_;
+ push @{$job->{fail}}, $err;
+ my $retries = ($job->{retries_left} || $self->retries) - 1;
+ $job->{retries_left} = $retries;
+ $self->rest_retry_job($job) if $retries > 0;
+ $self->fail($job, $_) if $self->_has_fail_method;
}
1;
@@ -101,7 +109,7 @@ presque::worker - a presque worker
package myworker;
use Moose;
- extends 'presque::worker';
+ with 'presque::worker';
sub work {
my ($self, $job) = @_;
diff --git a/lib/presque/worker/Role/Fork.pm b/lib/presque/worker/Role/Fork.pm
index 47efc31..c62ff1b 100644
--- a/lib/presque/worker/Role/Fork.pm
+++ b/lib/presque/worker/Role/Fork.pm
@@ -5,8 +5,7 @@ use Moose::Role;
has fork_dispatcher => (
is => 'ro',
isa => 'Bool',
- default => 1,
- predicate => 'has_fork_dispatcher'
+ default => 0,
);
1;
diff --git a/lib/presque/worker/Role/Management.pm b/lib/presque/worker/Role/Management.pm
index 2bd4db3..cdfcd6d 100644
--- a/lib/presque/worker/Role/Management.pm
+++ b/lib/presque/worker/Role/Management.pm
@@ -6,7 +6,7 @@ has shut_down => (is => 'rw', isa => 'Bool', default => 0,);
before start => sub {
my $self = shift;
- $self->rest_register_worker
+ $self->rest_register_worker;
};
after start => sub {
@@ -14,6 +14,41 @@ after start => sub {
$self->rest_unregister_worker;
};
-# XXX reg signal
+before start => sub {
+ my $self = shift;
+ $SIG{'INT'} = sub { $self->_shutdown };
+ $SIG{'TERM'} = sub { $self->_shutdown };
+ $SIG{'QUIT'} = sub { $self->_graceful_shutdown };
+ $SIG{'USR1'} = sub { $self->_kill_child };
+};
+
+sub _shutdown {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' shuting down'
+ );
+ $self->shut_down(1);
+ $self->_kill_child();
+}
+
+sub _graceful_shutdown {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' kill child'
+ );
+ $self->shut_down(1);
+ $self->_kill_child();
+}
+
+sub _kill_child {
+ my $self = shift;
+ $self->logger->log(
+ level => 'info',
+ message => 'worker ' . $self->worker_id . ' shuting down gracefuly'
+ );
+}
1;
+
diff --git a/lib/presque/worker/Role/RESTClient.pm b/lib/presque/worker/Role/RESTClient.pm
index 0015b98..dd84fda 100644
--- a/lib/presque/worker/Role/RESTClient.pm
+++ b/lib/presque/worker/Role/RESTClient.pm
@@ -39,13 +39,14 @@ sub rest_register_worker {
sub rest_unregister_worker {
my $self = shift;
- my $request = HTTP::Request->new(DELETE => $self->_worker_uri);
- $request->query_path(worker_id => $self->worker_id);
+ my $uri = $self->_worker_uri;
+ $uri->query_form(worker_id => $self->worker_id);
+ my $request = HTTP::Request->new(DELETE => $uri);
my $res = $self->ua->request($request);
}
sub rest_fetch_job {
- my ($self,) = @_;
+ my $self = shift;
my $res = $self->ua->request(HTTP::Request->new(GET => $self->_job_uri));
if ($res->is_success) {
@@ -57,20 +58,23 @@ sub rest_fetch_job {
message => $res->code . ':' . $res->message
);
}
+ return;
}
sub rest_retry_job {
my ($self, $job) = @_;
my $request = HTTP::Request->new(PUT => $self->_job_uri);
+ $request->header('Content-Type' => 'application/json');
$request->content(JSON::encode_json($job));
my $res = $self->ua->request($request);
if (!$res->is_success) {
+ use YAML::Syck; warn Dump $res;
$self->logger->log(
level => 'error',
message => 'failed to update job ('
. $res->code . ':'
- . $res->reason . ')',
+ . $res->message . ')',
);
}
}