summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/RestQueueHandler.pm247
1 files changed, 136 insertions, 111 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index bffe446..d708760 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -1,138 +1,120 @@
package presque::RestQueueHandler;
+use 5.010;
+
use JSON;
+use Digest::SHA;
+
use Moose;
extends 'Tatsumaki::Handler';
with
- qw/presque::Role::QueueName presque::Role::Error presque::Role::Response/;
+ 'presque::Role::Queue::Names',
+ 'presque::Role::Error', 'presque::Role::Response', 'presque::Role::Queue',
+ 'presque::Role::Queue::WithContent' => {methods => [qw/put post/]},
+ 'presque::Role::Queue::WithQueueName' => {methods => [qw/get delete/]};
__PACKAGE__->asynchronous(1);
-around [qw/put post/] => sub {
- my $orig = shift;
- my $self = shift;
- my $queue_name = shift;
-
- return $self->http_error_queue if (!$queue_name);
-
- return $self->http_error_content_type
- if (!$self->request->header('Content-Type')
- || $self->request->header('Content-Type') ne 'application/json');
-
- return $self->http_error("job is missing") if !$self->request->content;
-
- $self->$orig($queue_name);
-};
-
-around [qw/get delete/] => sub {
- my $orig = shift;
- my $self = shift;
- my $queue_name = shift;
+sub get { (shift)->_is_queue_opened(shift) }
+sub post { (shift)->_create_job(shift) }
+sub put { (shift)->_failed_job(shift) }
+sub delete { (shift)->_purge_queue(shift) }
- return $self->http_error_queue if (!$queue_name);
-
- $self->$orig($queue_name);
-};
-
-sub get {
+sub _is_queue_opened {
my ($self, $queue_name) = @_;
- my $dkey = $self->_queue_delayed($queue_name);
- my $lkey = $self->_queue($queue_name);
-
- my $input = $self->request->parameters;
- my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
-
$self->application->redis->get(
$self->_queue_stat($queue_name),
sub {
my $status = shift;
-
if (defined $status && $status == 0) {
- return $self->http_error_closed_queue();
+ return $self->http_error_queue_is_closed();
+ }else{
+ return $self->_fetch_job($queue_name);
}
-
- $self->application->redis->zrangebyscore(
- $dkey, 0, time,
- sub {
- my $value = shift;
- if ($value && scalar @$value) {
- my $k = shift @$value;
- $self->application->redis->zrem($dkey, $k);
- $self->application->redis->get(
- $k,
- sub {
- my $job = shift;
- $self->_finish_get($job, $queue_name,
- $worker_id);
- }
- );
- }
- else {
- $self->application->redis->lpop(
- $lkey,
- sub {
- my $value = shift;
- if ($value) {
- $self->application->redis->get(
- $value,
- sub {
- my $job = shift;
- $self->_finish_get($job,
- $queue_name, $worker_id);
- }
- );
- }
- else {
- $self->http_error('no job', 404);
- }
- }
- );
- }
- }
- );
}
);
}
-sub post {
+sub _fetch_job {
my ($self, $queue_name) = @_;
- $self->_create_job($queue_name);
-}
-sub put {
- my ($self, $queue_name) = @_;
+ my $dkey = $self->_queue_delayed($queue_name);
+ my $lkey = $self->_queue($queue_name);
my $input = $self->request->parameters;
my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
- $self->application->redis->incr('failed');
- $self->application->redis->incr($self->_queue_failed($queue_name));
- if ($worker_id) {
- $self->application->redis->incr('failed:' . $worker_id);
- }
-
- $self->_create_job($queue_name);
+ $self->application->redis->zrangebyscore(
+ $dkey, 0, time,
+ sub {
+ my $value = shift;
+ if ($value && scalar @$value) {
+ $self->_get_job_from_delay_queue($dkey, $queue_name, $value,
+ $worker_id);
+ }
+ else {
+ $self->_get_job_from_queue($lkey, $queue_name);
+ }
+ }
+ );
}
-sub delete {
- my ($self, $queue_name) = @_;
+sub _get_job_from_delay_queue {
+ my ($self, $dkey, $queue_name, $value, $worker_id) = @_;
- # XXX delete failed && processed
- my $lkey = $self->_queue($queue_name);
- my $dkey = $self->_queue_delayed($queue_name);
+ my $k = shift @$value;
+ $self->application->redis->zrem($dkey, $k);
+ $self->application->redis->get(
+ $k,
+ sub {
+ my $job = shift;
+ $self->_finish_get($job, $queue_name, $worker_id);
+ }
+ );
+}
- $self->application->redis->del($lkey);
- $self->application->redis->del($dkey);
- $self->response->code(204);
- $self->finish();
+sub _get_job_from_queue {
+ my ($self, $lkey, $queue_name, $worker_id) = @_;
+
+ $self->application->redis->lpop(
+ $lkey,
+ sub {
+ my $value = shift;
+ if ($value) {
+ $self->application->redis->get(
+ $value,
+ sub {
+ my $job = shift;
+ $self->_finish_get($job, $queue_name, $worker_id);
+ }
+ );
+ }
+ else {
+ $self->http_error('no job', 404);
+ }
+ }
+ );
}
sub _finish_get {
my ($self, $job, $queue_name, $worker_id) = @_;
+ $self->_update_queue_stats($queue_name, $job);
+ $self->_update_worker_stats($queue_name, $worker_id, $job);
+ $self->finish($job);
+}
+
+sub _update_queue_stats {
+ my ($self, $queue_name) = @_;
+
$self->application->redis->incr('processed');
$self->application->redis->incr($self->_queue_processed($queue_name));
+}
+
+sub _update_worker_stats {
+ my ($self, $queue_name, $worker_id) = @_;
+
if ($worker_id) {
$self->application->redis->set(
$self->_queue_worker($worker_id),
@@ -144,7 +126,6 @@ sub _finish_get {
);
$self->application->redis->incr('processed:' . $worker_id);
}
- $self->finish($job);
}
sub _create_job {
@@ -154,6 +135,32 @@ sub _create_job {
my $input = $self->request->parameters;
my $delayed = $input->{delayed} if $input && $input->{delayed};
+ my $uniq = $input->{uniq} if $input && $input->{uniq};
+
+ # XXX UNIQ IS BORKED
+ $uniq = Digest::SHA->sha256_hex($p) if ($uniq && $uniq ~~ "1");
+
+ if ($uniq) {
+ $self->application->redis->sismember(
+ $self->_queue_uniq($queue_name), $uniq,
+ sub {
+ my $status = shift;
+ if ($status) {
+ $self->http_error('job already exists');
+ }
+ else {
+ $self->_insert_to_queue($queue_name, $p, $delayed, $uniq);
+ }
+ }
+ );
+ }
+ else {
+ $self->_insert_to_queue($queue_name, $p, $delayed);
+ }
+}
+
+sub _insert_to_queue {
+ my ($self, $queue_name, $p, $delayed, $uniq) = @_;
$self->application->redis->incr(
$self->_queue_uuid($queue_name),
@@ -166,30 +173,48 @@ sub _create_job {
sub {
my $status_set = shift;
my $lkey = $self->_queue($queue_name);
- if ($uuid == 1) {
- $self->application->redis->sadd('QUEUESET', $lkey);
- my $ckey = $self->_queue_stat($queue_name);
- $self->application->redis->set($ckey, 1);
- }
+ $self->new_queue($queue_name, $lkey) if ($uuid == 1);
+ $self->application->redis->zadd(
+ $self->_queue_uniq($queue_name), $uniq)
+ if $uniq;
$self->_finish_post($lkey, $key, $status_set, $delayed,
$queue_name);
- }
+ }
);
}
);
}
-sub _finish_post {
- my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;
+sub _failed_job {
+ my ($self, $queue_name) = @_;
- my ($method, @args) = ('rpush', $lkey, $key);
+ my $input = $self->request->parameters;
+ my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
- if ($delayed) {
- $method = 'zadd';
- @args = ($queue_name . ':delayed', $delayed, $key);
- }
+ $self->application->redis->incr('failed');
+ $self->application->redis->incr($self->_queue_failed($queue_name));
+ $self->application->redis->incr('failed:' . $worker_id) if $worker_id;
+
+ $self->_create_job($queue_name);
+}
+
+sub _purge_queue {
+ my ($self, $queue_name) = @_;
+
+ # XXX delete failed && processed
+ my $lkey = $self->_queue($queue_name);
+ my $dkey = $self->_queue_delayed($queue_name);
+
+ $self->application->redis->del($lkey);
+ $self->application->redis->del($dkey);
+ $self->response->code(204);
+ $self->finish();
+}
+
+sub _finish_post {
+ my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;
- $self->application->redis->$method(@args,);
+ $self->push_job($queue_name, $lkey, $key, $delayed);
$self->response->code(201);
$self->finish();
}