summaryrefslogblamecommitdiff
path: root/lib/AnyEvent/Riak.pm
blob: ad90b839d0b087d126f6dee5132ca8c7324b549a (plain) (tree)
1
2
3
4
5
6
7
8
9

                       
           
             
 
         
         

                   

                 
 




                                 
 
                           
 
                      
 











                                                                              




                                           

                 
                                                              
                                                           
             


                                            

                  
                                    
             


               


                 
                                            
                                           

                 
                                 

                                          
          
                                                           
             
 

                                                     
                                                   
                                       




                                 

               

 
                
                                                
 
                                           
 
                 


                                                                        

                                              


                                            

                  
                                    
             





           
                                             
 
                                           
 
                 
                                 

                                           
          
                                                           
             

                                                     



                                                           
             


         

 
           
                                                      
 
                                           
 
                                          

                 
                                  

                                           
          
                                                           
                         
             



                                                               

                  

                            
                                      



         
 
            
                                             
 
                                           
 
                 
                                    

                                           
          
                                                           
             
                                 



         
 






                                                                        
  
 



           
                                         


               
                       
 
                                   

                                        


                                                           
 
                                                                         

                                                                                                    
 


                                                                

                  
                                                                                                                                                             
 



              
                                                       
 

                                                                            
 
                                     
 
                                                                       
 
                                                                  
 














                                                                          

                                    
                       
                             





                                                
         
      
 
                                                                                         
 
                                                 
       
 
     
 
                                                         
 
     
 
                                                                                
 
     
 
                                           
 
                                                   
 


                                                                              
 
                                                                                                
 
                                                                                    
 

     

             
                                           




               
                                    


                         



                                                                    
package AnyEvent::Riak;

use strict;
use warnings;

use Carp;
use JSON;
use AnyEvent;
use AnyEvent::HTTP;
use MIME::Base64;
use YAML::Syck;

use Moose;
with qw/
  AnyEvent::Riak::Role::CVCB
  AnyEvent::Riak::Role::HTTPUtils
  /;

use AnyEvent::Riak::Bucket;

our $VERSION = '0.02';

has host => (is => 'rw', isa => 'Str', default => 'http://127.0.0.1:8098');
has path => (is => 'rw', isa => 'Str', default => 'riak');
has mapred_path => (is => 'rw', isa => 'Str', default => 'mapred');
has r           => (is => 'rw', isa => 'Int', default => 2);
has w           => (is => 'rw', isa => 'Int', default => 2);
has dw          => (is => 'rw', isa => 'Int', default => 2);
has client_id   => (
    is  => 'rw',
    isa => 'Str',
    default =>
      sub { "perl_anyevent_riak" . encode_base64(int(rand(10737411824)), '') }
);

sub is_alive {
    my ($self, %options) = @_;

    my ($cv, $cb) = $self->cvcb(\%options);

    http_request(
        GET     => $self->_build_uri($self->host, [qw/ping/]),
        headers => $self->_build_headers($options{params}),
        sub {
            my ($body, $headers) = @_;
            if ($headers->{Status} == 200) {
                $cv->send($cb->(1));
            }
            else {
                $cv->send($cb->(0));
            }
        },
    );
    return $cv;
}

sub list_bucket {
    my ($self, $bucket_name, %options) = @_;
    my ($cv, $cb) = $self->cvcb(\%options);

    http_request(
        GET => $self->_build_uri(
            [$self->{path}, $bucket_name],
            $options{params}
        ),
        headers => $self->_build_headers($options{params}),
        sub {

            my ($body, $headers) = @_;
            if ($body && $headers->{Status} == 200) {
                my $res = JSON::decode_json($body);
                $cv->send($cb->($res));
            }
            else {
                $cv->send(undef);
            }
        }
    );
    return $cv;
}

sub set_bucket {
    my ($self, $bucket, $schema, %options) = @_;

    my ($cv, $cb) = $self->cvcb(\%options);

    http_request(
        PUT =>
          $self->_build_uri([$self->{path}, $bucket], $options{params}),
        headers => $self->_build_headers($options{params}),
        body    => JSON::encode_json($schema),
        sub {
            my ($body, $headers) = @_;
            if ($headers->{Status} == 204) {
                $cv->send($cb->(1));
            }
            else {
                $cv->send($cb->(0));
            }
        }
    );
    $cv;
}

