summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/presque/RestQueueHandler.pm99
1 files changed, 74 insertions, 25 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 050f767..a7842f5 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -2,6 +2,8 @@ package presque::RestQueueHandler;
use Moose;
extends 'Tatsumaki::Handler';
+with 'presque::Role::QueueName';
+
__PACKAGE__->asynchronous(1);
use JSON;
@@ -20,24 +22,48 @@ sub get {
return;
}
- my $lkey = $queue_name . ':queue';
+ my $dkey = $self->_queue_delayed($queue_name);
+ my $lkey = $self->_queue($queue_name);
- $self->application->redis->lpop(
- $lkey,
+ $self->application->redis->zrangebyscore(
+ $dkey, 0, time,
sub {
my $value = shift;
- my $qpkey = $queue_name . ':queupolicy';
- if ($value) {
- my $val = $self->application->redis->get(
- $value,
+ if ( $value && scalar @$value ) {
+ my $k = shift @$value;
+ $self->application->redis->zrem(
+ $dkey, $k,
+ sub {
+ $self->application->redis->get(
+ $k,
+ sub {
+ $self->finish(shift);
+ }
+ );
+ }
+ );
+ }
+ else {
+ $self->application->redis->lpop(
+ $lkey,
sub {
- $self->finish(shift);
+ my $value = shift;
+ my $qpkey = $self->_queue_policy($queue_name);
+ if ($value) {
+ $self->application->redis->get(
+ $value,
+ sub {
+ $self->finish(shift);
+ }
+ );
+ }
+ else {
+ $self->response->code(404);
+ $self->finish(
+ JSON::encode_json( { error => "no job" } ) );
+ }
}
);
- }else{
- $self->response->code(404);
-
- $self->finish(JSON::encode_json({error => "no job"}));
}
}
);
@@ -61,32 +87,37 @@ sub post {
return;
}
+ my $input = $self->request->parameters;
+ my $delayed = $input->{delayed};
+
my $p = $self->request->content;
+
$self->application->redis->incr(
- $queue_name . ':UUID',
+ $self->_queue_uuid($queue_name),
sub {
my $uuid = shift;
- my $key = $queue_name . ':' . $uuid;
+ my $key = $self->_queue_key($queue_name, $uuid);
$self->application->redis->set(
$key, $p,
sub {
my $status_set = shift;
- my $lkey = $queue_name . ':queue';
+ my $lkey = $self->_queue($queue_name);
if ( $uuid == 1 ) {
$self->application->redis->sadd(
'QUEUESET',
$lkey,
sub {
- my $ckey = 'queuestat:' . $queue_name;
+ my $ckey = $self->_queue_stat($queue_name);
$self->application->redis->set( $ckey, 1 );
- $self->_finish_post( $lkey, $key,
- $status_set );
+ $self->_finish_post( $lkey, $key, $status_set,
+ $delayed, $queue_name );
}
);
}
else {
- $self->_finish_post( $lkey, $key, $status_set );
+ $self->_finish_post( $lkey, $key, $status_set,
+ $delayed, $queue_name );
}
}
);
@@ -103,22 +134,40 @@ sub delete {
return;
}
- my $lkey = $queue_name . ':queue';
+ # delete delayed queue
+ my $lkey = $self->_queue($queue_name);
+ my $dkey = $self->_queue_delayed($queue_name);
+
$self->application->redis->del(
$lkey,
sub {
my $res = shift;
- $self->finish(
- JSON::encode_json( { queue => $queue_name, status => $res } )
+ $self->application->redis->del(
+ $dkey,
+ sub {
+ $self->finish(
+ JSON::encode_json(
+ { queue => $queue_name, status => $res }
+ )
+ );
+ }
);
}
);
}
sub _finish_post {
- my ($self, $lkey, $key, $result) = @_;
- $self->application->redis->rpush(
- $lkey, $key,
+ my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_;
+
+ my ($method, @args) = ('rpush', $lkey, $key);
+
+ if ($delayed) {
+ $method = 'zadd';
+ @args = ($queue_name.':delayed', $delayed, $key);
+ }
+
+ $self->application->redis->$method(
+ @args,
sub {
$self->finish({status => 'success'});
}