summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-07-01 09:31:58 +0200
committerfranck cuny <franck@lumberjaph.net>2010-07-01 09:31:58 +0200
commit2b0e4e2250d24bc93ac91d9e799d448cca20aa51 (patch)
tree0483a93b89cdd31364964d8c9bb824a75c9bee54
parentfrom a method (diff)
downloadpresque-2b0e4e2250d24bc93ac91d9e799d448cca20aa51.tar.gz
update on stats
-rw-r--r--lib/presque/RestQueueBatchHandler.pm15
-rw-r--r--lib/presque/RestQueueHandler.pm84
2 files changed, 39 insertions, 60 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 281b117..ef37045 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -89,8 +89,8 @@ sub _get_jobs_from_queue {
sub _update_queue_stats {
my ($self, $queue_name, $jobs) = @_;
- $self->application->redis->incrby('processed', scalar @$jobs);
- $self->application->redis->incrby($self->_queue_processed($queue_name), scalar @$jobs);
+ $self->application->redis->hincrby($self->_queue_processed, $queue_name,
+ scalar @$jobs);
}
sub _update_worker_stats {
@@ -100,15 +100,8 @@ sub _update_worker_stats {
my $worker_id = $input->{worker_id};
if ($worker_id) {
- $self->application->redis->set(
- $self->_queue_worker($worker_id),
- JSON::encode_json(
- { queue => $queue_name,
- run_at => time()
- }
- )
- );
- $self->application->redis->incrby('processed:' . $worker_id, scalar @$jobs);
+ $self->application->redis->hincrby($self->_workers_processed,
+ $worker_id, @$jobs);
}
}
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 650f3bc..75c821a 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -43,7 +43,7 @@ sub _fetch_job {
$dkey, 0, time,
sub {
my $value = shift;
- if ($value && scalar @$value) {
+ if ($value && ref $value && scalar @$value) {
$self->_get_job_from_delay_queue($queue_name, $dkey, $value);
}
else {
@@ -104,28 +104,19 @@ sub _finish_get {
sub _remove_from_uniq {
my ($self, $queue_name, $key) = @_;
- my @keys;
- if (ref $key) {
- @keys = map {
- $self->_queue_uniq($queue_name, $_)
- } grep {
- defined $_;
- } @$key;
- }
- else {
- push @keys, $self->_queue_uniq($queue_name, $key);
- }
-
- $self->application->redis->mget(
+ my @keys = (ref $key) ? @$key : ($key);
+ $self->application->redis->hmget(
+ $self->_queue_uniq_revert($queue_name),
@keys,
sub {
my $value = shift;
for my $i (0 .. (@$value - 1)) {
if (my $key = $value->[$i]) {
- $self->application->redis->del(
- $self->_queue_uniq($queue_name, $key));
- $self->application->redis->del(
- $self->_queue_uniq($queue_name, $keys[$i]));
+ $self->application->redis->hdel(
+ $self->_queue_uniq($queue_name), $key);
+ $self->application->redis->hdel(
+ $self->_queue_uniq_revert($queue_name),
+ $keys[$i]);
}
}
}
@@ -135,8 +126,8 @@ sub _remove_from_uniq {
sub _update_queue_stats {
my ($self, $queue_name) = @_;
- $self->application->redis->incr('processed');
- $self->application->redis->incr($self->_queue_processed($queue_name));
+ $self->application->redis->hincrby($self->_queue_processed, $queue_name,
+ 1);
}
sub _update_worker_stats {
@@ -146,15 +137,8 @@ sub _update_worker_stats {
my $worker_id = $input->{worker_id};
if ($worker_id) {
- $self->application->redis->set(
- $self->_queue_worker($worker_id),
- JSON::encode_json(
- { queue => $queue_name,
- run_at => time()
- }
- )
- );
- $self->application->redis->incr('processed:' . $worker_id);
+ $self->application->redis->hincrby($self->_workers_processed,
+ $worker_id, 1);
}
}
@@ -164,19 +148,20 @@ sub _create_job {
my $p = $self->request->content;
my $input = $self->request->parameters;
- my $delayed = $input->{delayed} if $input && $input->{delayed};
- my $uniq = $input->{uniq} if $input && $input->{uniq};
+ my $delayed = ($input && $input->{delayed}) ? $input->{delayed} : undef;
+ my $uniq = ($input && $input->{uniq}) ? $input->{uniq} : undef;
if ($uniq) {
- $self->application->redis->get(
- $self->_queue_uniq($queue_name, $uniq),
+ $self->application->redis->hexists(
+ $self->_queue_uniq($queue_name),
+ $uniq,
sub {
my $status = shift;
- if ($status) {
- $self->http_error('job already exists');
+ if ($status == 0) {
+ $self->_insert_to_queue($queue_name, $p, $delayed, $uniq);
}
else {
- $self->_insert_to_queue($queue_name, $p, $delayed, $uniq);
+ $self->http_error('job already exists');
}
}
);
@@ -202,13 +187,11 @@ sub _insert_to_queue {
my $lkey = $self->_queue($queue_name);
$self->new_queue($queue_name, $lkey) if ($uuid == 1);
if ($uniq) {
- $self->application->redis->set(
- $self->_queue_uniq($queue_name, $uniq), $key);
- $self->application->redis->set(
- $self->_queue_uniq($queue_name, $key), $uniq);
+ $self->application->redis->hset($self->_queue_uniq($queue_name), $uniq, $key);
+ $self->application->redis->hset($self->_queue_uniq_revert($queue_name), $key, $uniq);
}
$self->_finish_post($lkey, $key, $status_set, $delayed,
- $queue_name);
+ $queue_name);
}
);
}
@@ -221,9 +204,11 @@ sub _failed_job {
my $input = $self->request->parameters;
my $worker_id = $input->{worker_id} if $input && $input->{worker_id};
- $self->application->redis->incr('failed');
- $self->application->redis->incr($self->_queue_failed($queue_name));
- $self->application->redis->incr('failed:' . $worker_id) if $worker_id;
+ $self->application->redis->hincrby($self->_queue_failed, $queue_name, 1);
+
+ if ($worker_id) {
+ $self->application->redis->hincrby($self->_workers_failed($worker_id), 1);
+ }
$self->_create_job($queue_name);
}
@@ -231,13 +216,14 @@ sub _failed_job {
sub _purge_queue {
my ($self, $queue_name) = @_;
- my $lkey = $self->_queue($queue_name);
- my $dkey = $self->_queue_delayed($queue_name);
-
- $self->application->redis->del($lkey);
- $self->application->redis->del($dkey);
+ $self->application->redis->del($self->_queue($queue_name));
+ $self->application->redis->del($self->_queue_delayed($queue_name));
$self->application->redis->del($self->_queue_failed($queue_name));
$self->application->redis->del($self->_queue_processed($queue_name));
+ $self->application->redis->del($self->_queue_uniq($queue_name));
+ $self->application->redis->del($self->_queue_uniq_revert($queue_name));
+ $self->application->redis->hdel($self->_queue_processed, $queue_name);
+ $self->application->redis->hdel($self->_queue_failed, $queue_name);
$self->response->code(204);
$self->finish();
}