summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-06-24 16:11:27 +0200
committerfranck cuny <franck@lumberjaph.net>2010-06-24 16:11:27 +0200
commit0f2b238b98f298da4acaa49ec9e8c03933e4a43e (patch)
treeab2a9adec280f2d0edf9579f3884a33a8e994681
parentsmall script to insert tweet from the stream api to presque (diff)
downloadpresque-0f2b238b98f298da4acaa49ec9e8c03933e4a43e.tar.gz
alter order for arguments in methods call
-rw-r--r--lib/presque/RestQueueBatchHandler.pm26
-rw-r--r--lib/presque/RestQueueHandler.pm103
2 files changed, 94 insertions, 35 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 114c368..adbfe06 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -13,10 +13,8 @@ sub _fetch_job {
my ($self, $queue_name) = @_;
my $dkey = $self->_queue_delayed($queue_name);
- my $lkey = $self->_queue($queue_name);
my $input = $self->request->parameters;
- my $worker_id = $input->{worker_id} if $input && $input->{workerd_id};
my $batch_size =
($input && $input->{batch_size}) ? $input->{batch_size} : 50;
@@ -25,17 +23,17 @@ sub _fetch_job {
sub {
my $values = shift;
if ($values && scalar @$values) {
- $self->_get_jobs_from_delay_queue($dkey, $queue_name, $batch_size, $values, $worker_id);
+ $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size);
}
else {
- $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, 0, $batch_size, []);
+ $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []);
}
}
);
}
sub _get_jobs_from_delay_queue {
- my ($self, $dkey, $queue_name, $batch_size, $values, $worker_id) = @_;
+ my ($self, $queue_name, $dkey, $values, $batch_size) = @_;
my @keys = @$values[0 .. ($batch_size - 1)];
foreach (@keys) {
@@ -45,13 +43,15 @@ sub _get_jobs_from_delay_queue {
@keys,
sub {
my $jobs = shift;
- $self->_finish_get($jobs, $queue_name, $worker_id);
+ $self->_finish_get($queue_name, $jobs);
}
);
}
sub _get_jobs_from_queue {
- my ($self, $lkey, $queue_name, $worker_id, $pos, $batch_size, $jobs) = @_;
+ my ($self, $queue_name, $pos, $batch_size, $jobs) = @_;
+
+ my $lkey = $self->_queue($queue_name);
$self->application->redis->lpop(
$lkey,
@@ -64,16 +64,15 @@ sub _get_jobs_from_queue {
my $job = shift;
push @$jobs, $job;
if (++$pos >= ($batch_size - 1)) {
- $self->_finish_get($jobs, $queue_name, $worker_id);
+ $self->_finish_get($queue_name, $jobs);
}
else {
- $self->_get_jobs_from_queue($lkey, $queue_name, $worker_id, $pos,
- $batch_size, $jobs);
+ $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs);
}
}
);
}elsif(scalar @$jobs) {
- $self->_finish_get($jobs, $queue_name, $worker_id);
+ $self->_finish_get($queue_name, $jobs);
}
else {
$self->http_error('no job', 404);
@@ -90,7 +89,10 @@ sub _update_queue_stats {
}
sub _update_worker_stats {
- my ($self, $queue_name, $worker_id, $jobs) = @_;
+ my ($self, $queue_name, $jobs) = @_;
+
+ my $input = $self->request->parameters;
+ my $worker_id = $input->{worker_id};
if ($worker_id) {
$self->application->redis->set(
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index d708760..e8cf7c8 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -3,8 +3,6 @@ package presque::RestQueueHandler;
use 5.010;
use JSON;
-use Digest::SHA;
-
use Moose;
extends 'Tatsumaki::Handler';
with
@@ -40,28 +38,23 @@ sub _fetch_job {
my ($self, $queue_name) = @_;
my $dkey = $self->_queue_delayed($queue_name);
- my $lkey = $self->_queue($queue_name);
-
- my $input = $self->request->parameters;
- my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
$self->application->redis->zrangebyscore(
$dkey, 0, time,
sub {
my $value = shift;
if ($value && scalar @$value) {
- $self->_get_job_from_delay_queue($dkey, $queue_name, $value,
- $worker_id);
+ $self->_get_job_from_delay_queue($queue_name, $dkey, $value);
}
else {
- $self->_get_job_from_queue($lkey, $queue_name);
+ $self->_get_job_from_queue($queue_name);
}
}
);
}
sub _get_job_from_delay_queue {
- my ($self, $dkey, $queue_name, $value, $worker_id) = @_;
+ my ($self, $queue_name, $dkey, $value) = @_;
my $k = shift @$value;
$self->application->redis->zrem($dkey, $k);
@@ -69,13 +62,15 @@ sub _get_job_from_delay_queue {
$k,
sub {
my $job = shift;
- $self->_finish_get($job, $queue_name, $worker_id);
+ $self->_finish_get($queue_name, $job);
}
);
}
sub _get_job_from_queue {
- my ($self, $lkey, $queue_name, $worker_id) = @_;
+ my ($self, $queue_name) = @_;
+
+ my $lkey = $self->_queue($queue_name);
$self->application->redis->lpop(
$lkey,
@@ -86,7 +81,7 @@ sub _get_job_from_queue {
$value,
sub {
my $job = shift;
- $self->_finish_get($job, $queue_name, $worker_id);
+ $self->_finish_get($queue_name, $job);
}
);
}
@@ -98,10 +93,10 @@ sub _get_job_from_queue {
}
sub _finish_get {
- my ($self, $job, $queue_name, $worker_id) = @_;
+ my ($self, $queue_name, $job) = @_;
$self->_update_queue_stats($queue_name, $job);
- $self->_update_worker_stats($queue_name, $worker_id, $job);
+ $self->_update_worker_stats($queue_name, $job);
$self->finish($job);
}
@@ -113,7 +108,10 @@ sub _update_queue_stats {
}
sub _update_worker_stats {
- my ($self, $queue_name, $worker_id) = @_;
+ my ($self, $queue_name) = @_;
+
+ my $input = $self->request->parameters;
+ my $worker_id = $input->{worker_id};
if ($worker_id) {
$self->application->redis->set(
@@ -137,9 +135,6 @@ sub _create_job {
my $delayed = $input->{delayed} if $input && $input->{delayed};
my $uniq = $input->{uniq} if $input && $input->{uniq};
- # XXX UNIQ IS BORKED
- $uniq = Digest::SHA->sha256_hex($p) if ($uniq && $uniq ~~ "1");
-
if ($uniq) {
$self->application->redis->sismember(
$self->_queue_uniq($queue_name), $uniq,
@@ -235,7 +230,7 @@ presque::RestQueueHandler
curl -H 'Content-Type: application/json' -X POST "http://localhost:5000/q/foo?delayed="$(expr `date +%s` + 500) -d '{"key":"value"}'
# fetch a job
- curl http://localhost:5000/q/foo
+ curl http://localhost:5000/q/foo
# purge and delete all jobs for a queue
curl -X DELETE http://localhost:5000/q/foo
@@ -246,13 +241,39 @@ presque::RestQueueHandler
=head2 get
+=over 4
+
+=item path
+
+/q/:queue_name
+
+=item request
+
+queue_name: [required] name of the queue to use
+
+worker_id: [optional] id of the worker, used for stats
+
+=item response
+
+If the queue is closed: 404
+
+If no job is available in the queue: 404
+
+If a job is available: 200
+
+Content-Type: application/json
+
+=back
+
+If the queue is open, a job will be fetched from the queue and send to the client
+
=head2 post
=over 4
=item path
-/q/queuename
+/q/:queue_name
=item request
@@ -264,7 +285,7 @@ query : delayed, worker_id
=item response
-code : 201
+code: 201
content : null
@@ -272,12 +293,48 @@ content : null
The B<Content-Type> of the request must be set to B<application/json>. The body of the request must be a valid JSON object.
-It iss possible to create delayed jobs (eg: job that will not be run before a defined time in the futur).
+It is possible to create delayed jobs (eg: job that will not be run before a defined time in the futur).
the B<delayed> value should be a date in epoch.
+=head2 put
+
+=over 4
+
+=item path
+
+/q/:queue_name
+
+=item request
+
+worker_id: [optional] id of the worker, used for stats
+
+=item response
+
+code: 201
+
+content: null
+
+=back
+
=head2 delete
+=over 4
+
+=item path
+
+/q/:queue_name
+
+=item request
+
+=item response
+
+code: 204
+
+content: null
+
+=back
+
Purge and delete the queue.
=head1 AUTHOR