sub fetch {
    my ($self, $bucket, $key, %options) = @_;

    my ($cv, $cb) = $self->cvcb(\%options);

    http_request(
        GET => $self->_build_uri(
            [$self->{path}, $bucket, $key],
            $options{params}
        ),
        headers => $self->_build_headers($options{params}),
        sub {
            my ($body, $headers) = @_;
            if ($body && $headers->{Status} == 200) {
                $cv->send($cb->(JSON::decode_json($body)));
            }
            else {
                $cv->send($cb->(0));
            }
        }
    );
    $cv;
}

sub store {
    my ($self, $bucket, $key, $object, %options) = @_;

    my ($cv, $cb) = $self->cvcb(\%options);

    my $json = JSON::encode_json($object);

    http_request(
        POST => $self->_build_uri(
            [$self->{path}, $bucket, $key],
            $options{params}
        ),
        headers => $self->_build_headers($options{params}),
        body    => $json,
        sub {
            my ($body, $headers) = @_;
            my $result;
            if ($headers->{Status} == 204) {
                $result = $body ? JSON::decode_json($body) : 1;
            }
            else {
                $result = 0;
            }
            $cv->send($cb->($result));
        }
    );
    $cv;
}

sub delete {
    my ($self, $bucket, $key, %options) = @_;

    my ($cv, $cb) = $self->cvcb(\%options);

    http_request(
        DELETE => $self->_build_uri(
            [$self->{path}, $bucket, $key],
            $options{params}
        ),
        headers => $self->_build_headers($options{params}),
        sub {
            $cv->send($cb->(@_));
        }
    );
    $cv;
}

sub bucket {
    my ($self, $name) = @_;
    return AnyEvent::Riak::Bucket->new(name => $name, _client => $self);
}

no Moose;

1;

__END__

=head1 NAME

AnyEvent::Riak - Non-blocking Riak client

=head1 SYNOPSIS

    use AnyEvent::Riak;

    my $riak = AnyEvent::Riak->new(
        host => 'http://127.0.0.1:8098',
        path => 'riak',
    );

    die "Riak is not running" unless $riak->is_alive->recv;

    my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv;

This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91.

For a complete description of the Riak REST API, please refer to
L<https://wiki.basho.com/display/RIAK/REST+API>.

=head1 DESCRIPTION

AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects.

=head2 METHODS

=over 4

=item B<is_alive>([callback => sub { }, params => { }])

Check if the Riak server is alive. If the ping is successful, 1 is returned,
else 0.

    my $ping = $riak->is_alive->recv;

=item B<list_bucket>($bucketname, [callback => sub { }, params => { }])

Get the schema and key list for 'bucket'. Possible parameters are:

=over 2

=item

props=[true|false] - whether to return the bucket properties

=item

keys=[true|false|stream] - whether to return the keys stored in the bucket

=back

If the operation failed, C<undef> is returned, else an hash reference
describing the bucket is returned.

    my $bucket = $riak->list_bucket(
        'bucketname',
        parameters => {
            props => 'false',
        },
        callback => sub {
            my $struct = shift;
            if ( scalar @{ $struct->{keys} } ) {
                # do something
            }
        }
    );

=item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }])

Sets bucket properties like n_val and allow_mult.
=over 2

=item

n_val - the number of replicas for objects in this bucket

=item

allow_mult - whether to allow sibling objects to be created (concurrent updates)

=back

If successful, B<1> is returned, else B<0>.

    my $result = $riak->set_bucket('bucket')->recv;

=item B<fetch>($bucketname, $object, [parameters => { }, callback => sub { }])

Reads an object from a bucket.

=item B<store>($bucketname, $objectname, $objectdata, [parameters => { }, callback => sub { }]);

=item B<delete>($bucketname, $objectname, [parameters => { }, callback => sub { }]);

=back

=head1 AUTHOR

franck cuny E<lt>franck@lumberjaph.netE<gt>

=head1 SEE ALSO

=head1 LICENSE

Copyright 2009, 2010 by linkfluence.

L<http://linkfluence.net>

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut