summaryrefslogtreecommitdiff
path: root/lib/AnyEvent/Riak.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/AnyEvent/Riak.pm')
-rw-r--r--lib/AnyEvent/Riak.pm303
1 files changed, 138 insertions, 165 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm
index ab72249..96b9c47 100644
--- a/lib/AnyEvent/Riak.pm
+++ b/lib/AnyEvent/Riak.pm
@@ -1,46 +1,32 @@
package AnyEvent::Riak;
-use strict;
-use warnings;
+# ABSTRACT: non-blocking Riak client
-use Carp;
use JSON;
use AnyEvent;
use AnyEvent::HTTP;
-use MIME::Base64;
-use YAML::Syck;
-
use Moose;
-with qw/
- AnyEvent::Riak::Role::CVCB
- AnyEvent::Riak::Role::HTTPUtils
- /;
-use AnyEvent::Riak::Bucket;
+with qw/AnyEvent::Riak::Role::HTTPUtils AnyEvent::Riak::Role::CVCB/;
our $VERSION = '0.02';
has host => (is => 'rw', isa => 'Str', default => 'http://127.0.0.1:8098');
-has path => (is => 'rw', isa => 'Str', default => 'riak');
+has path => (is => 'rw', isa => 'Str', default => 'riak');
has mapred_path => (is => 'rw', isa => 'Str', default => 'mapred');
has r => (is => 'rw', isa => 'Int', default => 2);
has w => (is => 'rw', isa => 'Int', default => 2);
has dw => (is => 'rw', isa => 'Int', default => 2);
-has client_id => (
- is => 'rw',
- isa => 'Str',
- default =>
- sub { "perl_anyevent_riak" . encode_base64(int(rand(10737411824)), '') }
-);
sub is_alive {
- my ($self, %options) = @_;
+ my $self = shift;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
http_request(
GET => $self->_build_uri([qw/ping/]),
- headers => $self->_build_headers($options{params}),
+ headers => $self->_build_headers(),
sub {
my ($body, $headers) = @_;
if ($headers->{Status} == 200) {
@@ -49,46 +35,51 @@ sub is_alive {
else {
$cv->send($cb->(0));
}
- },
+ }
);
- return $cv;
+ $cv;
}
sub list_bucket {
- my ($self, $bucket_name, %options) = @_;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my $self = shift;
+ my $bucket_name = shift;
+
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
+
+ my $params = {
+ props => delete $options->{props} || 'true',
+ keys => delete $options->{keys} || 'true',
+ };
http_request(
- GET => $self->_build_uri(
- [$self->{path}, $bucket_name],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ GET => $self->_build_uri([$self->path, $bucket_name], $params),
+ headers => $self->_build_headers(),
sub {
-
my ($body, $headers) = @_;
if ($body && $headers->{Status} == 200) {
my $res = JSON::decode_json($body);
$cv->send($cb->($res));
}
else {
- $cv->send(undef);
+ $cv->send($cb->(undef));
}
}
);
- return $cv;
+ $cv;
}
sub set_bucket {
- my ($self, $bucket, $schema, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $schema = shift;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my ($cv, $cb) = $self->_cvcb(\@_);
http_request(
- PUT =>
- $self->_build_uri([$self->{path}, $bucket], $options{params}),
- headers => $self->_build_headers($options{params}),
- body => JSON::encode_json({props => $schema}),
+ PUT => $self->_build_uri([$self->path, $bucket_name]),
+ headers => $self->_build_headers(),
+ body => JSON::encode_json({props => $schema}),
sub {
my ($body, $headers) = @_;
if ($headers->{Status} == 204) {
@@ -103,18 +94,32 @@ sub set_bucket {
}
sub fetch {
- my ($self, $bucket, $key, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $key = shift;
+
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
+
+ my $params = {r => $options->{params}->{r} || $self->r,};
- my ($cv, $cb) = $self->cvcb(\%options);
+ if ($options->{vtag}) {
+ $params->{vtag} = delete $options->{vtag};
+ }
+
+ my $headers = {};
+ foreach (qw/If-None-Match If-Modified-Since Accept/) {
+ $headers->{$_} = delete $options->{headers}->{$_}
+ if (exists $options->{headers}->{$_});
+ }
http_request(
- GET => $self->_build_uri(
- [$self->{path}, $bucket, $key],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ GET =>
+ $self->_build_uri([$self->path, $bucket_name, $key], $params),
+ headers => $self->_build_headers($headers),
sub {
my ($body, $headers) = @_;
+ # XXX 300 && 304
if ($body && $headers->{Status} == 200) {
$cv->send($cb->(JSON::decode_json($body)));
}
@@ -127,67 +132,80 @@ sub fetch {
}
sub store {
- my ($self, $bucket, $key, $object, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $object = shift;
+
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
+ my $key = '';
+
+ my $params = {
+ w => $options->{params}->{w} || $self->w,
+ dw => $options->{params}->{dw} || $self->dw,
+ returnbody => $options->{params}->{returnbody} || 'true',
+ };
- my ($cv, $cb) = $self->cvcb(\%options);
+ if ($options->{key}) {
+ $key = delete $options->{key};
+ $params->{r} = $options->{params}->{r} || $self->r;
+ }
+
+ # XXX headers
my $json = JSON::encode_json($object);
http_request(
- POST => $self->_build_uri(
- [$self->{path}, $bucket, $key],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ POST => $self->_build_uri([$self->path, $bucket_name, $key,], $params),
+ headers => $self->_build_headers(),
body => $json,
sub {
my ($body, $headers) = @_;
my $result;
- if ($headers->{Status} == 204) {
+ if ($body && ($headers->{Status} == 201 || $headers->{Status} == 200)) {
$result = $body ? JSON::decode_json($body) : 1;
}
+ elsif ($headers->{Status} == 204) {
+ $result = 1;
+ }
else {
$result = 0;
}
- $cv->send($cb->($result));
+ $cv->send($cb->($result, $headers));
}
);
$cv;
}
sub delete {
- my ($self, $bucket, $key, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $key = shift;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my ($cv, $cb) = $self->_cvcb(@_);
http_request(
- DELETE => $self->_build_uri(
- [$self->{path}, $bucket, $key],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ DELETE => $self->_build_uri([$self->path, $bucket_name, $key],),
+ headers => $self->_build_headers(),
sub {
- $cv->send($cb->(@_));
+ my ($body, $headers) = @_;
+ if ($headers->{Status} == 204) {
+ $cv->send($cb->(1));
+ }
+ else {
+ $cv->send($cb->(0));
+ }
}
);
$cv;
}
-sub bucket {
- my ($self, $name) = @_;
- return AnyEvent::Riak::Bucket->new(name => $name, _client => $self);
-}
-
no Moose;
1;
__END__
-=head1 NAME
-
-AnyEvent::Riak - Non-blocking Riak client
-
=head1 SYNOPSIS
use AnyEvent::Riak;
@@ -197,132 +215,87 @@ AnyEvent::Riak - Non-blocking Riak client
path => 'riak',
);
- die "Riak is not running" unless $riak->is_alive->recv;
-
- my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv;
-
This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91.
-For a complete description of the Riak REST API, please refer to
-L<https://wiki.basho.com/display/RIAK/REST+API>.
+For a complete description of the Riak REST API, please refer to L<https://wiki.basho.com/display/RIAK/REST+API>.
=head1 DESCRIPTION
AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects.
-There is two interfaces for this module :
-
-=over 4
-
-=item B<raw JSON>
-
-This interface will only serialize and deserialize JSON return from the Riak REST API.
-
-=item B<OO>
-
-This interface will turn Riak buckets into Object, the same for Riak objects.
-
-=back
-
=head2 METHODS
-=head3 RAW
-
=over 4
-=item B<is_alive>([callback => sub { }, params => { }])
-
-Check if the Riak server is alive. If the ping is successful, 1 is returned,
-else 0.
+=item B<is_alive> ([$cv, $cb])
- my $ping = $riak->is_alive->recv;
+Check if the Riak server is alive. If the ping is successful, 1 is returned, else 0.
-=item B<list_bucket>($bucketname, [callback => sub { }, params => { }])
+Options can be:
-Get the schema and key list for 'bucket'. Possible parameters are:
-
-=over 2
+=over 4
-=item
+=item B<headers>
-props=[true|false] - whether to return the bucket properties
+A list of valid HTTP headers that will be send with the query
-=item
+=back
-keys=[true|false|stream] - whether to return the keys stored in the bucket
+=item B<list_bucket> ($bucket_name, [$options, $cv, $cb])
-=back
+Reads the bucket properties and/or keys.
-If the operation failed, C<undef> is returned, else an hash reference
-describing the bucket is returned.
-
- my $bucket = $riak->list_bucket(
- 'bucketname',
- parameters => {
- props => 'false',
- },
- callback => sub {
- my $struct = shift;
- if ( scalar @{ $struct->{keys} } ) {
- # do something
- }
+ $riak->list_bucket(
+ 'mybucket',
+ {props => 'true', keys => 'false'},
+ sub {
+ my $res = shift;
+ ...
}
- );
+ );
-=item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }])
+=item B<set_bucket> ($bucket_name, $schema, [%options, $cv, $cb])
Sets bucket properties like n_val and allow_mult.
-=over 2
-
-=item
-
-n_val - the number of replicas for objects in this bucket
-
-=item
-
-allow_mult - whether to allow sibling objects to be created (concurrent updates)
-
-=back
-
-If successful, B<1> is returned, else B<0>.
-
- my $result = $riak->set_bucket('bucket'), {n_val => 5}->recv;
+ $riak->set_bucket(
+ 'mybucket',
+ {n_val => 5},
+ sub {
+ my $res = shift;
+ ...;
+ }
+ );
-=item B<fetch>($bucketname, $object, [parameters => { }, callback => sub { }])
+=item B<fetch> ($bucket_name, $key, [$options, $cv, $cb])
Reads an object from a bucket.
-=item B<store>($bucketname, $objectname, $objectdata, [parameters => { }, callback => sub { }]);
-
-=item B<delete>($bucketname, $objectname, [parameters => { }, callback => sub { }]);
-
-=back
-
-=head3 OO
-
-=item B<bucket>($bucketname);
-
-Return a C<AnyEvent::Riak::Bucket> object.
-
- my $r = AnyEvent::Riak->new(...);
- my $bucket = $r->bucket('foo');
- say $bucket->name;
- say $bucket->properties->{props}->{nval};
-
-=head1 AUTHOR
+ $riak->fetch(
+ 'mybucket', 'mykey',
+ {params => {r = 2}, headers => {'If-Modified-Since' => $value}},
+ sub {
+ my $res = shift;
+ }
+ );
-franck cuny E<lt>franck@lumberjaph.netE<gt>
+=item B<store> ($bucket_name, $key, $object, [$options, $cv, $cb])
-=head1 SEE ALSO
+Stores a new object in a bucket.
-=head1 LICENSE
+ $riak->store(
+ 'mybucket', $object,
+ {key => 'mykey', headers => {''}, params => {w => 2}},
+ sub {
+ my $res = shift;
+ ...
+ }
+ );
-Copyright 2009, 2010 by linkfluence.
+=item B<delete> ($bucket, $key, [$options, $cv, $cb])
-L<http://linkfluence.net>
+Deletes an object from a bucket.
-This library is free software; you can redistribute it and/or modify
-it under the same terms as Perl itself.
+ $riak->delete('mybucket', 'mykey', sub { my $res = shift;... });
-=cut
+=back