summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/AnyEvent/Riak.pm303
-rw-r--r--lib/AnyEvent/Riak/Bucket.pm113
-rw-r--r--lib/AnyEvent/Riak/Object.pm52
-rw-r--r--lib/AnyEvent/Riak/Role/CVCB.pm24
-rw-r--r--lib/AnyEvent/Riak/Role/Client.pm12
-rw-r--r--lib/AnyEvent/Riak/Role/HTTPUtils.pm16
6 files changed, 158 insertions, 362 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm
index ab72249..96b9c47 100644
--- a/lib/AnyEvent/Riak.pm
+++ b/lib/AnyEvent/Riak.pm
@@ -1,46 +1,32 @@
package AnyEvent::Riak;
-use strict;
-use warnings;
+# ABSTRACT: non-blocking Riak client
-use Carp;
use JSON;
use AnyEvent;
use AnyEvent::HTTP;
-use MIME::Base64;
-use YAML::Syck;
-
use Moose;
-with qw/
- AnyEvent::Riak::Role::CVCB
- AnyEvent::Riak::Role::HTTPUtils
- /;
-use AnyEvent::Riak::Bucket;
+with qw/AnyEvent::Riak::Role::HTTPUtils AnyEvent::Riak::Role::CVCB/;
our $VERSION = '0.02';
has host => (is => 'rw', isa => 'Str', default => 'http://127.0.0.1:8098');
-has path => (is => 'rw', isa => 'Str', default => 'riak');
+has path => (is => 'rw', isa => 'Str', default => 'riak');
has mapred_path => (is => 'rw', isa => 'Str', default => 'mapred');
has r => (is => 'rw', isa => 'Int', default => 2);
has w => (is => 'rw', isa => 'Int', default => 2);
has dw => (is => 'rw', isa => 'Int', default => 2);
-has client_id => (
- is => 'rw',
- isa => 'Str',
- default =>
- sub { "perl_anyevent_riak" . encode_base64(int(rand(10737411824)), '') }
-);
sub is_alive {
- my ($self, %options) = @_;
+ my $self = shift;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
http_request(
GET => $self->_build_uri([qw/ping/]),
- headers => $self->_build_headers($options{params}),
+ headers => $self->_build_headers(),
sub {
my ($body, $headers) = @_;
if ($headers->{Status} == 200) {
@@ -49,46 +35,51 @@ sub is_alive {
else {
$cv->send($cb->(0));
}
- },
+ }
);
- return $cv;
+ $cv;
}
sub list_bucket {
- my ($self, $bucket_name, %options) = @_;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my $self = shift;
+ my $bucket_name = shift;
+
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
+
+ my $params = {
+ props => delete $options->{props} || 'true',
+ keys => delete $options->{keys} || 'true',
+ };
http_request(
- GET => $self->_build_uri(
- [$self->{path}, $bucket_name],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ GET => $self->_build_uri([$self->path, $bucket_name], $params),
+ headers => $self->_build_headers(),
sub {
-
my ($body, $headers) = @_;
if ($body && $headers->{Status} == 200) {
my $res = JSON::decode_json($body);
$cv->send($cb->($res));
}
else {
- $cv->send(undef);
+ $cv->send($cb->(undef));
}
}
);
- return $cv;
+ $cv;
}
sub set_bucket {
- my ($self, $bucket, $schema, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $schema = shift;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my ($cv, $cb) = $self->_cvcb(\@_);
http_request(
- PUT =>
- $self->_build_uri([$self->{path}, $bucket], $options{params}),
- headers => $self->_build_headers($options{params}),
- body => JSON::encode_json({props => $schema}),
+ PUT => $self->_build_uri([$self->path, $bucket_name]),
+ headers => $self->_build_headers(),
+ body => JSON::encode_json({props => $schema}),
sub {
my ($body, $headers) = @_;
if ($headers->{Status} == 204) {
@@ -103,18 +94,32 @@ sub set_bucket {
}
sub fetch {
- my ($self, $bucket, $key, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $key = shift;
+
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
+
+ my $params = {r => $options->{params}->{r} || $self->r,};
- my ($cv, $cb) = $self->cvcb(\%options);
+ if ($options->{vtag}) {
+ $params->{vtag} = delete $options->{vtag};
+ }
+
+ my $headers = {};
+ foreach (qw/If-None-Match If-Modified-Since Accept/) {
+ $headers->{$_} = delete $options->{headers}->{$_}
+ if (exists $options->{headers}->{$_});
+ }
http_request(
- GET => $self->_build_uri(
- [$self->{path}, $bucket, $key],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ GET =>
+ $self->_build_uri([$self->path, $bucket_name, $key], $params),
+ headers => $self->_build_headers($headers),
sub {
my ($body, $headers) = @_;
+ # XXX 300 && 304
if ($body && $headers->{Status} == 200) {
$cv->send($cb->(JSON::decode_json($body)));
}
@@ -127,67 +132,80 @@ sub fetch {
}
sub store {
- my ($self, $bucket, $key, $object, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $object = shift;
+
+ my ($cv, $cb) = $self->_cvcb(\@_);
+ my $options = shift;
+ my $key = '';
+
+ my $params = {
+ w => $options->{params}->{w} || $self->w,
+ dw => $options->{params}->{dw} || $self->dw,
+ returnbody => $options->{params}->{returnbody} || 'true',
+ };
- my ($cv, $cb) = $self->cvcb(\%options);
+ if ($options->{key}) {
+ $key = delete $options->{key};
+ $params->{r} = $options->{params}->{r} || $self->r;
+ }
+
+ # XXX headers
my $json = JSON::encode_json($object);
http_request(
- POST => $self->_build_uri(
- [$self->{path}, $bucket, $key],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ POST => $self->_build_uri([$self->path, $bucket_name, $key,], $params),
+ headers => $self->_build_headers(),
body => $json,
sub {
my ($body, $headers) = @_;
my $result;
- if ($headers->{Status} == 204) {
+ if ($body && ($headers->{Status} == 201 || $headers->{Status} == 200)) {
$result = $body ? JSON::decode_json($body) : 1;
}
+ elsif ($headers->{Status} == 204) {
+ $result = 1;
+ }
else {
$result = 0;
}
- $cv->send($cb->($result));
+ $cv->send($cb->($result, $headers));
}
);
$cv;
}
sub delete {
- my ($self, $bucket, $key, %options) = @_;
+ my $self = shift;
+ my $bucket_name = shift;
+ my $key = shift;
- my ($cv, $cb) = $self->cvcb(\%options);
+ my ($cv, $cb) = $self->_cvcb(@_);
http_request(
- DELETE => $self->_build_uri(
- [$self->{path}, $bucket, $key],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
+ DELETE => $self->_build_uri([$self->path, $bucket_name, $key],),
+ headers => $self->_build_headers(),
sub {
- $cv->send($cb->(@_));
+ my ($body, $headers) = @_;
+ if ($headers->{Status} == 204) {
+ $cv->send($cb->(1));
+ }
+ else {
+ $cv->send($cb->(0));
+ }
}
);
$cv;
}
-sub bucket {
- my ($self, $name) = @_;
- return AnyEvent::Riak::Bucket->new(name => $name, _client => $self);
-}
-
no Moose;
1;
__END__
-=head1 NAME
-
-AnyEvent::Riak - Non-blocking Riak client
-
=head1 SYNOPSIS
use AnyEvent::Riak;
@@ -197,132 +215,87 @@ AnyEvent::Riak - Non-blocking Riak client
path => 'riak',
);
- die "Riak is not running" unless $riak->is_alive->recv;
-
- my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv;
-
This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91.
-For a complete description of the Riak REST API, please refer to
-L<https://wiki.basho.com/display/RIAK/REST+API>.
+For a complete description of the Riak REST API, please refer to L<https://wiki.basho.com/display/RIAK/REST+API>.
=head1 DESCRIPTION
AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allows you to connect to a Riak instance, create, modify and delete Riak objects.
-There is two interfaces for this module :
-
-=over 4
-
-=item B<raw JSON>
-
-This interface will only serialize and deserialize JSON return from the Riak REST API.
-
-=item B<OO>
-
-This interface will turn Riak buckets into Object, the same for Riak objects.
-
-=back
-
=head2 METHODS
-=head3 RAW
-
=over 4
-=item B<is_alive>([callback => sub { }, params => { }])
-
-Check if the Riak server is alive. If the ping is successful, 1 is returned,
-else 0.
+=item B<is_alive> ([$cv, $cb])
- my $ping = $riak->is_alive->recv;
+Check if the Riak server is alive. If the ping is successful, 1 is returned, else 0.
-=item B<list_bucket>($bucketname, [callback => sub { }, params => { }])
+Options can be:
-Get the schema and key list for 'bucket'. Possible parameters are:
-
-=over 2
+=over 4
-=item
+=item B<headers>
-props=[true|false] - whether to return the bucket properties
+A list of valid HTTP headers that will be send with the query
-=item
+=back
-keys=[true|false|stream] - whether to return the keys stored in the bucket
+=item B<list_bucket> ($bucket_name, [$options, $cv, $cb])
-=back
+Reads the bucket properties and/or keys.
-If the operation failed, C<undef> is returned, else an hash reference
-describing the bucket is returned.
-
- my $bucket = $riak->list_bucket(
- 'bucketname',
- parameters => {
- props => 'false',
- },
- callback => sub {
- my $struct = shift;
- if ( scalar @{ $struct->{keys} } ) {
- # do something
- }
+ $riak->list_bucket(
+ 'mybucket',
+ {props => 'true', keys => 'false'},
+ sub {
+ my $res = shift;
+ ...
}
- );
+ );
-=item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }])
+=item B<set_bucket> ($bucket_name, $schema, [%options, $cv, $cb])
Sets bucket properties like n_val and allow_mult.
-=over 2
-
-=item
-
-n_val - the number of replicas for objects in this bucket
-
-=item
-
-allow_mult - whether to allow sibling objects to be created (concurrent updates)
-
-=back
-
-If successful, B<1> is returned, else B<0>.
-
- my $result = $riak->set_bucket('bucket'), {n_val => 5}->recv;
+ $riak->set_bucket(
+ 'mybucket',
+ {n_val => 5},
+ sub {
+ my $res = shift;
+ ...;
+ }
+ );
-=item B<fetch>($bucketname, $object, [parameters => { }, callback => sub { }])
+=item B<fetch> ($bucket_name, $key, [$options, $cv, $cb])
Reads an object from a bucket.
-=item B<store>($bucketname, $objectname, $objectdata, [parameters => { }, callback => sub { }]);
-
-=item B<delete>($bucketname, $objectname, [parameters => { }, callback => sub { }]);
-
-=back
-
-=head3 OO
-
-=item B<bucket>($bucketname);
-
-Return a C<AnyEvent::Riak::Bucket> object.
-
- my $r = AnyEvent::Riak->new(...);
- my $bucket = $r->bucket('foo');
- say $bucket->name;
- say $bucket->properties->{props}->{nval};
-
-=head1 AUTHOR
+ $riak->fetch(
+ 'mybucket', 'mykey',
+ {params => {r = 2}, headers => {'If-Modified-Since' => $value}},
+ sub {
+ my $res = shift;
+ }
+ );
-franck cuny E<lt>franck@lumberjaph.netE<gt>
+=item B<store> ($bucket_name, $key, $object, [$options, $cv, $cb])
-=head1 SEE ALSO
+Stores a new object in a bucket.
-=head1 LICENSE
+ $riak->store(
+ 'mybucket', $object,
+ {key => 'mykey', headers => {''}, params => {w => 2}},
+ sub {
+ my $res = shift;
+ ...
+ }
+ );
-Copyright 2009, 2010 by linkfluence.
+=item B<delete> ($bucket, $key, [$options, $cv, $cb])
-L<http://linkfluence.net>
+Deletes an object from a bucket.
-This library is free software; you can redistribute it and/or modify
-it under the same terms as Perl itself.
+ $riak->delete('mybucket', 'mykey', sub { my $res = shift;... });
-=cut
+=back
diff --git a/lib/AnyEvent/Riak/Bucket.pm b/lib/AnyEvent/Riak/Bucket.pm
deleted file mode 100644
index 0c690dd..0000000
--- a/lib/AnyEvent/Riak/Bucket.pm
+++ /dev/null
@@ -1,113 +0,0 @@
-package AnyEvent::Riak::Bucket;
-
-use Moose;
-use AnyEvent::HTTP;
-
-use AnyEvent::Riak::Object;
-
-with qw/
- AnyEvent::Riak::Role::CVCB
- AnyEvent::Riak::Role::HTTPUtils
- AnyEvent::Riak::Role::Client
- /;
-
-has name => (is => 'rw', isa => 'Str', required => 1);
-has _properties =>
- (is => 'rw', isa => 'HashRef', predicate => '_has_properties');
-has r => (
- is => 'rw',
- isa => 'Int',
- lazy => 1,
- default => sub { my $self = shift; $self->_client->r }
-);
-has w => (
- is => 'rw',
- isa => 'Int',
- lazy => 1,
- default => sub { my $self = shift; $self->_client->w }
-);
-has dw => (
- is => 'rw',
- isa => 'Int',
- lazy => 1,
- default => sub { my $self = shift; $self->_client->dw }
-);
-
-sub get_properties {
- my ($self, %options) = @_;
-
- my ($cv, $cb) = $self->cvcb(\%options);
-
- if ($self->_has_properties) {
- $cv->send($self->_properties);
- }
- else {
- http_request(
- GET => $self->_build_uri(
- [$self->_client->path, $self->name],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
- sub {
- my ($body, $headers) = @_;
- if ($body && $headers->{Status} == 200) {
- my $prop = JSON::decode_json($body);
- $self->_properties($prop);
- $cv->send($cb->($self->_properties));
- }
- else {
- $cv->send(undef);
- }
- }
- );
- }
- return $cv;
-}
-
-sub set_properties {
- my ($self, $schema, %options) = @_;
-
- my ($cv, $cb) = $self->cvcb(\%options);
-
- http_request(
- PUT =>
- $self->_build_uri([$self->{path}, $self->name], $options{params}),
- headers => $self->_build_headers($options{params}),
- body => JSON::encode_json({props => $schema}),
- sub {
- my ($body, $headers) = @_;
- if ($headers->{Status} == 204) {
- $cv->send($cb->(1));
- }
- else {
- $cv->send($cb->(0));
- }
- }
- );
- return $cv;
-}
-
-sub create {
- my ($self, $key, $content) = @_;
- my $object = AnyEvent::Riak::Object->new(
- _client => $self->_client,
- key => $key,
- content => $content,
- bucket => $self,
- );
- return $object;
-}
-
-sub object {
- my ($self, $key, $r) = @_;
- my $obj = AnyEvent::Riak::Object->new(
- _client => $self->_client,
- key => $key,
- r => $r,
- bucket => $self,
- );
-}
-
-no Moose;
-
-1;
diff --git a/lib/AnyEvent/Riak/Object.pm b/lib/AnyEvent/Riak/Object.pm
deleted file mode 100644
index d106254..0000000
--- a/lib/AnyEvent/Riak/Object.pm
+++ /dev/null
@@ -1,52 +0,0 @@
-package AnyEvent::Riak::Object;
-
-use Moose;
-use AnyEvent::HTTP;
-
-with qw/
- AnyEvent::Riak::Role::Client
- AnyEvent::Riak::Role::HTTPUtils
- AnyEvent::Riak::Role::CVCB
- /;
-
-has key => (is => 'rw', isa => 'Str');
-has _content => (is => 'rw', isa => 'HashRef', predicate => '_has_content');
-has content_type => (is => 'rw', isa => 'Str', default => 'application/json');
-has bucket => (is => 'rw', isa => 'AnyEvent::Riak::Bucket', required => 1);
-has status => (is => 'rw', isa => 'Int');
-has r => (is => 'rw', isa => 'Int');
-
-sub get {
- my ($self, %options) = @_;
-
- my ($cv, $cb) = $self->cvcb(\%options);
-
- if ($self->_has_content) {
- $cv->send($self->_content);
- }
- else {
- http_request(
- GET => $self->_build_uri(
- [$self->_client->path, $self->bucket->name, $self->key],
- $options{params}
- ),
- headers => $self->_build_headers($options{params}),
- sub {
- my ($body, $headers) = @_;
- if ($body && $headers->{Status} == 200) {
- my $content = JSON::decode_json($body);
- $self->_content($content);
- $cv->send($cb->($self->_content));
- }
- else {
- $cv->send(undef);
- }
- }
- );
- }
- return $cv;
-}
-
-no Moose;
-
-1;
diff --git a/lib/AnyEvent/Riak/Role/CVCB.pm b/lib/AnyEvent/Riak/Role/CVCB.pm
index 74684c2..73812c2 100644
--- a/lib/AnyEvent/Riak/Role/CVCB.pm
+++ b/lib/AnyEvent/Riak/Role/CVCB.pm
@@ -1,27 +1,19 @@
package AnyEvent::Riak::Role::CVCB;
-use Moose::Role;
+# ABSTRACT: return a default condvar and callback if none defined
-sub default_cb {
- my ($self, $options) = @_;
- return sub {
- my $res = shift;
- return $res;
- };
-}
+use Moose::Role;
-sub cvcb {
+sub _cvcb {
my ($self, $options) = @_;
- my ($cv, $cb);
- $cv = AE::cv;
- if ($options->{callback}) {
- $cb = delete $options->{callback};
- }
- else {
- $cb = $self->default_cb();
+ my ($cv, $cb) = (AnyEvent->condvar, sub { return @_ });
+ if ($options && @$options) {
+ $cv = pop @$options if UNIVERSAL::isa($options->[-1], 'AnyEvent::CondVar');
+ $cb = pop @$options if ref $options->[-1] eq 'CODE';
}
($cv, $cb);
}
1;
+
diff --git a/lib/AnyEvent/Riak/Role/Client.pm b/lib/AnyEvent/Riak/Role/Client.pm
deleted file mode 100644
index 0623e71..0000000
--- a/lib/AnyEvent/Riak/Role/Client.pm
+++ /dev/null
@@ -1,12 +0,0 @@
-package AnyEvent::Riak::Role::Client;
-
-use Moose::Role;
-
-has _client => (
- is => 'rw',
- isa => 'AnyEvent::Riak',
- required => 1,
- handles => {host => 'host', client_id => 'client_id'}
-);
-
-1;
diff --git a/lib/AnyEvent/Riak/Role/HTTPUtils.pm b/lib/AnyEvent/Riak/Role/HTTPUtils.pm
index 399f369..701af5d 100644
--- a/lib/AnyEvent/Riak/Role/HTTPUtils.pm
+++ b/lib/AnyEvent/Riak/Role/HTTPUtils.pm
@@ -1,15 +1,23 @@
package AnyEvent::Riak::Role::HTTPUtils;
+# ABSTRACT: HTTP methods
+
use Moose::Role;
use AnyEvent;
use AnyEvent::HTTP;
use URI;
-
use MIME::Base64;
+has client_id => (is => 'rw', isa => 'Str', lazy_build => 1,);
+
+sub _build_client_id {
+ "perl_anyevent_riak" . encode_base64(int(rand(10737411824)), '');
+}
+
sub _build_uri {
my ($self, $path, $options) = @_;
+
my $uri = URI->new($self->host);
$uri->path(join("/", @$path));
$uri->query_form($self->_build_query($options));
@@ -17,8 +25,8 @@ sub _build_uri {
}
sub _build_headers {
- my ($self, $options) = @_;
- my $headers = delete $options->{headers} || {};
+ my $self = shift;
+ my $headers = shift || {};
$headers->{'X-Riak-ClientId'} = $self->client_id;
$headers->{'Content-Type'} = 'application/json'
@@ -28,7 +36,7 @@ sub _build_headers {
sub _build_query {
my ($self, $options) = @_;
- my $valid_options = [qw/props keys returnbody/];
+ my $valid_options = [qw/props keys returnbody w r dw/];
my $query;
foreach (@$valid_options) {
$query->{$_} = $options->{$_} if exists $options->{$_};