From b748fb0e5c93b8868ae65b1c3e445dab8afc5442 Mon Sep 17 00:00:00 2001 From: franck cuny Date: Thu, 8 Jul 2010 09:46:56 +0200 Subject: this service will periodicaly check if any job is in delay queue, and move them to standard queue if they must be processed now --- lib/presque/Service.pm | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 lib/presque/Service.pm diff --git a/lib/presque/Service.pm b/lib/presque/Service.pm new file mode 100644 index 0000000..fd18ace --- /dev/null +++ b/lib/presque/Service.pm @@ -0,0 +1,43 @@ +package presque::Service; + +use Moose; +extends 'Tatsumaki::Service'; +with 'presque::Role::Queue::Names'; + +has redis => (is => 'rw', isa => 'Object', required => 1); + +sub start { + my $self = shift; + my $t; + $t = AE::timer 0, 1, sub { + scalar $t; + $self->redis->smembers( + 'QUEUESET', + sub { + my $queues = shift; + foreach my $q (@$queues) { + $self->_check_delayed_queue($q); + } + } + ); + }; +} + +sub _check_delayed_queue { + my ($self, $queue_name) = @_; + + my $dkey = $self->_queue_delayed($queue_name); + + $self->redis->zrangebyscore( + $dkey, 0, time, + sub { + my $keys = shift; + foreach my $k (@$keys) { + $self->redis->zrem($dkey, $k); + $self->redis->lpush($self->_queue($queue_name), $k); + } + } + ); +} + +1; -- cgit v1.2.3