package AnyEvent::Riak; use strict; use warnings; use Carp; use URI; use JSON; use AnyEvent; use AnyEvent::HTTP; use MIME::Base64; use YAML::Syck; our $VERSION = '0.02'; sub new { my ($class, %args) = @_; my $host = delete $args{host} || 'http://127.0.0.1:8098'; my $path = delete $args{path} || 'riak'; my $mapred_path = delete $args{mapred_path} || 'mapred'; my $r = delete $args{r} || 2; my $d = delete $args{w} || 2; my $dw = delete $args{dw} || 2; my $client_id = "perl_anyevent_riak_" . encode_base64(int(rand(10737411824)), ''); bless { host => $host, path => $path, mapred_path => $mapred_path, client_id => $client_id, r => $r, d => $d, dw => $dw, %args, }, $class; } sub _build_uri { my ($self, $path, $options) = @_; my $uri = URI->new($self->{host}); $uri->path(join("/", @$path)); $uri->query_form($self->_build_query($options)); return $uri->as_string; } sub _build_headers { my ($self, $options) = @_; my $headers = delete $options->{headers} || {}; $headers->{'X-Riak-ClientId'} = $self->{client_id}; $headers->{'Content-Type'} = 'application/json' unless exists $headers->{'Content-Type'}; return $headers; } sub _build_query { my ($self, $options) = @_; my $valid_options = [qw/props keys returnbody/]; my $query; foreach (@$valid_options) { $query->{$_} = $options->{$_} if exists $options->{$_}; } $query; } sub default_cb { my ($self, $options) = @_; return sub { my $res = shift; return $res; }; } sub cvcb { my ($self, $options) = @_; my ($cv, $cb); $cv = AE::cv; if ($options->{callback}) { $cb = delete $options->{callback}; } else { $cb = $self->default_cb(); } ($cv, $cb); } sub is_alive { my ($self, %options) = @_; my ($cv, $cb) = $self->cvcb(\%options); http_request( GET => $self->_build_uri([qw/ping/]), headers => $self->_build_headers($options{params}), sub { my ($body, $headers) = @_; if ($headers->{Status} == 200) { $cv->send($cb->(1)); } else { $cv->send($cb->(0)); } }, ); return $cv; } sub list_bucket { my ($self, $bucket_name, %options) = @_; my ($cv, $cb) = $self->cvcb(\%options); http_request( GET => $self->_build_uri( [$self->{path}, $bucket_name], $options{params} ), headers => $self->_build_headers($options{params}), sub { my ($body, $headers) = @_; if ($body && $headers->{Status} == 200) { my $res = JSON::decode_json($body); $cv->send($cb->($res)); } else { $cv->send(undef); } } ); return $cv; } sub set_bucket { my ($self, $bucket, $schema, %options) = @_; my ($cv, $cb) = $self->cvcb(\%options); http_request( PUT => $self->_build_uri([$self->{path}, $bucket], $options{params}), headers => $self->_build_headers($options{params}), body => JSON::encode_json($schema), sub { my ($body, $headers) = @_; if ($headers->{Status} == 204) { $cv->send($cb->(1)); } else { $cv->send($cb->(0)); } } ); $cv; } sub fetch { my ($self, $bucket, $key, %options) = @_; my ($cv, $cb) = $self->cvcb(\%options); http_request( GET => $self->_build_uri( [$self->{path}, $bucket, $key], $options{params} ), headers => $self->_build_headers($options{params}), sub { my ($body, $headers) = @_; if ($body && $headers->{Status} == 200) { $cv->send($cb->(JSON::decode_json($body))); } else { $cv->send($cb->(0)); } } ); $cv; } sub store { my ($self, $bucket, $key, $object, %options) = @_; my ($cv, $cb) = $self->cvcb(\%options); my $json = JSON::encode_json($object); http_request( POST => $self->_build_uri( [$self->{path}, $bucket, $key], $options{params} ), headers => $self->_build_headers($options{params}), body => $json, sub { my ($body, $headers) = @_; my $result; if ($headers->{Status} == 204) { $result = $body ? JSON::decode_json($body) : 1; } else { $result = 0; } $cv->send($cb->($result)); } ); $cv; } sub delete { my ($self, $bucket, $key, %options) = @_; my ($cv, $cb) = $self->cvcb(\%options); http_request( DELETE => $self->_build_uri( [$self->{path}, $bucket, $key], $options{params} ), headers => $self->_build_headers($options{params}), sub { $cv->send($cb->(@_)); } ); $cv; } 1; __END__ =head1 NAME AnyEvent::Riak - Non-blocking Riak client =head1 SYNOPSIS use AnyEvent::Riak; my $riak = AnyEvent::Riak->new( host => 'http://127.0.0.1:8098', path => 'riak', ); die "Riak is not running" unless $riak->is_alive->recv; my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv; This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91. For a complete description of the Riak REST API, please refer to L. =head1 DESCRIPTION AnyEvent::Riak is a non-blocking riak client using C. This client allows you to connect to a Riak instance, create, modify and delete Riak objects. =head2 METHODS =over 4 =item B([callback => sub { }, params => { }]) Check if the Riak server is alive. If the ping is successful, 1 is returned, else 0. my $ping = $riak->is_alive->recv; =item B($bucketname, [callback => sub { }, params => { }]) Get the schema and key list for 'bucket'. Possible parameters are: =over 2 =item props=[true|false] - whether to return the bucket properties =item keys=[true|false|stream] - whether to return the keys stored in the bucket =back If the operation failed, C is returned, else an hash reference describing the bucket is returned. my $bucket = $riak->list_bucket( 'bucketname', parameters => { props => 'false', }, callback => sub { my $struct = shift; if ( scalar @{ $struct->{keys} } ) { # do something } } ); =item B($bucketname, $bucketschema, [parameters => { }, callback => sub { }]) Sets bucket properties like n_val and allow_mult. =over 2 =item n_val - the number of replicas for objects in this bucket =item allow_mult - whether to allow sibling objects to be created (concurrent updates) =back If successful, B<1> is returned, else B<0>. my $result = $riak->set_bucket('bucket')->recv; =item B($bucketname, $object, [parameters => { }, callback => sub { }]) Reads an object from a bucket. =item B($bucketname, $objectname, $objectdata, [parameters => { }, callback => sub { }]); =item B($bucketname, $objectname, [parameters => { }, callback => sub { }]); =back =head1 AUTHOR franck cuny Efranck@lumberjaph.netE =head1 SEE ALSO =head1 LICENSE Copyright 2009, 2010 by linkfluence. L This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut