summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/presque/RestQueueBatchHandler.pm21
-rw-r--r--lib/presque/RestQueueHandler.pm57
2 files changed, 61 insertions, 17 deletions
diff --git a/lib/presque/RestQueueBatchHandler.pm b/lib/presque/RestQueueBatchHandler.pm
index 30008b8..e25d570 100644
--- a/lib/presque/RestQueueBatchHandler.pm
+++ b/lib/presque/RestQueueBatchHandler.pm
@@ -16,7 +16,7 @@ sub _fetch_job {
my $input = $self->request->parameters;
my $batch_size =
- ($input && $input->{batch_size}) ? $input->{batch_size} : 50;
+ ($input && $input->{batch_size}) ? $input->{batch_size} : 10;
$self->application->redis->zrangebyscore(
$dkey, 0, time,
@@ -26,7 +26,7 @@ sub _fetch_job {
$self->_get_jobs_from_delay_queue($queue_name, $dkey, $values, $batch_size);
}
else {
- $self->_get_jobs_from_queue($queue_name, 0, $batch_size, []);
+ $self->_get_jobs_from_queue($queue_name, 0, $batch_size, [], []);
}
}
);
@@ -43,13 +43,13 @@ sub _get_jobs_from_delay_queue {
@keys,
sub {
my $jobs = shift;
- $self->_finish_get($queue_name, $jobs);
+ $self->_finish_get($queue_name, $jobs, \@keys);
}
);
}
sub _get_jobs_from_queue {
- my ($self, $queue_name, $pos, $batch_size, $jobs) = @_;
+ my ($self, $queue_name, $pos, $batch_size, $jobs, $keys) = @_;
my $lkey = $self->_queue($queue_name);
@@ -62,17 +62,22 @@ sub _get_jobs_from_queue {
$value,
sub {
my $job = shift;
+ push @$keys, $value;
push @$jobs, $job;
if (++$pos >= ($batch_size - 1)) {
- $self->_finish_get($queue_name, $jobs);
+ $self->_finish_get($queue_name, $jobs, $keys);
}
else {
- $self->_get_jobs_from_queue($queue_name, $pos, $batch_size, $jobs);
+ $self->_get_jobs_from_queue(
+ $queue_name, $pos, $batch_size,
+ $jobs, $keys
+ );
}
}
);
- }elsif(scalar @$jobs) {
- $self->_finish_get($queue_name, $jobs);
+ }
+ elsif (scalar @$jobs) {
+ $self->_finish_get($queue_name, $jobs, $keys);
}
else {
$self->http_error('no job', 404);
diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm
index e8cf7c8..d47751c 100644
--- a/lib/presque/RestQueueHandler.pm
+++ b/lib/presque/RestQueueHandler.pm
@@ -62,7 +62,7 @@ sub _get_job_from_delay_queue {
$k,
sub {
my $job = shift;
- $self->_finish_get($queue_name, $job);
+ $self->_finish_get($queue_name, $job, $k);
}
);
}
@@ -81,7 +81,7 @@ sub _get_job_from_queue {
$value,
sub {
my $job = shift;
- $self->_finish_get($queue_name, $job);
+ $self->_finish_get($queue_name, $job, $value);
}
);
}
@@ -93,13 +93,45 @@ sub _get_job_from_queue {
}
sub _finish_get {
- my ($self, $queue_name, $job) = @_;
+ my ($self, $queue_name, $job, $key) = @_;
+ $self->_remove_from_uniq($queue_name, $key);
$self->_update_queue_stats($queue_name, $job);
$self->_update_worker_stats($queue_name, $job);
$self->finish($job);
}
+sub _remove_from_uniq {
+ my ($self, $queue_name, $key) = @_;
+
+ my @keys;
+ if (ref $key) {
+ @keys = map {
+ $self->_queue_uniq($queue_name, $_)
+ } grep {
+ defined $_;
+ } @$key;
+ }
+ else {
+ push @keys, $self->_queue_uniq($queue_name, $key);
+ }
+
+ $self->application->redis->mget(
+ @keys,
+ sub {
+ my $value = shift;
+ for my $i (0 .. (@$value - 1)) {
+ if (my $key = $value->[$i]) {
+ $self->application->redis->del(
+ $self->_queue_uniq($queue_name, $key));
+ $self->application->redis->del(
+ $self->_queue_uniq($queue_name, $keys[$i]));
+ }
+ }
+ }
+ );
+}
+
sub _update_queue_stats {
my ($self, $queue_name) = @_;
@@ -136,8 +168,8 @@ sub _create_job {
my $uniq = $input->{uniq} if $input && $input->{uniq};
if ($uniq) {
- $self->application->redis->sismember(
- $self->_queue_uniq($queue_name), $uniq,
+ $self->application->redis->get(
+ $self->_queue_uniq($queue_name, $uniq),
sub {
my $status = shift;
if ($status) {
@@ -169,12 +201,15 @@ sub _insert_to_queue {
my $status_set = shift;
my $lkey = $self->_queue($queue_name);
$self->new_queue($queue_name, $lkey) if ($uuid == 1);
- $self->application->redis->zadd(
- $self->_queue_uniq($queue_name), $uniq)
- if $uniq;
+ if ($uniq) {
+ $self->application->redis->set(
+ $self->_queue_uniq($queue_name, $uniq), $key);
+ $self->application->redis->set(
+ $self->_queue_uniq($queue_name, $key), $uniq);
+ }
$self->_finish_post($lkey, $key, $status_set, $delayed,
$queue_name);
- }
+ }
);
}
);
@@ -283,6 +318,10 @@ content : JSON object
query : delayed, worker_id
+delay : after which date (in epoch) this job should be run
+
+uniq : this job is uniq. The value is the string that will be used to determined if the job is uniq
+
=item response
code: 201