summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/ControlHandler.pm16
-rw-r--r--lib/presque/RestQueueHandler.pm75
-rw-r--r--lib/presque/Role/Error.pm10
3 files changed, 54 insertions, 47 deletions
diff --git a/lib/presque/ControlHandler.pm b/lib/presque/ControlHandler.pm
index 4014e2c..b64851d 100644
--- a/lib/presque/ControlHandler.pm
+++ b/lib/presque/ControlHandler.pm
@@ -12,9 +12,8 @@ sub get {
return $self->http_error_queue if !$queue_name;
- my $key = $self->_queue_stat($queue_name);
$self->application->redis->get(
- $key,
+ $self->_queue_stat($queue_name),
sub {
my $status = shift;
$self->finish(
@@ -33,7 +32,7 @@ sub post {
return $self->http_error_queue if !$queue_name;
- my $content = JSON::decode_json( $self->request->input );
+ my $content = JSON::decode_json( $self->request->content );
if ( $content->{status} eq 'start' ) {
$self->_set_status( $queue_name, 1 );
}
@@ -41,12 +40,7 @@ sub post {
$self->_set_status( $queue_name, 0 );
}
else {
- $self->response->code(400);
- $self->finish(
- JSON::encode_json(
- { error => 'invalid status ' . $content->{status} }
- )
- );
+ $self->http_error('invalid status '.$content->{status});
}
}
@@ -61,8 +55,8 @@ sub _set_status {
my $res = shift;
$self->finish(
JSON::encode_json( {
- queue => $queue_name,
- status => $res
+ queue => $queue_name,
+ response => $res
}
)
);
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 115b3e5..b34a377 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -11,49 +11,60 @@ __PACKAGE__->asynchronous(1);
sub get {
my ( $self, $queue_name ) = @_;
- return $self->http_error_queue if ( !$queue_name );
+ return $self->http_error_queue if !$queue_name;
my $dkey = $self->_queue_delayed($queue_name);
my $lkey = $self->_queue($queue_name);
- $self->application->redis->zrangebyscore(
- $dkey, 0, time,
+ $self->application->redis->get(
+ $self->_queue_stat($queue_name),
sub {
- my $value = shift;
- if ( $value && scalar @$value ) {
- my $k = shift @$value;
- $self->application->redis->zrem(
- $dkey, $k,
- sub {
- $self->application->redis->get(
- $k,
+ my $status = shift;
+
+ if ( defined $status && $status == 0 ) {
+ return $self->http_error_closed_queue();
+ }
+
+ $self->application->redis->zrangebyscore(
+ $dkey, 0, time,
+ sub {
+ my $value = shift;
+ if ( $value && scalar @$value ) {
+ my $k = shift @$value;
+ $self->application->redis->zrem(
+ $dkey, $k,
sub {
- $self->finish(shift);
+ $self->application->redis->get(
+ $k,
+ sub {
+ $self->finish(shift);
+ }
+ );
}
);
}
- );
- }
- else {
- $self->application->redis->lpop(
- $lkey,
- sub {
- my $value = shift;
- my $qpkey = $self->_queue_policy($queue_name);
- if ($value) {
- $self->application->redis->get(
- $value,
- sub {
- $self->finish(shift);
+ else {
+ $self->application->redis->lpop(
+ $lkey,
+ sub {
+ my $value = shift;
+ my $qpkey = $self->_queue_policy($queue_name);
+ if ($value) {
+ $self->application->redis->get(
+ $value,
+ sub {
+ $self->finish(shift);
+ }
+ );
+ }
+ else {
+ $self->http_error( 'no job', 404 );
}
- );
- }
- else {
- $self->http_error('no job', 404);
- }
+ }
+ );
}
- );
- }
+ }
+ );
}
);
}
diff --git a/lib/presque/Role/Error.pm b/lib/presque/Role/Error.pm
index 8f95776..3c07e0a 100644
--- a/lib/presque/Role/Error.pm
+++ b/lib/presque/Role/Error.pm
@@ -9,13 +9,15 @@ sub http_error {
}
sub http_error_queue {
- my $self = shift;
- $self->http_error( 'queue name is missing', 404 );
+ (shift)->http_error( 'queue name is missing', 404 );
}
sub http_error_content_type {
- my $self = shift;
- $self->http_error('content-type must be set to application/json');
+ (shift)->http_error('content-type must be set to application/json');
+}
+
+sub http_error_closed_queue {
+ (shift)->http_error('queue is closed', 404);
}
1;