diff options
Diffstat (limited to '')
| -rw-r--r-- | lib/AnyEvent/Riak.pm | 303 |
1 files changed, 138 insertions, 165 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm index ab72249..96b9c47 100644 --- a/lib/AnyEvent/Riak.pm +++ b/lib/AnyEvent/Riak.pm @@ -1,46 +1,32 @@ package AnyEvent::Riak; -use strict; -use warnings; +# ABSTRACT: non-blocking Riak client -use Carp; use JSON; use AnyEvent; use AnyEvent::HTTP; -use MIME::Base64; -use YAML::Syck; - use Moose; -with qw/ - AnyEvent::Riak::Role::CVCB - AnyEvent::Riak::Role::HTTPUtils - /; -use AnyEvent::Riak::Bucket; +with qw/AnyEvent::Riak::Role::HTTPUtils AnyEvent::Riak::Role::CVCB/; our $VERSION = '0.02'; has host => (is => 'rw', isa => 'Str', default => 'http://127.0.0.1:8098'); -has path => (is => 'rw', isa => 'Str', default => 'riak'); +has path => (is => 'rw', isa => 'Str', default => 'riak'); has mapred_path => (is => 'rw', isa => 'Str', default => 'mapred'); has r => (is => 'rw', isa => 'Int', default => 2); has w => (is => 'rw', isa => 'Int', default => 2); has dw => (is => 'rw', isa => 'Int', default => 2); -has client_id => ( - is => 'rw', - isa => 'Str', - default => - sub { "perl_anyevent_riak" . encode_base64(int(rand(10737411824)), '') } -); sub is_alive { - my ($self, %options) = @_; + my $self = shift; - my ($cv, $cb) = $self->cvcb(\%options); + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; http_request( GET => $self->_build_uri([qw/ping/]), - headers => $self->_build_headers($options{params}), + headers => $self->_build_headers(), sub { my ($body, $headers) = @_; if ($headers->{Status} == 200) { @@ -49,46 +35,51 @@ sub is_alive { else { $cv->send($cb->(0)); } - }, + } ); - return $cv; + $cv; } sub list_bucket { - my ($self, $bucket_name, %options) = @_; - my ($cv, $cb) = $self->cvcb(\%options); + my $self = shift; + my $bucket_name = shift; + + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; + + my $params = { + props => delete $options->{props} || 'true', + keys => delete $options->{keys} || 'true', + }; http_request( - GET => $self->_build_uri( - [$self->{path}, $bucket_name], - $options{params} - ), - headers => $self->_build_headers($options{params}), + GET => $self->_build_uri([$self->path, $bucket_name], $params), + headers => $self->_build_headers(), sub { - my ($body, $headers) = @_; if ($body && $headers->{Status} == 200) { my $res = JSON::decode_json($body); $cv->send($cb->($res)); } else { - $cv->send(undef); + $cv->send($cb->(undef)); } } ); - return $cv; + $cv; } sub set_bucket { - my ($self, $bucket, $schema, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $schema = shift; - my ($cv, $cb) = $self->cvcb(\%options); + my ($cv, $cb) = $self->_cvcb(\@_); http_request( - PUT => - $self->_build_uri([$self->{path}, $bucket], $options{params}), - headers => $self->_build_headers($options{params}), - body => JSON::encode_json({props => $schema}), + PUT => $self->_build_uri([$self->path, $bucket_name]), + headers => $self->_build_headers(), + body => JSON::encode_json({props => $schema}), sub { my ($body, $headers) = @_; if ($headers->{Status} == 204) { @@ -103,18 +94,32 @@ sub set_bucket { } sub fetch { - my ($self, $bucket, $key, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $key = shift; + + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; + + my $params = {r => $options->{params}->{r} || $self->r,}; - my ($cv, $cb) = $self->cvcb(\%options); + if ($options->{vtag}) { + $params->{vtag} = delete $options->{vtag}; + } + + my $headers = {}; + foreach (qw/If-None-Match If-Modified-Since Accept/) { + $headers->{$_} = delete $options->{headers}->{$_} + if (exists $options->{headers}->{$_}); + } http_request( - GET => $self->_build_uri( - [$self->{path}, $bucket, $key], - $options{params} - ), - headers => $self->_build_headers($options{params}), + GET => + $self->_build_uri([$self->path, $bucket_name, $key], $params), + headers => $self->_build_headers($headers), sub { my ($body, $headers) = @_; + # XXX 300 && 304 if ($body && $headers->{Status} == 200) { $cv->send($cb->(JSON::decode_json($body))); } @@ -127,67 +132,80 @@ sub fetch { } sub store { - my ($self, $bucket, $key, $object, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $object = shift; + + my ($cv, $cb) = $self->_cvcb(\@_); + my $options = shift; + my $key = ''; + + my $params = { + w => $options->{params}->{w} || $self->w, + dw => $options->{params}->{dw} || $self->dw, + returnbody => $options->{params}->{returnbody} || 'true', + }; - my ($cv, $cb) = $self->cvcb(\%options); + if ($options->{key}) { + $key = delete $options->{key}; + $params->{r} = $options->{params}->{r} || $self->r; + } + + # XXX headers my $json = JSON::encode_json($object); http_request( - POST => $self->_build_uri( - [$self->{path}, $bucket, $key], - $options{params} - ), - headers => $self->_build_headers($options{params}), + POST => $self->_build_uri([$self->path, $bucket_name, $key,], $params), + headers => $self->_build_headers(), body => $json, sub { my ($body, $headers) = @_; my $result; - if ($headers->{Status} == 204) { + if ($body && ($headers->{Status} == 201 || $headers->{Status} == 200)) { $result = $body ? JSON::decode_json($body) : 1; } + elsif ($headers->{Status} == 204) { + $result = 1; + } else { $result = 0; } - $cv->send($cb->($result)); + $cv->send($cb->($result, $headers)); } ); $cv; } sub delete { - my ($self, $bucket, $key, %options) = @_; + my $self = shift; + my $bucket_name = shift; + my $key = shift; - my ($cv, $cb) = $self->cvcb(\%options); + my ($cv, $cb) = $self->_cvcb(@_); http_request( - DELETE => $self->_build_uri( - [$self->{path}, $bucket, $key], - $options{params} - ), - headers => $self->_build_headers($options{params}), + DELETE => $self->_build_uri([$self->path, $bucket_name, $key],), + headers => $self->_build_headers(), sub { - $cv->send($cb->(@_)); + my ($body, $headers) = @_; + if ($headers->{Status} == 204) { + $cv->send($cb->(1)); + } + else { + $cv->send($cb->(0)); + } } ); $cv; } -sub bucket { - my ($self, $name) = @_; - return AnyEvent::Riak::Bucket->new(name => $name, _client => $self); -} - no Moose; 1; __END__ -=head1 NAME - -AnyEvent::Riak - Non-blocking Riak client - =head1 SYNOPSIS use AnyEvent::Riak; @@ -197,132 +215,87 @@ AnyEvent::Riak - Non-blocking Riak client path => 'riak', ); - die "Riak is not running" unless $riak->is_alive->recv; - - my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv; - This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91. -For a complete description of the Riak REST API, please refer to -L<https://wiki.basho.com/display/RIAK/REST+API>. +For a complete description of the Riak REST API, please refer to L<https://wiki.basho.com/display/RIAK/REST+API>. =head1 DESCRIPTION AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects. -There is two interfaces for this module : - -=over 4 - -=item B<raw JSON> - -This interface will only serialize and deserialize JSON return from the Riak REST API. - -=item B<OO> - -This interface will turn Riak buckets into Object, the same for Riak objects. - -=back - =head2 METHODS -=head3 RAW - =over 4 -=item B<is_alive>([callback => sub { }, params => { }]) - -Check if the Riak server is alive. If the ping is successful, 1 is returned, -else 0. +=item B<is_alive> ([$cv, $cb]) - my $ping = $riak->is_alive->recv; +Check if the Riak server is alive. If the ping is successful, 1 is returned, else 0. -=item B<list_bucket>($bucketname, [callback => sub { }, params => { }]) +Options can be: -Get the schema and key list for 'bucket'. Possible parameters are: - -=over 2 +=over 4 -=item +=item B<headers> -props=[true|false] - whether to return the bucket properties +A list of valid HTTP headers that will be send with the query -=item +=back -keys=[true|false|stream] - whether to return the keys stored in the bucket +=item B<list_bucket> ($bucket_name, [$options, $cv, $cb]) -=back +Reads the bucket properties and/or keys. -If the operation failed, C<undef> is returned, else an hash reference -describing the bucket is returned. - - my $bucket = $riak->list_bucket( - 'bucketname', - parameters => { - props => 'false', - }, - callback => sub { - my $struct = shift; - if ( scalar @{ $struct->{keys} } ) { - # do something - } + $riak->list_bucket( + 'mybucket', + {props => 'true', keys => 'false'}, + sub { + my $res = shift; + ... } - ); + ); -=item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }]) +=item B<set_bucket> ($bucket_name, $schema, [%options, $cv, $cb]) Sets bucket properties like n_val and allow_mult. -=over 2 - -=item - -n_val - the number of replicas for objects in this bucket - -=item - -allow_mult - whether to allow sibling objects to be created (concurrent updates) - -=back - -If successful, B<1> is returned, else B<0>. - - my $result = $riak->set_bucket('bucket'), {n_val => 5}->recv; + $riak->set_bucket( + 'mybucket', + {n_val => 5}, + sub { + my $res = shift; + ...; + } + ); -=item B<fetch>($bucketname, $object, [parameters => { }, callback => sub { }]) +=item B<fetch> ($bucket_name, $key, [$options, $cv, $cb]) Reads an object from a bucket. -=item B<store>($bucketname, $objectname, $objectdata, [parameters => { }, callback => sub { }]); - -=item B<delete>($bucketname, $objectname, [parameters => { }, callback => sub { }]); - -=back - -=head3 OO - -=item B<bucket>($bucketname); - -Return a C<AnyEvent::Riak::Bucket> object. - - my $r = AnyEvent::Riak->new(...); - my $bucket = $r->bucket('foo'); - say $bucket->name; - say $bucket->properties->{props}->{nval}; - -=head1 AUTHOR + $riak->fetch( + 'mybucket', 'mykey', + {params => {r = 2}, headers => {'If-Modified-Since' => $value}}, + sub { + my $res = shift; + } + ); -franck cuny E<lt>franck@lumberjaph.netE<gt> +=item B<store> ($bucket_name, $key, $object, [$options, $cv, $cb]) -=head1 SEE ALSO +Stores a new object in a bucket. -=head1 LICENSE + $riak->store( + 'mybucket', $object, + {key => 'mykey', headers => {''}, params => {w => 2}}, + sub { + my $res = shift; + ... + } + ); -Copyright 2009, 2010 by linkfluence. +=item B<delete> ($bucket, $key, [$options, $cv, $cb]) -L<http://linkfluence.net> +Deletes an object from a bucket. -This library is free software; you can redistribute it and/or modify -it under the same terms as Perl itself. + $riak->delete('mybucket', 'mykey', sub { my $res = shift;... }); -=cut +=back |
