summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/AnyEvent/Riak.pm186
1 files changed, 79 insertions, 107 deletions
diff --git a/lib/AnyEvent/Riak.pm b/lib/AnyEvent/Riak.pm
index d86a254..cd3ca87 100644
--- a/lib/AnyEvent/Riak.pm
+++ b/lib/AnyEvent/Riak.pm
@@ -2,6 +2,7 @@ package AnyEvent::Riak;
use strict;
use warnings;
+
use Carp;
use URI;
use JSON;
@@ -13,7 +14,7 @@ use YAML::Syck;
our $VERSION = '0.02';
sub new {
- my ( $class, %args ) = @_;
+ my ($class, %args) = @_;
my $host = delete $args{host} || 'http://127.0.0.1:8098';
my $path = delete $args{path} || 'riak';
@@ -22,8 +23,8 @@ sub new {
my $d = delete $args{w} || 2;
my $dw = delete $args{dw} || 2;
- my $client_id
- = "perl_anyevent_riak_" . encode_base64( int( rand(10737411824) ), '' );
+ my $client_id =
+ "perl_anyevent_riak_" . encode_base64(int(rand(10737411824)), '');
bless {
host => $host,
@@ -38,20 +39,20 @@ sub new {
}
sub _build_uri {
- my ( $self, $path, $options ) = @_;
- my $uri = URI->new( $self->{host} );
- $uri->path( join( "/", @$path ) );
- $uri->query_form( $self->_build_query($options) );
+ my ($self, $path, $options) = @_;
+ my $uri = URI->new($self->{host});
+ $uri->path(join("/", @$path));
+ $uri->query_form($self->_build_query($options));
return $uri->as_string;
}
sub _build_headers {
- my ( $self, $options) = @_;
- my $headers = {
- 'X-Riak-ClientId' => $self->{client_id},
- 'Content-Type' => 'application/json',
- };
- # TODO add headers
+ my ($self, $options) = @_;
+ my $headers = delete $options->{headers} || {};
+
+ $headers->{'X-Riak-ClientId'} = $self->{client_id};
+ $headers->{'Content-Type'} = 'application/json'
+ unless exists $headers->{'Content-Type'};
return $headers;
}
@@ -60,41 +61,48 @@ sub _build_query {
my $valid_options = [qw/props keys returnbody/];
my $query;
foreach (@$valid_options) {
- $query->{$_} = $options->{$_} if exists $options->{$_}
+ $query->{$_} = $options->{$_} if exists $options->{$_};
}
$query;
}
sub default_cb {
- my ( $self, $options ) = @_;
+ my ($self, $options) = @_;
return sub {
my $res = shift;
return $res;
};
}
-sub is_alive {
- my ( $self, %options ) = @_;
- my ( $cv, $cb );
+sub cvcb {
+ my ($self, $options) = @_;
+ my ($cv, $cb);
$cv = AE::cv;
- if ( $options{callback} ) {
- $cb = delete $options{callback};
+ if ($options->{callback}) {
+ $cb = delete $options->{callback};
}
else {
$cb = $self->default_cb();
}
+ ($cv, $cb);
+}
+
+sub is_alive {
+ my ($self, %options) = @_;
+
+ my ($cv, $cb) = $self->cvcb(\%options);
http_request(
- GET => $self->_build_uri( [qw/ping/] ),
- headers => $self->_build_headers(%options),
+ GET => $self->_build_uri([qw/ping/]),
+ headers => $self->_build_headers($options{params}),
sub {
- my ( $body, $headers ) = @_;
- if ( $headers->{Status} == 200 ) {
- $cv->send( $cb->(1) );
+ my ($body, $headers) = @_;
+ if ($headers->{Status} == 200) {
+ $cv->send($cb->(1));
}
else {
- $cv->send( $cb->(0) );
+ $cv->send($cb->(0));
}
},
);
@@ -102,28 +110,21 @@ sub is_alive {
}
sub list_bucket {
- my ( $self, $bucket_name, %options ) = @_;
- my ( $cv, $cb );
+ my ($self, $bucket_name, %options) = @_;
- $cv = AE::cv;
- if ( $options{callback} ) {
- $cb = delete $options{callback};
- }
- else {
- $cb = $self->default_cb();
- }
+ my ($cv, $cb) = $self->cvcb(\%options);
http_request(
GET => $self->_build_uri(
- [ $self->{path}, $bucket_name ],
- $options{parameters}
+ [$self->{path}, $bucket_name],
+ $options{params}
),
- headers => $self->_build_headers( $options{parameters} ),
+ headers => $self->_build_headers($options{params}),
sub {
- my ( $body, $headers ) = @_;
- if ( $body && $headers->{Status} == 200 ) {
+ my ($body, $headers) = @_;
+ if ($body && $headers->{Status} == 200) {
my $res = JSON::decode_json($body);
- $cv->send( $cb->($res) );
+ $cv->send($cb->($res));
}
else {
$cv->send(undef);
@@ -134,61 +135,47 @@ sub list_bucket {
}
sub set_bucket {
- my ( $self, $bucket, $schema, %options ) = @_;
- my ( $cv, $cb );
+ my ($self, $bucket, $schema, %options) = @_;
- $cv = AE::cv;
- if ( $options{callback} ) {
- $cb = delete $options{callback};
- }
- else {
- $cb = $self->default_cb();
- }
+ my ($cv, $cb) = $self->cvcb(\%options);
http_request(
- PUT => $self->_build_uri(
- [ $self->{path}, 'bucket' ],
- $options{parameters}
- ),
- headers => $self->_build_headers( $options{parameters} ),
+ PUT =>
+ $self->_build_uri([$self->{path}, $bucket], $options{params}),
+ headers => $self->_build_headers($options{params}),
body => JSON::encode_json($schema),
sub {
- my ( $body, $headers ) = @_;
- if ( $headers->{Status} == 204 ) {
- $cv->send( $cb->(1) );
+ my ($body, $headers) = @_;
+ if ($headers->{Status} == 204) {
+ $cv->send($cb->(1));
}
else {
- $cv->send( $cb->(0) );
+ $cv->send($cb->(0));
}
}
);
$cv;
}
+
sub fetch {
- my ( $self, $bucket, $key, %options ) = @_;
- my ( $cv, $cb );
+ my ($self, $bucket, $key, %options) = @_;
- $cv = AE::cv;
- if ( $options{callback} ) {
- $cb = delete $options{callback};
- }
- else {
- $cb = $self->default_cb();
- }
+ my ($cv, $cb) = $self->cvcb(\%options);
http_request(
GET => $self->_build_uri(
- [ $self->{path}, $bucket, $key ],
- $options{parameters}
+ [$self->{path}, $bucket, $key],
+ $options{params}
),
- headers => $self->_build_headers( $options{parameters} ),
+ headers => $self->_build_headers($options{params}),
sub {
my ($body, $headers) = @_;
if ($body && $headers->{Status} == 200) {
- $cv->send( $cb->(JSON::decode_json($body)) );
- }else{
- $cv->send( $cb->(0) );
+ $cv->send($cb->(JSON::decode_json($body)));
+ }
+ else {
+ $cv->send($cb->(0));
}
}
);
@@ -196,66 +183,54 @@ sub fetch {
}
sub store {
- my ( $self, $bucket, $key, $object, %options ) = @_;
- my ( $cv, $cb );
+ my ($self, $bucket, $key, $object, %options) = @_;
- $cv = AE::cv;
- if ( $options{callback} ) {
- $cb = delete $options{callback};
- }
- else {
- $cb = $self->default_cb();
- }
+ my ($cv, $cb) = $self->cvcb(\%options);
my $json = JSON::encode_json($object);
http_request(
POST => $self->_build_uri(
- [ $self->{path}, $bucket, $key ],
- $options{parameters}
+ [$self->{path}, $bucket, $key],
+ $options{params}
),
- headers => $self->_build_headers( $options{parameters} ),
+ headers => $self->_build_headers($options{params}),
body => $json,
sub {
my ($body, $headers) = @_;
my $result;
if ($headers->{Status} == 204) {
$result = $body ? JSON::decode_json($body) : 1;
- }else{
+ }
+ else {
$result = 0;
}
- $cv->send( $cb->($result) );
+ $cv->send($cb->($result));
}
);
$cv;
}
sub delete {
- my ( $self, $bucket, $key, %options ) = @_;
- my ( $cv, $cb );
+ my ($self, $bucket, $key, %options) = @_;
- $cv = AE::cv;
- if ( $options{callback} ) {
- $cb = delete $options{callback};
- }
- else {
- $cb = $self->default_cb();
- }
+ my ($cv, $cb) = $self->cvcb(\%options);
http_request(
DELETE => $self->_build_uri(
- [ $self->{path}, $bucket, $key ],
- $options{parameters}
+ [$self->{path}, $bucket, $key],
+ $options{params}
),
- headers => $self->_build_headers( $options{parameters} ),
+ headers => $self->_build_headers($options{params}),
sub {
- $cv->send( $cb->(@_) );
+ $cv->send($cb->(@_));
}
);
$cv;
}
1;
+
__END__
=head1 NAME
@@ -273,9 +248,7 @@ AnyEvent::Riak - Non-blocking Riak client
die "Riak is not running" unless $riak->is_alive->recv;
- my $bucket = $riak->set_bucket( 'foo',
- parameters => { { props => { n_val => 2 } } } )->recv;
-
+ my $bucket = $riak->set_bucket('foo', {props => {n_val => 5}})->recv;
This version is not compatible with the previous version (0.01) of this module and with Riak < 0.91.
@@ -290,14 +263,14 @@ AnyEvent::Riak is a non-blocking riak client using C<AnyEvent>. This client allo
=over 4
-=item B<is_alive>([callback => sub { }, parameters => { }])
+=item B<is_alive>([callback => sub { }, params => { }])
Check if the Riak server is alive. If the ping is successful, 1 is returned,
else 0.
my $ping = $riak->is_alive->recv;
-=item B<list_bucket>($bucketname, [callback => sub { }, parameters => { }])
+=item B<list_bucket>($bucketname, [callback => sub { }, params => { }])
Get the schema and key list for 'bucket'. Possible parameters are:
@@ -332,7 +305,6 @@ describing the bucket is returned.
=item B<set_bucket>($bucketname, $bucketschema, [parameters => { }, callback => sub { }])
Sets bucket properties like n_val and allow_mult.
-
=over 2
=item