summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-06-24 11:12:32 +0200
committerfranck cuny <franck@lumberjaph.net>2010-06-24 11:12:32 +0200
commit5f16fab631cebc35bcece4a1394d4edfa4af025a (patch)
tree880776bcdcbdb8f1235907fd6e8b4990337607cd /lib
parentadd POD (diff)
downloadpresque-5f16fab631cebc35bcece4a1394d4edfa4af025a.tar.gz
extends restqueue handler and overwrite only needed methods
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/RestQueueBatchHandler.pm148
1 files changed, 148 insertions, 0 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
new file mode 100644
index 0000000..114c368
--- /dev/null
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -0,0 +1,148 @@
+package presque::RestQueueBatchHandler;
+
+use JSON;
+use Moose;
+extends 'presque::RestQueueHandler';
+
+__PACKAGE__->asynchronous(1);
+
+sub put { (shift)->http_error('PUT is not supported in batch mode'); }
+sub delete { (shift)->htttp_error('DELETE is not supported in batch mode'); }
+
+sub _fetch_job {
+ my ($self, $queue_name) = @_;
+
+ my $dkey = $self->_queue_delayed($queue_name);
+ my $lkey = $self->_queue($queue_name);
+
+ my $input = $self->request->parameters;
+ my $worker_id = $input->{worker_id} if $input && $input->{workerd_id};
+ my $batch_size =
+ ($input && $input->{batch_size}) ? $input->{batch_size} : 50;
+
+ $self->application->redis->zrangebyscore(
+ $dkey, 0, time,
+ sub {
+ my $values = shift;
+ if ($values && scalar @$values) {
+ $self->_get_jobs_from_delay_queue($dkey, $queue_name, $batch_size, $values, $worker_id);
+ }
+ else {
+ $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, 0, $batch_size, []);
+ }
+ }
+ );
+}
+
+sub _get_jobs_from_delay_queue {
+ my ($self, $dkey, $queue_name, $batch_size, $values, $worker_id) = @_;
+
+ my @keys = @$values[0 .. ($batch_size - 1)];
+ foreach (@keys) {
+ $self->application->redis->zrem($dkey, $_);
+ }
+ $self->application->redis->mget(
+ @keys,
+ sub {
+ my $jobs = shift;
+ $self->_finish_get($jobs, $queue_name, $worker_id);
+ }
+ );
+}
+
+sub _get_jobs_from_queue {
+ my ($self, $lkey, $queue_name, $worker_id, $pos, $batch_size, $jobs) = @_;
+
+ $self->application->redis->lpop(
+ $lkey,
+ sub {
+ my $value = shift;
+ if ($value) {
+ $self->application->redis->get(
+ $value,
+ sub {
+ my $job = shift;
+ push @$jobs, $job;
+ if (++$pos >= ($batch_size - 1)) {
+ $self->_finish_get($jobs, $queue_name, $worker_id);
+ }
+ else {
+ $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, $pos,
+ $batch_size, $jobs);
+ }
+ }
+ );
+ }elsif(scalar @$jobs) {
+ $self->_finish_get($jobs, $queue_name, $worker_id);
+ }
+ else {
+ $self->http_error('no job', 404);
+ }
+ }
+ );
+}
+
+sub _update_queue_stats {
+ my ($self, $queue_name, $jobs) = @_;
+
+ $self->application->redis->incrby('processed', scalar @$jobs);
+ $self->application->redis->incrby($self->_queue_processed($queue_name), scalar @$jobs);
+}
+
+sub _update_worker_stats {
+ my ($self, $queue_name, $worker_id, $jobs) = @_;
+
+ if ($worker_id) {
+ $self->application->redis->set(
+ $self->_queue_worker($worker_id),
+ JSON::encode_json(
+ { queue => $queue_name,
+ run_at => time()
+ }
+ )
+ );
+ $self->application->redis->incrby('processed:' . $worker_id, scalar @$jobs);
+ }
+}
+
+sub _create_job {
+ my ($self, $queue_name) = @_;
+
+ my $content = JSON::decode_json($self->request->content);
+ my $jobs = $content->{jobs};
+
+ if (ref $jobs ne 'ARRAY') {
+ $self->http_error('jobs should be an array of job');
+ return;
+ }
+
+ my $input = $self->request->parameters;
+ my $delayed = $input->{delayed} if $input && $input->{delayed};
+
+ foreach my $job (@$jobs) {
+ $job = JSON::encode_json($job);
+
+ $self->application->redis->incr(
+ $self->_queue_uuid($queue_name),
+ sub {
+ my $uuid = shift;
+ my $key = $self->_queue_key($queue_name, $uuid);
+ $self->application->redis->set(
+ $key, $job,
+ sub {
+ my $status_set = shift;
+ my $lkey = $self->_queue($queue_name);
+
+ $self->new_queue($queue_name, $lkey) if ($uuid == 1);
+ $self->push_job($queue_name, $lkey, $key, $delayed);
+ }
+ );
+ }
+ );
+ }
+
+ $self->response->code(201);
+ $self->finish();
+}
+
+1;