summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-13 14:52:58 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-13 14:52:58 +0200
commite3b18ae100e34e2c14b75d4ff40ae1d8900b67a5 (patch)
tree3909a695ba5c043cc6f699c90eabd06a94885462
parent_queue_worker (diff)
downloadpresque-e3b18ae100e34e2c14b75d4ff40ae1d8900b67a5.tar.gz
cleanup, register what a worker is doing
-rw-r--r--lib/presque/RestQueueHandler.pm55
1 files changed, 33 insertions, 22 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index b34a377..3672d6f 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -9,19 +9,22 @@ with
__PACKAGE__->asynchronous(1);
sub get {
- my ( $self, $queue_name ) = @_;
+ my ($self, $queue_name) = @_;
return $self->http_error_queue if !$queue_name;
my $dkey = $self->_queue_delayed($queue_name);
my $lkey = $self->_queue($queue_name);
+ my $input = $self->request->parameters;
+ my $worker_name = $input->{worker_name} if $input;
+
$self->application->redis->get(
$self->_queue_stat($queue_name),
sub {
my $status = shift;
- if ( defined $status && $status == 0 ) {
+ if (defined $status && $status == 0) {
return $self->http_error_closed_queue();
}
@@ -29,17 +32,21 @@ sub get {
$dkey, 0, time,
sub {
my $value = shift;
- if ( $value && scalar @$value ) {
+ if ($value && scalar @$value) {
my $k = shift @$value;
- $self->application->redis->zrem(
- $dkey, $k,
+ $self->application->redis->zrem($dkey, $k);
+ $self->application->redis->get(
+ $k,
sub {
- $self->application->redis->get(
- $k,
- sub {
- $self->finish(shift);
- }
- );
+ $self->application->redis->set(
+ $self->_queue_worker($worker_name),
+ JSON::encode_json(
+ { queue => $queue_name,
+ run_at => time()
+ }
+ )
+ ) if $worker_name;
+ $self->finish(shift);
}
);
}
@@ -53,12 +60,21 @@ sub get {
$self->application->redis->get(
$value,
sub {
+ $self->application->redis->set(
+ $self->_queue_worker(
+ $worker_name),
+ JSON::encode_json(
+ { queue => $queue_name,
+ run_at => time()
+ }
+ )
+ ) if $worker_name;
$self->finish(shift);
}
);
}
else {
- $self->http_error( 'no job', 404 );
+ $self->http_error('no job', 404);
}
}
);
@@ -95,16 +111,11 @@ sub post {
my $status_set = shift;
my $lkey = $self->_queue($queue_name);
if ( $uuid == 1 ) {
- $self->application->redis->sadd(
- 'QUEUESET',
- $lkey,
- sub {
- my $ckey = $self->_queue_stat($queue_name);
- $self->application->redis->set( $ckey, 1 );
- $self->_finish_post( $lkey, $key, $status_set,
+ $self->application->redis->sadd('QUEUESET', $lkey);
+ my $ckey = $self->_queue_stat($queue_name);
+ $self->application->redis->set($ckey, 1);
+ $self->_finish_post( $lkey, $key, $status_set,
$delayed, $queue_name );
- }
- );
}
else {
$self->_finish_post( $lkey, $key, $status_set,
@@ -150,7 +161,7 @@ sub _finish_post {
if ($delayed) {
$method = 'zadd';
- @args = ($queue_name.':delayed', $delayed, $key);
+ @args = ($queue_name . ':delayed', $delayed, $key);
}
$self->application->redis->$method(