summaryrefslogtreecommitdiff
path: root/lib/Net/Riak/Role/REST
diff options
context:
space:
mode:
authorRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
committerRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
commit79bea382fd2c0753ca9ace79a11bb74c9a1d722b (patch)
treebde42a47792a27e0a863ee527b88c8c24258f7e9 /lib/Net/Riak/Role/REST
parentMerge remote branch 'simon/fix_link_encoding' (diff)
downloadnet-riak-79bea382fd2c0753ca9ace79a11bb74c9a1d722b.tar.gz
merged pbc branch to master
Diffstat (limited to '')
-rw-r--r--lib/Net/Riak/Role/REST.pm63
-rw-r--r--lib/Net/Riak/Role/REST/Bucket.pm73
-rw-r--r--lib/Net/Riak/Role/REST/Link.pm52
-rw-r--r--lib/Net/Riak/Role/REST/MapReduce.pm40
-rw-r--r--lib/Net/Riak/Role/REST/Object.pm160
5 files changed, 384 insertions, 4 deletions
diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm
index 136ea88..dfab5a0 100644
--- a/lib/Net/Riak/Role/REST.pm
+++ b/lib/Net/Riak/Role/REST.pm
@@ -3,12 +3,36 @@ package Net::Riak::Role::REST;
# ABSTRACT: role for REST operations
use URI;
-use HTTP::Request;
+
use Moose::Role;
+use MooseX::Types::Moose 'Bool';
+use Net::Riak::Types qw/HTTPResponse HTTPRequest/;
+use Data::Dump 'pp';
+with qw/Net::Riak::Role::REST::Bucket
+ Net::Riak::Role::REST::Object
+ Net::Riak::Role::REST::Link
+ Net::Riak::Role::REST::MapReduce
+ /;
+
+has http_request => (
+ is => 'rw',
+ isa => HTTPRequest,
+);
+
+has http_response => (
+ is => 'rw',
+ isa => HTTPResponse,
+ handles => {
+ is_success => 'is_success',
+ status => 'code',
+ }
+);
-requires 'http_request';
-requires 'http_response';
-requires 'useragent';
+has disable_return_body => (
+ is => 'rw',
+ isa => Bool,
+ default => 0
+);
sub _build_path {
my ($self, $path) = @_;
@@ -37,9 +61,40 @@ sub send_request {
$self->http_request($req);
my $r = $self->useragent->request($req);
+
$self->http_response($r);
+ if ($ENV{RIAK_VERBOSE}) {
+ print STDERR pp($r);
+ }
+
return $r;
}
+sub is_alive {
+ my $self = shift;
+ my $request = $self->new_request('HEAD', ['ping']);
+ my $response = $self->send_request($request);
+ $self->is_success ? return 1 : return 0;
+}
+
+sub all_buckets {
+ my $self = shift;
+ my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'});
+ my $response = $self->send_request($request);
+ die "Failed to fetch buckets.. are you running riak 0.14+?"
+ unless $response->is_success;
+ my $resp = JSON::decode_json($response->content);
+ return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : ();
+}
+
+sub server_info { die "->server_info not supported by the REST interface" }
+
+sub stats {
+ my $self = shift;
+ my $request = $self->new_request('GET', ["stats"]);
+ my $response = $self->send_request($request);
+ return JSON::decode_json($response->content);
+}
+
1;
diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm
new file mode 100644
index 0000000..8a037c0
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Bucket.pm
@@ -0,0 +1,73 @@
+package Net::Riak::Role::REST::Bucket;
+
+use Moose::Role;
+use JSON;
+
+sub get_properties {
+ my ($self, $name, $params) = @_;
+
+ # Callbacks require stream mode
+ $params->{keys} = 'stream' if $params->{cb};
+
+ $params->{props} = 'true' unless exists $params->{props};
+ $params->{keys} = 'false' unless exists $params->{keys};
+
+ my $request = $self->new_request(
+ 'GET', [$self->prefix, $name], $params
+ );
+
+ my $response = $self->send_request($request);
+
+ unless ($response->is_success) {
+ die "Error getting bucket properties: ".$response->status_line."\n";
+ }
+
+ if ($params->{keys} ne 'stream') {
+ return JSON::decode_json($response->content);
+ }
+
+ # In streaming mode, aggregate keys from the multiple returned chunk objects
+ else {
+ my $json = JSON->new;
+ my $props = $json->incr_parse($response->content);
+ if ($params->{cb}) {
+ while (defined(my $obj = $json->incr_parse)) {
+ $params->{cb}->($_) foreach @{$obj->{keys}};
+ }
+ return %$props ? { props => $props } : {};
+ }
+ else {
+ my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
+ $json->incr_parse;
+ return { props => $props, keys => \@keys };
+ }
+ }
+}
+
+sub set_properties {
+ my ($self, $bucket, $props) = @_;
+
+ my $request = $self->new_request(
+ 'PUT', [$self->prefix, $bucket->name]
+ );
+
+ $request->header('Content-Type' => $bucket->content_type);
+ $request->content(JSON::encode_json({props => $props}));
+
+ my $response = $self->send_request($request);
+ unless ($response->is_success) {
+ die "Error setting bucket properties: ".$response->status_line."\n";
+ }
+}
+
+sub get_keys {
+ my ($self, $bucket, $params) = @_;
+
+ my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
+ $params = { props => 'false', keys => $key_mode, %$params };
+ my $properties = $self->get_properties($bucket, $params);
+
+ return $properties->{keys};
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm
new file mode 100644
index 0000000..fbead86
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Link.pm
@@ -0,0 +1,52 @@
+package Net::Riak::Role::REST::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+ my ($self, $object, $links) = @_;
+
+ for my $link (split(',', $links)) {
+ if ($link
+ =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
+ {
+ my $bucket = _uri_decode($2);
+ my $key = _uri_decode($3);
+ my $tag = _uri_decode($4);
+ my $l = Net::Riak::Link->new(
+ bucket => Net::Riak::Bucket->new(
+ name => $bucket,
+ client => $self
+ ),
+ key => $key,
+ tag => $tag
+ );
+ $object->add_link($l);
+ }
+ }
+}
+
+sub _uri_decode {
+ my $str = shift;
+ $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
+ return $str;
+}
+
+sub _links_to_header {
+ my ($self, $object) = @_;
+ join(', ', map { $self->link_to_header($_) } $object->links);
+}
+
+sub link_to_header {
+ my ($self, $link) = @_;
+
+ my $link_header = '';
+ $link_header .= '</';
+ $link_header .= $self->prefix . '/';
+ $link_header .= $link->bucket->name . '/';
+ $link_header .= $link->key . '>; riaktag="';
+ $link_header .= $link->tag . '"';
+ return $link_header;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm
new file mode 100644
index 0000000..e987a21
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/MapReduce.pm
@@ -0,0 +1,40 @@
+package Net::Riak::Role::REST::MapReduce;
+use Moose::Role;
+use JSON;
+use Data::Dumper;
+
+sub execute_job {
+ my ($self, $job, $timeout) = @_;
+
+ # save existing timeout value.
+ my $ua_timeout = $self->useragent->timeout();
+
+ if ($timeout) {
+ if ($ua_timeout < ($timeout/1000)) {
+ $self->useragent->timeout(int($timeout/1000));
+ }
+ $job->{timeout} = $timeout;
+ }
+
+ my $content = JSON::encode_json($job);
+
+ my $request = $self->new_request(
+ 'POST', [$self->mapred_prefix]
+ );
+ $request->content($content);
+
+ my $response = $self->send_request($request);
+
+ # restore time out value
+ if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) {
+ $self->useragent->timeout($ua_timeout);
+ }
+
+ unless ($response->is_success) {
+ die "MapReduce query failed: ".$response->status_line;
+ }
+
+ return JSON::decode_json($response->content);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm
new file mode 100644
index 0000000..d38315a
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Object.pm
@@ -0,0 +1,160 @@
+package Net::Riak::Role::REST::Object;
+
+use Moose::Role;
+use JSON;
+
+sub store_object {
+ my ($self, $w, $dw, $object) = @_;
+
+ my $params = {returnbody => 'true', w => $w, dw => $dw};
+
+ $params->{returnbody} = 'false'
+ if $self->disable_return_body;
+
+
+ my $request;
+ if ( defined $object->key ) {
+ $request = $self->new_request('PUT',
+ [$self->prefix, $object->bucket->name, $object->key], $params);
+ } else {
+ $request = $self->new_request('POST',
+ [$self->prefix, $object->bucket->name ], $params);
+ }
+
+ $request->header('X-Riak-ClientID' => $self->client_id);
+ $request->header('Content-Type' => $object->content_type);
+
+ if ($object->has_vclock) {
+ $request->header('X-Riak-Vclock' => $object->vclock);
+ }
+
+ if ($object->has_links) {
+ $request->header('link' => $self->_links_to_header($object));
+ }
+
+ if (ref $object->data && $object->content_type eq 'application/json') {
+ $request->content(JSON::encode_json($object->data));
+ }
+ else {
+ $request->content($object->data);
+ }
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [200, 201, 204, 300]);
+ return $object;
+}
+
+sub load_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $request =
+ $self->new_request( 'GET',
+ [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [ 200, 300, 404 ] );
+ $object;
+}
+
+sub delete_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $request =
+ $self->new_request( 'DELETE',
+ [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [ 204, 404 ] );
+ $object;
+}
+
+sub populate_object {
+ my ($self, $obj, $http_response, $expected) = @_;
+
+ $obj->_clear_links;
+ $obj->exists(0);
+
+ return if (!$http_response);
+
+ my $status = $http_response->code;
+
+ $obj->data($http_response->content)
+ unless $self->disable_return_body;
+
+ if ( $http_response->header('location') ) {
+ $obj->key( $http_response->header('location') );
+ $obj->location( $http_response->header('location') );
+ }
+
+ if (!grep { $status == $_ } @$expected) {
+ confess "Expected status "
+ . (join(', ', @$expected))
+ . ", received $status"
+ }
+
+ if ($status == 404) {
+ $obj->clear;
+ return;
+ }
+
+ $obj->exists(1);
+
+ if ($http_response->header('link')) {
+ $self->_populate_links($obj, $http_response->header('link'));
+ }
+
+ if ($status == 300) {
+ my @siblings = split("\n", $obj->data);
+ shift @siblings;
+ my %seen; @siblings = grep { !$seen{$_}++ } @siblings;
+ $obj->siblings(\@siblings);
+ }
+
+ if ($status == 201) {
+ my $location = $http_response->header('location');
+ my ($key) = ($location =~ m!/([^/]+)$!);
+ $obj->key($key);
+ }
+
+
+ if ($status == 200 || $status == 201) {
+ $obj->content_type($http_response->content_type)
+ if $http_response->content_type;
+ $obj->data(JSON::decode_json($obj->data))
+ if $obj->content_type eq 'application/json';
+ $obj->vclock($http_response->header('X-Riak-Vclock'));
+ }
+}
+
+sub retrieve_sibling {
+ my ($self, $object, $params) = @_;
+
+ my $request = $self->new_request(
+ 'GET',
+ [$self->prefix, $object->bucket->name, $object->key],
+ $params
+ );
+
+ my $response = $self->send_request($request);
+
+ my $sibling = Net::Riak::Object->new(
+ client => $self,
+ bucket => $object->bucket,
+ key => $object->key
+ );
+
+ $sibling->_jsonize($object->_jsonize);
+ $self->populate_object($sibling, $response, [200]);
+ $sibling;
+}
+
+
+
+
+1;
+__END__
+
+=item populate_object
+
+Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
+