summaryrefslogblamecommitdiff
path: root/lib/presque/worker.pm
blob: 77115c177e0a23d823adc2c096142c668243583d (plain) (tree)
1
2
3
4
5
6
7
8
9

                        

                      


              
                           
 

































                                                                         
 



























                                                    
 

                          
 

                          
     









                                                                   
      

 




           
                                  


               












                                      


                  





                                                                
                                                                                                           


                                             
















                                                                                                                           








                                           



                             



                                                                    
package presque::worker;

our $VERSION = '0.01';

use Carp;
use JSON;
use Try::Tiny;
use presque::worker::Queue;

use Moose;
with qw/
  presque::worker::Role::Management
  presque::worker::Role::Fork
  presque::worker::Role::RESTClient
  presque::worker::Role::Logger/;

has queue_name => (is => 'ro', isa => 'Str', required => 1);
has retries    => (is => 'rw', isa => 'Int', default  => 5);
has interval => (is => 'ro', isa => 'Int', lazy => 1, default => 1);
has _fail_method => (
    is        => 'rw',
    isa       => 'Bool',
    lazy      => 1,
    default   => 0,
    predicate => '_has_fail_method'
);
has queue => (
    is   => 'ro',
    isa  => 'Object',
    lazy => 1,
    default =>
      sub { presque::worker::Queue->new(base_uri => (shift)->base_uri); }
);
has worker_id => (
    is       => 'ro',
    isa      => 'Str',
    required => 1,
    default  => sub {
        my $self = shift;
        my $name = $self->meta->name . '_' . $$;
        $name;
    }
);

before start => sub {
    my $self = shift;
    if (!$self->meta->find_method_by_name('work')) {
        Carp::confess "method 'work' is missing";
    }
    if ($self->meta->find_method_by_name('fail')) {
        $self->fail_method(1);
    }
};

sub start {
    my $self = shift;

    $self->logger->log(
        level   => 'info',
        message => "presque worker ["
          . $self->worker_id
          . "] : start to listen for "
          . $self->queue_name
    );

    while (!$self->shut_down) {
        my $job = $self->rest_fetch_job();
        $self->work_once($job) if $job;
        sleep($self->interval);
    }
    return $self;
}

sub work_once {
    my ($self, $job) = @_;

    try {
        $self->work($job);
    }
    catch {
        my $err = $_;
        $self->logger->log(
            level   => 'error',
            message => 'Job failed: ' . $err,
        );
        push @{$job->{fail}}, $err;
        my $retries = ($job->{retries_left} || $self->retries) - 1;
        $self->rest_retry_job($job) if $retries > 0;
        $self->fail($job, $_) if $self->_has_fail_method;
    };
}

1;
__END__

=head1 NAME

presque::worker - a presque worker

=head1 SYNOPSIS

    package myworker;
    use Moose;
    extends 'presque::worker';

    sub work {
        my ($self, $job) = @_;
        ...
    }

    sub fail {
        my ($self, $job, $error) = @_;
        ...
    }

=head1 DESCRIPTION

presque::worker - Worker for the C<presque> message queue system

=head1 METHODS

=head2 work ($job_description)

Worker must implement the B<work> method. The only argument of this method is a hashref containing the job.

=head2 fail ($job_description, $error_reason)

Worker may implement the B<fail> method. This method have two arguments: the job description and the reason of the failure.

=head1 ATTRIBUTES

=head2 queue_name

=head2 base_uri

=head2 worker_id

=head2 retries

=head2 interval

=head2

The url of the presque webservices.

=head1 AUTHOR

franck cuny E<lt>franck@lumberjaph.netE<gt>

=head1 SEE ALSO

=head1 LICENSE

Copyright 2010 by Linkfluence

L<http://linkfluence.net>

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut