diff options
| author | franck cuny <franck@lumberjaph.net> | 2010-04-13 18:46:54 +0200 |
|---|---|---|
| committer | franck cuny <franck@lumberjaph.net> | 2010-04-13 18:46:54 +0200 |
| commit | d18213e3c6956540e21d738b411b00e9eb71dc4e (patch) | |
| tree | 784913af63e25499c89033682b2caccfc2d000dd | |
| parent | initial commit (diff) | |
| download | presque-d18213e3c6956540e21d738b411b00e9eb71dc4e.tar.gz | |
basic REST job queue using tatsumaki + redis
| -rw-r--r-- | app.psgi | 11 | ||||
| -rw-r--r-- | conf.yaml | 4 | ||||
| -rw-r--r-- | lib/presque.pm | 38 | ||||
| -rw-r--r-- | lib/presque/Backend/Redis.pm | 0 | ||||
| -rw-r--r-- | lib/presque/IndexHandler.pm | 14 | ||||
| -rw-r--r-- | lib/presque/JobQueueHandler.pm | 13 | ||||
| -rw-r--r-- | lib/presque/RestQueueHandler.pm | 81 |
7 files changed, 159 insertions, 2 deletions
diff --git a/app.psgi b/app.psgi new file mode 100644 index 0000000..9473ea3 --- /dev/null +++ b/app.psgi @@ -0,0 +1,11 @@ +#!/usr/bin/perl +use strict; +use warnings; +use lib ('lib'); + +use presque; +use Plack::Builder; +use YAML::Syck; + +my $conf = LoadFile('conf.yaml'); +my $app = presque->app( config => $conf ); diff --git a/conf.yaml b/conf.yaml new file mode 100644 index 0000000..df54c1c --- /dev/null +++ b/conf.yaml @@ -0,0 +1,4 @@ + +redis: + host: 127.0.0.1 + port: 6379
\ No newline at end of file diff --git a/lib/presque.pm b/lib/presque.pm index 2116589..5d168d6 100644 --- a/lib/presque.pm +++ b/lib/presque.pm @@ -1,8 +1,42 @@ package presque; -use strict; -use warnings; +use Moose; our $VERSION = '0.01'; +extends 'Tatsumaki::Application'; + +use AnyEvent::Redis; + +use presque::RestQueueHandler; +use presque::JobQueueHandler; +use presque::IndexHandler; + +has config => ( + is => 'rw', isa => 'HashRef', lazy => 1, default => sub {} +); + +has redis => ( + is => 'rw', + isa => 'Object', + lazy => 1, + default => sub { + my $self = shift; + my $r = AnyEvent::Redis->new(); + $r; + } +); + +sub app { + my ( $class, %args ) = @_; + my $self = $class->new( + [ + '/q/(.*)' => 'presque::RestQueueHandler', + '/j/(.*)' => 'presque::JobQueueHandler', + '/' => 'presque::IndexHandler', + ] + ); + $self->config( delete $args{config} ); + $self; +} 1; __END__ diff --git a/lib/presque/Backend/Redis.pm b/lib/presque/Backend/Redis.pm new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/presque/Backend/Redis.pm diff --git a/lib/presque/IndexHandler.pm b/lib/presque/IndexHandler.pm new file mode 100644 index 0000000..9a8e92e --- /dev/null +++ b/lib/presque/IndexHandler.pm @@ -0,0 +1,14 @@ +package presque::IndexHandler; + +use Moose; +extends 'Tatsumaki::Handler'; +__PACKAGE__->asynchronous(1); + +use JSON; + +sub get { + my $self = shift; + # render template +} + +1; diff --git a/lib/presque/JobQueueHandler.pm b/lib/presque/JobQueueHandler.pm new file mode 100644 index 0000000..1e6d9e5 --- /dev/null +++ b/lib/presque/JobQueueHandler.pm @@ -0,0 +1,13 @@ +package presque::JobQueueHandler; + +use Moose; +extends 'Tatsumaki::Handler'; +__PACKAGE__->asynchronous(1); + +use JSON; + +sub get { + my ($self, $queue_name) = @_; +} + +1; diff --git a/lib/presque/RestQueueHandler.pm b/lib/presque/RestQueueHandler.pm new file mode 100644 index 0000000..d953505 --- /dev/null +++ b/lib/presque/RestQueueHandler.pm @@ -0,0 +1,81 @@ +package presque::RestQueueHandler; + +use Moose; +extends 'Tatsumaki::Handler'; +__PACKAGE__->asynchronous(1); + +use JSON; +use YAML::Syck; + +sub get { + my ( $self, $queue_name ) = @_; + my $lkey = $queue_name . ':queue'; + $self->application->redis->lpop( + $lkey, + sub { + my $value = shift; + my $qpkey = $queue_name . ':queupolicy'; + if ($value) { + my $val = $self->application->redis->get( + $value, + sub { + $self->finish(shift); + } + ); + }else{ + $self->finish(JSON::encode_json({error => "no job"})); + } + } + ); +} + +sub post { + my ( $self, $queue_name ) = @_; + + my $p = $self->request->content; + + $self->application->redis->incr( + $queue_name . ':UUID', + sub { + my $uuid = shift; + my $key = $queue_name . ':' . $uuid; + + $self->application->redis->set( + $key, $p, + sub { + my $status_set = shift; + my $lkey = $queue_name . ':queue'; + if ($uuid == 1) { + $self->application->redis->sadd( + 'QUEUESET', + $lkey, + sub { + my $ckey = 'queuestat:' . $queue_name; + $self->application->redis->set( $ckey, 1 ); + $self->_finish_post($lkey, $key, $status_set); + } + ); + }else{ + $self->_finish_post($lkey, $key, $status_set); + } + } + ); + } + ); +} + +sub _finish_post { + my ($self, $lkey, $key, $result) = @_; + $self->application->redis->rpush( + $lkey, $key, + sub { + $self->finish($result); + } + ); +} + +sub delete { + my ($self, $queue_name) = @_; +} + +1; |
