summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-05-13 15:14:38 +0200
committerfranck cuny <franck@lumberjaph.net>2010-05-13 15:14:38 +0200
commit92cf580a6bdde758208ae26bf11e9f90bc78c66b (patch)
tree4f1f1deb5ae1fcd1cc7c8845274aece0196c3988 /lib
parentcleanup, register what a worker is doing (diff)
downloadpresque-92cf580a6bdde758208ae26bf11e9f90bc78c66b.tar.gz
more cleaning, add stat on processed jobs
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/RestQueueHandler.pm95
1 files changed, 40 insertions, 55 deletions
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 3672d6f..65f4a44 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -38,15 +38,9 @@ sub get {
$self->application->redis->get(
$k,
sub {
- $self->application->redis->set(
- $self->_queue_worker($worker_name),
- JSON::encode_json(
- { queue => $queue_name,
- run_at => time()
- }
- )
- ) if $worker_name;
- $self->finish(shift);
+ my $job = shift;
+ $self->_finish_get($job, $queue_name,
+ $worker_name);
}
);
}
@@ -55,21 +49,13 @@ sub get {
$lkey,
sub {
my $value = shift;
- my $qpkey = $self->_queue_policy($queue_name);
if ($value) {
$self->application->redis->get(
$value,
sub {
- $self->application->redis->set(
- $self->_queue_worker(
- $worker_name),
- JSON::encode_json(
- { queue => $queue_name,
- run_at => time()
- }
- )
- ) if $worker_name;
- $self->finish(shift);
+ my $job = shift;
+ $self->_finish_get($job,
+ $queue_name, $worker_name);
}
);
}
@@ -86,15 +72,15 @@ sub get {
}
sub post {
- my ( $self, $queue_name ) = @_;
+ my ($self, $queue_name) = @_;
- return $self->http_error_queue if ( !$queue_name );
+ return $self->http_error_queue if (!$queue_name);
return $self->http_error_content_type
if (!$self->request->header('Content-Type')
- || $self->request->header('Content-Type') ne 'application/json' );
+ || $self->request->header('Content-Type') ne 'application/json');
- my $input = $self->request->parameters;
+ my $input = $self->request->parameters;
my $delayed = $input->{delayed};
my $p = $self->request->content;
@@ -103,24 +89,20 @@ sub post {
$self->_queue_uuid($queue_name),
sub {
my $uuid = shift;
- my $key = $self->_queue_key($queue_name, $uuid);
+ my $key = $self->_queue_key($queue_name, $uuid);
$self->application->redis->set(
$key, $p,
sub {
my $status_set = shift;
my $lkey = $self->_queue($queue_name);
- if ( $uuid == 1 ) {
+ if ($uuid == 1) {
$self->application->redis->sadd('QUEUESET', $lkey);
my $ckey = $self->_queue_stat($queue_name);
$self->application->redis->set($ckey, 1);
- $self->_finish_post( $lkey, $key, $status_set,
- $delayed, $queue_name );
- }
- else {
- $self->_finish_post( $lkey, $key, $status_set,
- $delayed, $queue_name );
}
+ $self->_finish_post($lkey, $key, $status_set, $delayed,
+ $queue_name);
}
);
}
@@ -128,30 +110,36 @@ sub post {
}
sub delete {
- my ( $self, $queue_name ) = @_;
+ my ($self, $queue_name) = @_;
- return $self->http_error_queue if ( !$queue_name );
+ return $self->http_error_queue if (!$queue_name);
# delete delayed queue
my $lkey = $self->_queue($queue_name);
my $dkey = $self->_queue_delayed($queue_name);
- $self->application->redis->del(
- $lkey,
- sub {
- my $res = shift;
- $self->application->redis->del(
- $dkey,
- sub {
- $self->finish(
- JSON::encode_json(
- { queue => $queue_name, status => $res }
- )
- );
+ $self->application->redis->del($lkey);
+ $self->application->redis->del($dkey);
+ $self->response->code(204);
+ $self->finish();
+}
+
+sub _finish_get {
+ my ($self, $job, $queue_name, $worker_name) = @_;
+
+ $self->application->redis->incr('processed');
+ if ($worker_name) {
+ $self->application->redis->set(
+ $self->_queue_worker($worker_name),
+ JSON::encode_json(
+ { queue => $queue_name,
+ run_at => time()
}
- );
- }
- );
+ )
+ );
+ $self->application->redis->incr('processed:' . $worker_name);
+ }
+ $self->finish($job);
}
sub _finish_post {
@@ -164,12 +152,9 @@ sub _finish_post {
@args = ($queue_name . ':delayed', $delayed, $key);
}
- $self->application->redis->$method(
- @args,
- sub {
- $self->finish({status => 'success'});
- }
- );
+ $self->application->redis->$method(@args,);
+ $self->response->code(204);
+ $self->finish();
}
1;