summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/presque/ControlHandler.pm10
-rw-r--r--lib/presque/Role/Queue.pm10
-rw-r--r--lib/presque/Role/Queue/Names.pm5
3 files changed, 21 insertions, 4 deletions
diff --git a/lib/presque/ControlHandler.pm b/lib/presque/ControlHandler.pm
index 7feadc8..38bbe89 100644
--- a/lib/presque/ControlHandler.pm
+++ b/lib/presque/ControlHandler.pm
@@ -15,13 +15,15 @@ __PACKAGE__->asynchronous(1);
sub get {
my ($self, $queue_name) = @_;
- $self->application->redis->get(
+ $self->application->redis->mget(
$self->_queue_stat($queue_name),
+ $self->_queue_delayed_next($queue_name),
sub {
- my $status = shift;
+ my $res = shift;
$self->entity(
- { queue => $queue_name,
- status => $status
+ { queue => $queue_name,
+ status => $res->[0],
+ next_run_after => $res->[1],
}
);
}
diff --git a/lib/presque/Role/Queue.pm b/lib/presque/Role/Queue.pm
index c7b50f4..6ac0e8a 100644
--- a/lib/presque/Role/Queue.pm
+++ b/lib/presque/Role/Queue.pm
@@ -16,6 +16,16 @@ sub push_job {
if ($delayed) {
$method = 'zadd';
@args = ($queue_name . ':delayed', $delayed, $key);
+ $self->application->redis->get(
+ $self->_queue_delayed_next($queue_name),
+ sub {
+ my $val = shift;
+ if (!$val || ($val && $val > $delayed)) {
+ $self->application->redis->set(
+ $self->_queue_delayed_next($queue_name), $delayed);
+ }
+ }
+ );
}
$self->application->redis->$method(@args,);
}
diff --git a/lib/presque/Role/Queue/Names.pm b/lib/presque/Role/Queue/Names.pm
index 6a2b6a7..c371a50 100644
--- a/lib/presque/Role/Queue/Names.pm
+++ b/lib/presque/Role/Queue/Names.pm
@@ -12,6 +12,11 @@ sub _queue_delayed {
return $queue_name.':delayed';
}
+sub _queue_delayed_next {
+ my ($self, $queue_name) = @_;
+ return $queue_name.':delayed:next';
+}
+
sub _queue_policy {
my ($self, $queue_name) = @_;
return $queue_name.':queuepolicy';