diff options
| author | franck cuny <franck@lumberjaph.net> | 2010-06-24 11:12:52 +0200 |
|---|---|---|
| committer | franck cuny <franck@lumberjaph.net> | 2010-06-24 11:12:52 +0200 |
| commit | 500bad3d9a9540aa11cccf8628abbca65d6e2b66 (patch) | |
| tree | b9b8a98394d39526efe2e8bc8617ca5069d1838a /lib | |
| parent | extends restqueue handler and overwrite only needed methods (diff) | |
| download | presque-500bad3d9a9540aa11cccf8628abbca65d6e2b66.tar.gz | |
code cleanup; split in methods so we can extend this handler
Diffstat (limited to '')
| -rw-r--r-- | lib/presque/RestQueueHandler.pm | 247 |
1 files changed, 136 insertions, 111 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm index bffe446..d708760 100644 --- a/lib/presque/RestQueueHandler.pm +++ b/lib/presque/RestQueueHandler.pm @@ -1,138 +1,120 @@ package presque::RestQueueHandler; +use 5.010; + use JSON; +use Digest::SHA; + use Moose; extends 'Tatsumaki::Handler'; with - qw/presque::Role::QueueName presque::Role::Error presque::Role::Response/; + 'presque::Role::Queue::Names', + 'presque::Role::Error', 'presque::Role::Response', 'presque::Role::Queue', + 'presque::Role::Queue::WithContent' => {methods => [qw/put post/]}, + 'presque::Role::Queue::WithQueueName' => {methods => [qw/get delete/]}; __PACKAGE__->asynchronous(1); -around [qw/put post/] => sub { - my $orig = shift; - my $self = shift; - my $queue_name = shift; - - return $self->http_error_queue if (!$queue_name); - - return $self->http_error_content_type - if (!$self->request->header('Content-Type') - || $self->request->header('Content-Type') ne 'application/json'); - - return $self->http_error("job is missing") if !$self->request->content; - - $self->$orig($queue_name); -}; - -around [qw/get delete/] => sub { - my $orig = shift; - my $self = shift; - my $queue_name = shift; +sub get { (shift)->_is_queue_opened(shift) } +sub post { (shift)->_create_job(shift) } +sub put { (shift)->_failed_job(shift) } +sub delete { (shift)->_purge_queue(shift) } - return $self->http_error_queue if (!$queue_name); - - $self->$orig($queue_name); -}; - -sub get { +sub _is_queue_opened { my ($self, $queue_name) = @_; - my $dkey = $self->_queue_delayed($queue_name); - my $lkey = $self->_queue($queue_name); - - my $input = $self->request->parameters; - my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; - $self->application->redis->get( $self->_queue_stat($queue_name), sub { my $status = shift; - if (defined $status && $status == 0) { - return $self->http_error_closed_queue(); + return $self->http_error_queue_is_closed(); + }else{ + return $self->_fetch_job($queue_name); } - - $self->application->redis->zrangebyscore( - $dkey, 0, time, - sub { - my $value = shift; - if ($value && scalar @$value) { - my $k = shift @$value; - $self->application->redis->zrem($dkey, $k); - $self->application->redis->get( - $k, - sub { - my $job = shift; - $self->_finish_get($job, $queue_name, - $worker_id); - } - ); - } - else { - $self->application->redis->lpop( - $lkey, - sub { - my $value = shift; - if ($value) { - $self->application->redis->get( - $value, - sub { - my $job = shift; - $self->_finish_get($job, - $queue_name, $worker_id); - } - ); - } - else { - $self->http_error('no job', 404); - } - } - ); - } - } - ); } ); } -sub post { +sub _fetch_job { my ($self, $queue_name) = @_; - $self->_create_job($queue_name); -} -sub put { - my ($self, $queue_name) = @_; + my $dkey = $self->_queue_delayed($queue_name); + my $lkey = $self->_queue($queue_name); my $input = $self->request->parameters; my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; - $self->application->redis->incr('failed'); - $self->application->redis->incr($self->_queue_failed($queue_name)); - if ($worker_id) { - $self->application->redis->incr('failed:' . $worker_id); - } - - $self->_create_job($queue_name); + $self->application->redis->zrangebyscore( + $dkey, 0, time, + sub { + my $value = shift; + if ($value && scalar @$value) { + $self->_get_job_from_delay_queue($dkey, $queue_name, $value, + $worker_id); + } + else { + $self->_get_job_from_queue($lkey, $queue_name); + } + } + ); } -sub delete { - my ($self, $queue_name) = @_; +sub _get_job_from_delay_queue { + my ($self, $dkey, $queue_name, $value, $worker_id) = @_; - # XXX delete failed && processed - my $lkey = $self->_queue($queue_name); - my $dkey = $self->_queue_delayed($queue_name); + my $k = shift @$value; + $self->application->redis->zrem($dkey, $k); + $self->application->redis->get( + $k, + sub { + my $job = shift; + $self->_finish_get($job, $queue_name, $worker_id); + } + ); +} - $self->application->redis->del($lkey); - $self->application->redis->del($dkey); - $self->response->code(204); - $self->finish(); +sub _get_job_from_queue { + my ($self, $lkey, $queue_name, $worker_id) = @_; + + $self->application->redis->lpop( + $lkey, + sub { + my $value = shift; + if ($value) { + $self->application->redis->get( + $value, + sub { + my $job = shift; + $self->_finish_get($job, $queue_name, $worker_id); + } + ); + } + else { + $self->http_error('no job', 404); + } + } + ); } sub _finish_get { my ($self, $job, $queue_name, $worker_id) = @_; + $self->_update_queue_stats($queue_name, $job); + $self->_update_worker_stats($queue_name, $worker_id, $job); + $self->finish($job); +} + +sub _update_queue_stats { + my ($self, $queue_name) = @_; + $self->application->redis->incr('processed'); $self->application->redis->incr($self->_queue_processed($queue_name)); +} + +sub _update_worker_stats { + my ($self, $queue_name, $worker_id) = @_; + if ($worker_id) { $self->application->redis->set( $self->_queue_worker($worker_id), @@ -144,7 +126,6 @@ sub _finish_get { ); $self->application->redis->incr('processed:' . $worker_id); } - $self->finish($job); } sub _create_job { @@ -154,6 +135,32 @@ sub _create_job { my $input = $self->request->parameters; my $delayed = $input->{delayed} if $input && $input->{delayed}; + my $uniq = $input->{uniq} if $input && $input->{uniq}; + + # XXX UNIQ IS BORKED + $uniq = Digest::SHA->sha256_hex($p) if ($uniq && $uniq ~~ "1"); + + if ($uniq) { + $self->application->redis->sismember( + $self->_queue_uniq($queue_name), $uniq, + sub { + my $status = shift; + if ($status) { + $self->http_error('job already exists'); + } + else { + $self->_insert_to_queue($queue_name, $p, $delayed, $uniq); + } + } + ); + } + else { + $self->_insert_to_queue($queue_name, $p, $delayed); + } +} + +sub _insert_to_queue { + my ($self, $queue_name, $p, $delayed, $uniq) = @_; $self->application->redis->incr( $self->_queue_uuid($queue_name), @@ -166,30 +173,48 @@ sub _create_job { sub { my $status_set = shift; my $lkey = $self->_queue($queue_name); - if ($uuid == 1) { - $self->application->redis->sadd('QUEUESET', $lkey); - my $ckey = $self->_queue_stat($queue_name); - $self->application->redis->set($ckey, 1); - } + $self->new_queue($queue_name, $lkey) if ($uuid == 1); + $self->application->redis->zadd( + $self->_queue_uniq($queue_name), $uniq) + if $uniq; $self->_finish_post($lkey, $key, $status_set, $delayed, $queue_name); - } + } ); } ); } -sub _finish_post { - my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_; +sub _failed_job { + my ($self, $queue_name) = @_; - my ($method, @args) = ('rpush', $lkey, $key); + my $input = $self->request->parameters; + my $worker_id = $input->{worker_id} if $input && $input->{worker_id}; - if ($delayed) { - $method = 'zadd'; - @args = ($queue_name . ':delayed', $delayed, $key); - } + $self->application->redis->incr('failed'); + $self->application->redis->incr($self->_queue_failed($queue_name)); + $self->application->redis->incr('failed:' . $worker_id) if $worker_id; + + $self->_create_job($queue_name); +} + +sub _purge_queue { + my ($self, $queue_name) = @_; + + # XXX delete failed && processed + my $lkey = $self->_queue($queue_name); + my $dkey = $self->_queue_delayed($queue_name); + + $self->application->redis->del($lkey); + $self->application->redis->del($dkey); + $self->response->code(204); + $self->finish(); +} + +sub _finish_post { + my ($self, $lkey, $key, $result, $delayed, $queue_name) = @_; - $self->application->redis->$method(@args,); + $self->push_job($queue_name, $lkey, $key, $delayed); $self->response->code(201); $self->finish(); } |
