diff options
| author | franck cuny <franck@lumberjaph.net> | 2010-05-13 14:52:58 +0200 |
|---|---|---|
| committer | franck cuny <franck@lumberjaph.net> | 2010-05-13 14:52:58 +0200 |
| commit | e3b18ae100e34e2c14b75d4ff40ae1d8900b67a5 (patch) | |
| tree | 3909a695ba5c043cc6f699c90eabd06a94885462 | |
| parent | _queue_worker (diff) | |
| download | presque-e3b18ae100e34e2c14b75d4ff40ae1d8900b67a5.tar.gz | |
cleanup, register what a worker is doing
| -rw-r--r-- | lib/presque/RestQueueHandler.pm | 55 |
1 files changed, 33 insertions, 22 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index b34a377..3672d6f 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -9,19 +9,22 @@ with __PACKAGE__->asynchronous(1); sub get { - my ( $self, $queue_name ) = @_; + my ($self, $queue_name) = @_; return $self->http_error_queue if !$queue_name; my $dkey = $self->_queue_delayed($queue_name); my $lkey = $self->_queue($queue_name); + my $input = $self->request->parameters; + my $worker_name = $input->{worker_name} if $input; + $self->application->redis->get( $self->_queue_stat($queue_name), sub { my $status = shift; - if ( defined $status && $status == 0 ) { + if (defined $status && $status == 0) { return $self->http_error_closed_queue(); } @@ -29,17 +32,21 @@ sub get { $dkey, 0, time, sub { my $value = shift; - if ( $value && scalar @$value ) { + if ($value && scalar @$value) { my $k = shift @$value; - $self->application->redis->zrem( - $dkey, $k, + $self->application->redis->zrem($dkey, $k); + $self->application->redis->get( + $k, sub { - $self->application->redis->get( - $k, - sub { - $self->finish(shift); - } - ); + $self->application->redis->set( + $self->_queue_worker($worker_name), + JSON::encode_json( + { queue => $queue_name, + run_at => time() + } + ) + ) if $worker_name; + $self->finish(shift); } ); } @@ -53,12 +60,21 @@ sub get { $self->application->redis->get( $value, sub { + $self->application->redis->set( + $self->_queue_worker( + $worker_name), + JSON::encode_json( + { queue => $queue_name, + run_at => time() + } + ) + ) if $worker_name; $self->finish(shift); } ); } else { - $self->http_error( 'no job', 404 ); + $self->http_error('no job', 404); } } ); @@ -95,16 +111,11 @@ sub post { my $status_set = shift; my $lkey = $self->_queue($queue_name); if ( $uuid == 1 ) { - $self->application->redis->sadd( - 'QUEUESET', - $lkey, - sub { - my $ckey = $self->_queue_stat($queue_name); - $self->application->redis->set( $ckey, 1 ); - $self->_finish_post( $lkey, $key, $status_set, + $self->application->redis->sadd('QUEUESET', $lkey); + my $ckey = $self->_queue_stat($queue_name); + $self->application->redis->set($ckey, 1); + $self->_finish_post( $lkey, $key, $status_set, $delayed, $queue_name ); - } - ); } else { $self->_finish_post( $lkey, $key, $status_set, @@ -150,7 +161,7 @@ sub _finish_post { if ($delayed) { $method = 'zadd'; - @args = ($queue_name.':delayed', $delayed, $key); + @args = ($queue_name . ':delayed', $delayed, $key); } $self->application->redis->$method( |
