summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-15 10:15:02 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-15 10:15:02 +0200
commit42dabadb6e972a3dbc0d1088f55ac3d14276d214 (patch)
treee4c07f8bd13d1d7e1bc4cbb0a63c0910d2d8381f
parentuse new role (diff)
downloadpresque-42dabadb6e972a3dbc0d1088f55ac3d14276d214.tar.gz
some around methods, code clean up
Diffstat (limited to '')
-rw-r--r--lib/presque/RestQueueHandler.pm117
1 files changed, 76 insertions, 41 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index facb7e5..6378726 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -8,16 +8,40 @@ with
__PACKAGE__->asynchronous(1);
+around [qw/put post/] => sub {
+ my $orig = shift;
+ my $self = shift;
+ my $queue_name = shift;
+
+ return $self->http_error_queue if (!$queue_name);
+
+ return $self->http_error_content_type
+ if (!$self->request->header('Content-Type')
+ || $self->request->header('Content-Type') ne 'application/json');
+
+ return $self->http_error("job is missing") if !$self->request->content;
+
+ $self->$orig($queue_name);
+};
+
+around [qw/get delete/] => sub {
+ my $orig = shift;
+ my $self = shift;
+ my $queue_name = shift;
+
+ return $self->http_error_queue if (!$queue_name);
+
+ $self->$orig($queue_name);
+};
+
sub get {
my ($self, $queue_name) = @_;
- return $self->http_error_queue if !$queue_name;
-
my $dkey = $self->_queue_delayed($queue_name);
my $lkey = $self->_queue($queue_name);
my $input = $self->request->parameters;
- my $worker_name = $input->{worker_name} if $input;
+ my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
$self->application->redis->get(
$self->_queue_stat($queue_name),
@@ -40,7 +64,7 @@ sub get {
sub {
my $job = shift;
$self->_finish_get($job, $queue_name,
- $worker_name);
+ $worker_id);
}
);
}
@@ -55,7 +79,7 @@ sub get {
sub {
my $job = shift;
$self->_finish_get($job,
- $queue_name, $worker_name);
+ $queue_name, $worker_id);
}
);
}
@@ -73,47 +97,26 @@ sub get {
sub post {
my ($self, $queue_name) = @_;
+ $self->_create_job($queue_name);
+}
- return $self->http_error_queue if (!$queue_name);
-
- return $self->http_error_content_type
- if (!$self->request->header('Content-Type')
- || $self->request->header('Content-Type') ne 'application/json');
-
- my $input = $self->request->parameters;
- my $delayed = $input->{delayed};
+sub put {
+ my ($self, $queue_name) = @_;
- my $p = $self->request->content;
+ my $input = $self->request->parameters;
+ my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
- $self->application->redis->incr(
- $self->_queue_uuid($queue_name),
- sub {
- my $uuid = shift;
- my $key = $self->_queue_key($queue_name, $uuid);
+ $self->application->redis->incr('failed');
+ if ($worker_id) {
+ $self->application->redis->incr('failed:' . $worker_id);
+ }
- $self->application->redis->set(
- $key, $p,
- sub {
- my $status_set = shift;
- my $lkey = $self->_queue($queue_name);
- if ($uuid == 1) {
- $self->application->redis->sadd('QUEUESET', $lkey);
- my $ckey = $self->_queue_stat($queue_name);
- $self->application->redis->set($ckey, 1);
- }
- $self->_finish_post($lkey, $key, $status_set, $delayed,
- $queue_name);
- }
- );
- }
- );
+ $self->_create_job($queue_name);
}
sub delete {
my ($self, $queue_name) = @_;
- return $self->http_error_queue if (!$queue_name);
-
# delete delayed queue
my $lkey = $self->_queue($queue_name);
my $dkey = $self->_queue_delayed($queue_name);
@@ -125,23 +128,55 @@ sub delete {
}
sub _finish_get {
- my ($self, $job, $queue_name, $worker_name) = @_;
+ my ($self, $job, $queue_name, $worker_id) = @_;
$self->application->redis->incr('processed');
- if ($worker_name) {
+ if ($worker_id) {
$self->application->redis->set(
- $self->_queue_worker($worker_name),
+ $self->_queue_worker($worker_id),
JSON::encode_json(
{ queue => $queue_name,
run_at => time()
}
)
);
- $self->application->redis->incr('processed:' . $worker_name);
+ $self->application->redis->incr('processed:' . $worker_id);
}
$self->finish($job);
}
+sub _create_job {
+ my ($self, $queue_name) = @_;
+
+ my $p = $self->request->content;
+
+ my $input = $self->request->parameters;
+ my $delayed = $input->{delayed} if $input && $input->{delayed};
+
+ $self->application->redis->incr(
+ $self->_queue_uuid($queue_name),
+ sub {
+ my $uuid = shift;
+ my $key = $self->_queue_key($queue_name, $uuid);
+
+ $self->application->redis->set(
+ $key, $p,
+ sub {
+ my $status_set = shift;
+ my $lkey = $self->_queue($queue_name);
+ if ($uuid == 1) {
+ $self->application->redis->sadd('QUEUESET', $lkey);
+ my $ckey = $self->_queue_stat($queue_name);
+ $self->application->redis->set($ckey, 1);
+ }
+ $self->_finish_post($lkey, $key, $status_set, $delayed,
+ $queue_name);
+ }
+ );
+ }
+ );
+}
+
sub _finish_post {
my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;