summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorfranck cuny <franck@lumberjaph.net>2010-07-08 09:51:57 +0200
committerfranck cuny <franck@lumberjaph.net>2010-07-08 09:51:57 +0200
commit8cd8a1173b294f84f55acb5a4e9e0e4a2d3b389b (patch)
treeff3cc72167f2c2e842d53f984c25379a13a97f14 /lib
parentload new service (diff)
downloadpresque-8cd8a1173b294f84f55acb5a4e9e0e4a2d3b389b.tar.gz
don't look for job in delay queues
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/RestQueueBatchHandler.pm37
-rw-r--r--lib/presque/RestQueueHandler.pm59
-rw-r--r--lib/presque/Role/Queue.pm2
3 files changed, 26 insertions, 72 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 311f223..abf36c5 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -12,44 +12,13 @@ sub delete { (shift)->htttp_error('DELETE is not supported in batch mode'); }
sub _fetch_job {
my ($self, $queue_name) = @_;
- my $dkey = $self->_queue_delayed($queue_name);
-
my $input = $self->request->parameters;
my $batch_size =
($input && $input->{batch_size}) ? $input->{batch_size} : 10;
- $self->application->redis->zrangebyscore(
- $dkey, 0, time,
- sub {
- my $values = shift;
- if ($values && scalar @$values) {
- $self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size);
- }
- else {
- $self->_get_jobs_from_queue($queue_name, 0, $batch_size, [], []);
- }
- }
- );
-}
-
-sub _get_jobs_from_delay_queue {
- my ($self, $queue_name, $dkey, $values, $batch_size) = @_;
-
- my @keys = @$values[0 .. ($batch_size - 1)];
- foreach (@keys) {
- $self->application->redis->zrem($dkey, $_);
- }
- $self->application->redis->mget(
- @keys,
- sub {
- my $jobs = shift;
- $self->_finish_get($queue_name, $jobs, \@keys);
- }
- );
-}
-
-sub _get_jobs_from_queue {
- my ($self, $queue_name, $pos, $batch_size, $jobs, $keys) = @_;
+ my $jobs = [];
+ my $keys = [];
+ my $pos = 0;
my $lkey = $self->_queue($queue_name);
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index 0f919cd..cdcca1f 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -37,40 +37,6 @@ sub _is_queue_opened {
sub _fetch_job {
my ($self, $queue_name) = @_;
- my $dkey = $self->_queue_delayed($queue_name);
-
- $self->application->redis->zrangebyscore(
- $dkey, 0, time,
- sub {
- my $value = shift;
- if ($value && ref $value && scalar @$value) {
- $self->_get_job_from_delay_queue($queue_name, $dkey, $value);
- }
- else {
- $self->_get_job_from_queue($queue_name);
- }
- }
- );
-}
-
-sub _get_job_from_delay_queue {
- my ($self, $queue_name, $dkey, $value) = @_;
-
- my $k = shift @$value;
- $self->application->redis->zrem($dkey, $k);
- $self->application->redis->get(
- $k,
- sub {
- my $job = shift;
- $self->application->redis->del($k);
- $self->_finish_get($queue_name, $job, $k);
- }
- );
-}
-
-sub _get_job_from_queue {
- my ($self, $queue_name) = @_;
-
my $lkey = $self->_queue($queue_name);
$self->application->redis->lpop(
@@ -217,14 +183,33 @@ sub _failed_job {
sub _purge_queue {
my ($self, $queue_name) = @_;
- $self->application->redis->del($self->_queue($queue_name));
+ # supprimer tous les jobs
+
+ $self->application->redis->llen(
+ $self->_queue($queue_name),
+ sub {
+ my $size = shift;
+ $self->application->redis->lrange(
+ $self->_queue($queue_name),
+ 0, $size,
+ sub {
+ my $jobs = shift;
+ foreach my $j (@$jobs) {
+ $self->application->redis->del($j);
+ }
+ $self->application->redis->del($self->_queue($queue_name));
+ }
+ );
+ }
+ );
+
+
$self->application->redis->del($self->_queue_delayed($queue_name));
- $self->application->redis->del($self->_queue_failed($queue_name));
- $self->application->redis->del($self->_queue_processed($queue_name));
$self->application->redis->del($self->_queue_uniq($queue_name));
$self->application->redis->del($self->_queue_uniq_revert($queue_name));
$self->application->redis->hdel($self->_queue_processed, $queue_name);
$self->application->redis->hdel($self->_queue_failed, $queue_name);
+
$self->response->code(204);
$self->finish();
}
diff --git a/lib/presque/Role/Queue.pm b/lib/presque/Role/Queue.pm
index fce83e2..a1b82ee 100644
--- a/lib/presque/Role/Queue.pm
+++ b/lib/presque/Role/Queue.pm
@@ -4,7 +4,7 @@ use Moose::Role;
sub new_queue {
my ($self, $queue_name, $lkey) = @_;
- $self->application->redis->sadd('QUEUESET', $lkey);
+ $self->application->redis->sadd('QUEUESET', $queue_name);
my $ckey = $self->_queue_stat($queue_name);
$self->application->redis->set($ckey, 1);
}