summaryrefslogtreecommitdiff
path: root/lib/Net/Riak
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net/Riak')
-rw-r--r--lib/Net/Riak/Bucket.pm82
-rw-r--r--lib/Net/Riak/Client.pm34
-rw-r--r--lib/Net/Riak/Link.pm15
-rw-r--r--lib/Net/Riak/MapReduce.pm31
-rw-r--r--lib/Net/Riak/Object.pm189
-rw-r--r--lib/Net/Riak/Role/Hosts.pm22
-rw-r--r--lib/Net/Riak/Role/PBC.pm78
-rw-r--r--lib/Net/Riak/Role/PBC/Bucket.pm46
-rw-r--r--lib/Net/Riak/Role/PBC/Link.pm35
-rw-r--r--lib/Net/Riak/Role/PBC/MapReduce.pm37
-rw-r--r--lib/Net/Riak/Role/PBC/Message.pm21
-rw-r--r--lib/Net/Riak/Role/PBC/Object.pm131
-rw-r--r--lib/Net/Riak/Role/REST.pm63
-rw-r--r--lib/Net/Riak/Role/REST/Bucket.pm73
-rw-r--r--lib/Net/Riak/Role/REST/Link.pm52
-rw-r--r--lib/Net/Riak/Role/REST/MapReduce.pm40
-rw-r--r--lib/Net/Riak/Role/REST/Object.pm160
-rw-r--r--lib/Net/Riak/Role/UserAgent.pm9
-rw-r--r--lib/Net/Riak/Transport/PBC.pm9
-rw-r--r--lib/Net/Riak/Transport/PBC/Code.pm90
-rw-r--r--lib/Net/Riak/Transport/PBC/Message.pm121
-rw-r--r--lib/Net/Riak/Transport/PBC/Transport.pm483
-rw-r--r--lib/Net/Riak/Transport/REST.pm11
-rw-r--r--lib/Net/Riak/Types.pm38
24 files changed, 1547 insertions, 323 deletions
diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm
index 2bc334e..19f5d94 100644
--- a/lib/Net/Riak/Bucket.pm
+++ b/lib/Net/Riak/Bucket.pm
@@ -1,16 +1,14 @@
package Net::Riak::Bucket;
-
-# ABSTRACT: Access and change information about a Riak bucket
-
-use JSON;
use Moose;
-use Carp;
use Net::Riak::Object;
-
+use Net::Riak::Types Client => {-as => 'Client_T'};
with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
-with 'Net::Riak::Role::Base' => {
- classes => [{ name => 'client', required => 1, }]
-};
+
+has client => (
+ is => 'rw',
+ isa => Client_T,
+ required => 1,
+);
has name => (
is => 'ro',
@@ -37,20 +35,17 @@ sub allow_multiples {
my $self = shift;
if (my $val = shift) {
- my $bool = ($val == 1 ? JSON::true : JSON::false);
+ my $bool = ($val == 1 ? 1 : 0);
$self->set_property('allow_mult', $bool);
}
else {
- return $self->get_property('allow_mult');
+ return $self->get_property('allow_mult') ? 1 : 0;
}
}
sub get_keys {
my ($self, $params) = @_;
- my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
- $params = { props => 'false', keys => $key_mode, %$params };
- my $properties = $self->get_properties($params);
- return $properties->{keys};
+ $self->client->get_keys($self->name, $params);
}
sub get {
@@ -66,12 +61,12 @@ sub get {
}
sub delete_object {
- my ($self, $key) = @_;
+ my ($self, $key, $dw) = @_;
Net::Riak::Object->new(
client => $self->client,
bucket => $self,
key => $key
- )->delete;
+ )->delete($dw);
}
sub set_property {
@@ -87,59 +82,12 @@ sub get_property {
sub get_properties {
my ($self, $params) = @_;
-
- # Callbacks require stream mode
- $params->{keys} = 'stream' if $params->{cb};
-
- $params->{props} = 'true' unless exists $params->{props};
- $params->{keys} = 'false' unless exists $params->{keys};
-
- my $request = $self->client->new_request(
- 'GET', [$self->client->prefix, $self->name], $params
- );
-
- my $response = $self->client->send_request($request);
-
- unless ($response->is_success) {
- die "Error getting bucket properties: ".$response->status_line."\n";
- }
-
- if ($params->{keys} ne 'stream') {
- return JSON::decode_json($response->content);
- }
-
- # In streaming mode, aggregate keys from the multiple returned chunk objects
- else {
- my $json = JSON->new;
- my $props = $json->incr_parse($response->content);
- if ($params->{cb}) {
- while (defined(my $obj = $json->incr_parse)) {
- $params->{cb}->($_) foreach @{$obj->{keys}};
- }
- return %$props ? { props => $props } : {};
- }
- else {
- my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
- $json->incr_parse;
- return { props => $props, keys => \@keys };
- }
- }
+ $self->client->get_properties($self->name, $params);
}
sub set_properties {
my ($self, $props) = @_;
-
- my $request = $self->client->new_request(
- 'PUT', [$self->client->prefix, $self->name]
- );
-
- $request->header('Content-Type' => $self->content_type);
- $request->content(JSON::encode_json({props => $props}));
-
- my $response = $self->client->send_request($request);
- unless ($response->is_success) {
- die "Error setting bucket properties: ".$response->status_line."\n";
- }
+ $self->client->set_properties($self, $props);
}
sub new_object {
@@ -169,7 +117,7 @@ sub new_object {
my $obj2 = $bucket->new_object('foo2', {...});
$object->store;
- $bucket->delete_object($key);
+ $bucket->delete_object($key, 3); # optional w val
=head1 DESCRIPTION
diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm
index 4d14338..f38bec6 100644
--- a/lib/Net/Riak/Client.pm
+++ b/lib/Net/Riak/Client.pm
@@ -2,10 +2,8 @@ package Net::Riak::Client;
use Moose;
use MIME::Base64;
-use Moose::Util::TypeConstraints;
-class_type 'HTTP::Request';
-class_type 'HTTP::Response';
+with 'MooseX::Traits';
has prefix => (
is => 'rw',
@@ -27,40 +25,10 @@ has client_id => (
isa => 'Str',
lazy_build => 1,
);
-has http_request => (
- is => 'rw',
- isa => 'HTTP::Request',
-);
-
-has http_response => (
- is => 'rw',
- isa => 'HTTP::Response',
- handles => ['is_success']
-);
-
-has ua_timeout => (
- is => 'rw',
- isa => 'Int',
- default => 3
-);
-
-with 'Net::Riak::Role::UserAgent';
-with qw/
- Net::Riak::Role::REST
- Net::Riak::Role::Hosts
- /;
-
-
sub _build_client_id {
"perl_net_riak" . encode_base64(int(rand(10737411824)), '');
}
-sub is_alive {
- my $self = shift;
- my $request = $self->new_request('GET', ['ping']);
- my $response = $self->send_request($request);
- $self->is_success ? return 1 : return 0;
-}
1;
diff --git a/lib/Net/Riak/Link.pm b/lib/Net/Riak/Link.pm
index 980aabb..57881a0 100644
--- a/lib/Net/Riak/Link.pm
+++ b/lib/Net/Riak/Link.pm
@@ -20,19 +20,4 @@ has tag => (
default => sub {(shift)->bucket->name}
);
-sub to_link_header {
- my ($self, $client) = @_;
-
- $client ||= $self->client;
-
- my $link = '';
- $link .= '</';
- $link .= $client->prefix . '/';
- $link .= $self->bucket->name . '/';
- $link .= $self->key . '>; riaktag="';
- $link .= $self->tag . '"';
- return $link;
-}
-
1;
-
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index d05c30a..10a7c98 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -6,6 +6,8 @@ use JSON;
use Moose;
use Scalar::Util;
+use Data::Dumper;
+
use Net::Riak::LinkPhase;
use Net::Riak::MapReducePhase;
@@ -156,35 +158,12 @@ sub run {
$inputs = $self->inputs;
}
- my $ua_timeout = $self->client->useragent->timeout();
-
my $job = {inputs => $inputs, query => $query};
- if ($timeout) {
- if ($ua_timeout < ($timeout/1000)) {
- $self->client->useragent->timeout(int($timeout/1000));
- }
- $job->{timeout} = $timeout;
- }
-
-
- my $content = JSON::encode_json($job);
- my $request = $self->client->new_request(
- 'POST', [$self->client->mapred_prefix]
- );
- $request->content($content);
-
- my $response = $self->client->send_request($request);
-
- unless ($response->is_success) {
- die "MapReduce query failed: ".$response->status_line;
- }
-
- my $result = JSON::decode_json($response->content);
+ # how phases set to 'keep'.
+ my $p = scalar ( grep { $_->keep } $self->phases);
- if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
- $self->client->useragent->timeout($ua_timeout);
- }
+ my $result = $self->client->execute_job($job, $timeout, $p);
my @phases = $self->phases;
if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm
index f40031b..7148d4f 100644
--- a/lib/Net/Riak/Object.pm
+++ b/lib/Net/Riak/Object.pm
@@ -2,24 +2,27 @@ package Net::Riak::Object;
# ABSTRACT: holds meta information about a Riak object
-use Carp;
-use JSON;
use Moose;
use Scalar::Util;
use Net::Riak::Link;
with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
with 'Net::Riak::Role::Base' => {classes =>
- [{name => 'bucket', required => 1}, {name => 'client', required => 1}]};
-
+ [{name => 'bucket', required => 1}]};
+use Net::Riak::Types Client => {-as => 'Client_T'};
+has client => (
+ is => 'rw',
+ isa => Client_T,
+ required => 1,
+);
has key => (is => 'rw', isa => 'Str', required => 0);
-has status => (is => 'rw', isa => 'Int');
has exists => (is => 'rw', isa => 'Bool', default => 0,);
has data => (is => 'rw', isa => 'Any', clearer => '_clear_data');
-has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock',);
+has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock');
+has vtag => (is => 'rw', isa => 'Str');
has content_type => (is => 'rw', isa => 'Str', default => 'application/json');
-has _headers => (is => 'rw', isa => 'HTTP::Response',);
-has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1,);
+has location => ( is => 'rw', isa => 'Str' );
+has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1);
has links => (
traits => ['Array'],
is => 'rw',
@@ -31,6 +34,7 @@ has links => (
count_links => 'elements',
append_link => 'push',
has_links => 'count',
+ all_links => 'elements',
},
clearer => '_clear_links',
);
@@ -52,62 +56,31 @@ has siblings => (
clearer => '_clear_siblings',
);
+after count_links => sub {
+ warn "DEPRECATED: count_links method will be removed in the 0.17 release, please use has_links.";
+};
+
sub store {
my ($self, $w, $dw) = @_;
$w ||= $self->w;
$dw ||= $self->dw;
- my $params = {returnbody => 'true', w => $w, dw => $dw};
- my $path = [$self->client->prefix, $self->bucket->name];
- my $method = 'POST';
- if (defined $self->key) {
- push @$path, $self->key;
- $method = 'PUT';
- }
-
- my $request = $self->client->new_request($method, $path, $params);
-
- $request->header('X-Riak-ClientID' => $self->client->client_id);
- $request->header('Content-Type' => $self->content_type);
-
- if ($self->has_vclock) {
- $request->header('X-Riak-Vclock' => $self->vclock);
- }
-
- if ($self->has_links) {
- $request->header('link' => $self->_links_to_header);
- }
-
- if (ref $self->data && $self->content_type eq 'application/json') {
- $request->content(JSON::encode_json($self->data));
- }
- else {
- $request->content($self->data);
- }
-
- my $response = $self->client->send_request($request);
- $self->populate($response, [200, 201, 204, 300]);
- $self;
+ $self->client->store_object($w, $dw, $self);
}
-sub _links_to_header {
- my $self = shift;
- join(', ', map { $_->to_link_header($self->client) } $self->links);
-}
+sub status {
+ my ($self) = @_;
+ warn "DEPRECATED: status method will be removed in the 0.17 release, please use ->client->status.";
+ $self->client->status;
+}
sub load {
my $self = shift;
my $params = {r => $self->r};
- my $request =
- $self->client->new_request('GET',
- [$self->client->prefix, $self->bucket->name, $self->key], $params);
-
- my $response = $self->client->send_request($request);
- $self->populate($response, [200, 300, 404]);
- $self;
+ $self->client->load_object($params, $self);
}
sub delete {
@@ -116,13 +89,7 @@ sub delete {
$dw ||= $self->bucket->dw;
my $params = {dw => $dw};
- my $request =
- $self->client->new_request('DELETE',
- [$self->client->prefix, $self->bucket->name, $self->key], $params);
-
- my $response = $self->client->send_request($request);
- $self->populate($response, [204, 404]);
- $self;
+ $self->client->delete_object($params, $self);
}
sub clear {
@@ -133,109 +100,17 @@ sub clear {
$self;
}
-sub populate {
- my ($self, $http_response, $expected) = @_;
-
- $self->clear;
-
- return if (!$http_response);
-
- my $status = $http_response->code;
- $self->_headers($http_response);
- $self->status($status);
-
- $self->data($http_response->content);
-
- if (!grep { $status == $_ } @$expected) {
- confess "Expected status "
- . (join(', ', @$expected))
- . ", received $status"
- }
-
- if ($status == 404) {
- $self->clear;
- return;
- }
-
- $self->exists(1);
-
- if ($http_response->header('link')) {
- $self->_populate_links($http_response->header('link'));
- }
-
- if ($status == 300) {
- my @siblings = split("\n", $self->data);
- shift @siblings;
- $self->siblings(\@siblings);
- }
-
- if ($status == 201) {
- my $location = $http_response->header('location');
- my ($key) = ($location =~ m!/([^/]+)$!);
- $self->key($key);
- }
-
-
- if ($status == 200 || $status == 201) {
- $self->content_type($http_response->content_type)
- if $http_response->content_type;
- $self->data(JSON::decode_json($self->data))
- if $self->content_type eq 'application/json';
- $self->vclock($http_response->header('X-Riak-Vclock'));
- }
-}
-
-sub _uri_decode {
- my $str = shift;
- $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
- return $str;
-}
-
-sub _populate_links {
- my ($self, $links) = @_;
- for my $link (split(',', $links)) {
- if ($link
- =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
- {
- my $bucket = _uri_decode($2);
- my $key = _uri_decode($3);
- my $tag = _uri_decode($4);
- my $l = Net::Riak::Link->new(
- bucket => Net::Riak::Bucket->new(
- name => $bucket,
- client => $self->client
- ),
- key => $key,
- tag => $tag
- );
- $self->add_link($l);
- }
- }
-}
-
sub sibling {
my ($self, $id, $r) = @_;
$r ||= $self->bucket->r;
my $vtag = $self->get_sibling($id);
- my $params = {r => $r, vtag => $vtag};
- my $request =
- $self->client->new_request('GET',
- [$self->client->prefix, $self->bucket->name, $self->key], $params);
- my $response = $self->client->send_request($request);
-
- my $obj = Net::Riak::Object->new(
- client => $self->client,
- bucket => $self->bucket,
- key => $self->key
+ return $self->client->retrieve_sibling(
+ $self, {r => $r, vtag => $vtag}
);
- $obj->_jsonize($self->_jsonize);
- $obj->populate($response, [200]);
- $obj;
}
-
sub _build_link {
my ($self,$obj,$tag) = @_;
blessed $obj && $obj->isa('Net::Riak::Link')
@@ -337,10 +212,6 @@ Get or set the data stored in this object.
=item B<content_type>
-=item B<status>
-
-Get the HTTP status from the last operation on this object.
-
=item B<links>
Get an array of L<Net::Riak::Link> objects
@@ -359,7 +230,11 @@ Return an array of Siblings
=over 4
-=item count_links
+=item all_links
+
+Return the number of links
+
+=item has_links
Return the number of links
@@ -445,7 +320,7 @@ Return true if this object has siblings
Return true if this object has no siblings
-=item populate
+=item populate_object
Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
diff --git a/lib/Net/Riak/Role/Hosts.pm b/lib/Net/Riak/Role/Hosts.pm
index 34d9273..639d472 100644
--- a/lib/Net/Riak/Role/Hosts.pm
+++ b/lib/Net/Riak/Role/Hosts.pm
@@ -1,29 +1,11 @@
package Net::Riak::Role::Hosts;
use Moose::Role;
-use Moose::Util::TypeConstraints;
-
-subtype 'RiakHost' => as 'ArrayRef[HashRef]';
-
-coerce 'RiakHost' => from 'Str' => via {
- [{node => $_, weight => 1}];
-};
-coerce 'RiakHost' => from 'ArrayRef' => via {
- my $backends = $_;
- my $weight = 1 / @$backends;
- [map { {node => $_, weight => $weight} } @$backends];
-};
-coerce 'RiakHost' => from 'HashRef' => via {
- my $backends = $_;
- my $total = 0;
- $total += $_ for values %$backends;
- [map { {node => $_, weight => $backends->{$_} / $total} }
- keys %$backends];
-};
+use Net::Riak::Types qw(RiakHost);
has host => (
is => 'rw',
- isa => 'RiakHost',
+ isa => RiakHost,
coerce => 1,
default => 'http://127.0.0.1:8098',
);
diff --git a/lib/Net/Riak/Role/PBC.pm b/lib/Net/Riak/Role/PBC.pm
new file mode 100644
index 0000000..605f032
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC.pm
@@ -0,0 +1,78 @@
+package Net::Riak::Role::PBC;
+
+use Moose::Role;
+use MooseX::Types::Moose qw/Str Int/;
+
+with qw(
+ Net::Riak::Role::PBC::Message
+ Net::Riak::Role::PBC::Bucket
+ Net::Riak::Role::PBC::MapReduce
+ Net::Riak::Role::PBC::Link
+ Net::Riak::Role::PBC::Object);
+
+use Net::Riak::Types 'Socket';
+use IO::Socket::INET;
+
+has [qw/r w dw/] => (
+ is => 'rw',
+ isa => Int,
+ default => 2
+);
+
+has host => (
+ is => 'ro',
+ isa => Str,
+ required => 1,
+);
+
+has port => (
+ is => 'ro',
+ isa => Int,
+ required => 1,
+);
+
+has socket => (
+ is => 'rw',
+ isa => Socket,
+ predicate => 'has_socket',
+);
+
+sub is_alive {
+ my $self = shift;
+ return $self->send_message('PingReq');
+}
+
+sub connected {
+ my $self = shift;
+ return $self->has_socket && $self->socket->connected ? 1 : 0;
+}
+
+sub connect {
+ my $self = shift;
+ return if $self->has_socket && $self->connected;
+
+ $self->socket(
+ IO::Socket::INET->new(
+ PeerAddr => $self->host,
+ PeerPort => $self->port,
+ Proto => 'tcp',
+ Timeout => 30,
+ )
+ );
+}
+
+sub all_buckets {
+ my $self = shift;
+ my $resp = $self->send_message('ListBucketsReq');
+ return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : ();
+}
+
+sub server_info {
+ my $self = shift;
+ my $resp = $self->send_message('GetServerInfoReq');
+ return $resp;
+}
+
+sub stats { die "->stats is only avaliable through the REST interface" }
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm
new file mode 100644
index 0000000..aa7d7fa
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Bucket.pm
@@ -0,0 +1,46 @@
+package Net::Riak::Role::PBC::Bucket;
+
+use Moose::Role;
+use Data::Dumper;
+
+sub get_properties {
+ my ( $self, $name, $params ) = @_;
+ my $resp = $self->send_message( GetBucketReq => { bucket => $name } );
+ return { props => { %{ $resp->props } } };
+}
+
+sub set_properties {
+ my ( $self, $bucket, $props ) = @_;
+ return $self->send_message(
+ SetBucketReq => {
+ bucket => $bucket->name,
+ props => $props
+ }
+ );
+}
+
+sub get_keys {
+ my ( $self, $name, $params) = @_;
+ my $keys = [];
+
+ my $res = $self->send_message(
+ ListKeysReq => { bucket => $name, },
+ sub {
+ if ( defined $_[0]->keys ) {
+ if ($params->{cb}) {
+ $params->{cb}->($_) for @{ $_[0]->keys };
+ }
+ else {
+ push @$keys, @{ $_[0]->keys };
+ }
+ }
+ }
+ );
+
+ return $params->{cb} ? undef : $keys;
+}
+
+
+
+1;
+
diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm
new file mode 100644
index 0000000..5e6a336
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Link.pm
@@ -0,0 +1,35 @@
+package Net::Riak::Role::PBC::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+ my ($self, $object, $links) = @_;
+
+ for my $link (@$links) {
+ my $l = Net::Riak::Link->new(
+ bucket => Net::Riak::Bucket->new(
+ name => $link->bucket,
+ client => $self
+ ),
+ key => $link->key,
+ tag => $link->tag
+ );
+ $object->add_link($l);
+ }
+}
+
+sub _links_for_message {
+ my ($self, $object) = @_;
+
+ return [
+ map { {
+ tag => $_->tag,
+ key => $_->key,
+ bucket => $_->bucket->name
+ }
+ } $object->all_links
+ ]
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm
new file mode 100644
index 0000000..afeabe8
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/MapReduce.pm
@@ -0,0 +1,37 @@
+package Net::Riak::Role::PBC::MapReduce;
+use Moose::Role;
+use JSON;
+use List::Util 'sum';
+use Data::Dump 'pp';
+
+sub execute_job {
+ my ($self, $job, $timeout, $returned_phases) = @_;
+
+ $job->{timeout} = $timeout;
+
+ my $job_request = JSON::encode_json($job);
+
+ my $results;
+
+ my $resp = $self->send_message( MapRedReq => {
+ request => $job_request,
+ content_type => 'application/json'
+ }, sub { push @$results, $self->decode_phase(shift) })
+ or
+ die "MapReduce query failed!";
+
+
+ return $returned_phases == 1 ? $results->[0] : $results;
+}
+
+sub decode_phase {
+ my ($self, $resp) = @_;
+
+ if (defined $resp->response && length($resp->response)) {
+ return JSON::decode_json($resp->response);
+ }
+
+ return;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm
new file mode 100644
index 0000000..0c2fbf3
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Message.pm
@@ -0,0 +1,21 @@
+package Net::Riak::Role::PBC::Message;
+
+use Moose::Role;
+use Net::Riak::Transport::PBC::Message;
+
+sub send_message {
+ my ( $self, $type, $params, $cb ) = @_;
+
+ $self->connect unless $self->connected;
+
+ my $message = Net::Riak::Transport::PBC::Message->new(
+ message_type => $type,
+ params => $params || {},
+ );
+
+ $message->socket( $self->socket );
+
+ return $message->send($cb);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm
new file mode 100644
index 0000000..847cac2
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Object.pm
@@ -0,0 +1,131 @@
+package Net::Riak::Role::PBC::Object;
+
+use JSON;
+use Moose::Role;
+use Data::Dumper;
+use List::Util 'first';
+
+sub store_object {
+ my ($self, $w, $dw, $object) = @_;
+
+ my $value = (ref $object->data && $object->content_type eq 'application/json')
+ ? JSON::encode_json($object->data) : $object->data;
+
+ my $content = {
+ content_type => $object->content_type,
+ value => $value,
+ usermeta => undef
+ };
+
+ if ($object->has_links) {
+ $content->{links} = $self->_links_for_message($object);
+ }
+
+ $self->send_message(
+ PutReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ content => $content,
+ }
+ );
+ return $object;
+}
+
+sub load_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $resp = $self->send_message(
+ GetReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ r => $params->{r},
+ }
+ );
+
+ $self->populate_object($object, $resp);
+
+ return $object;
+}
+
+sub delete_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $resp = $self->send_message(
+ DelReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ rw => $params->{dw},
+ }
+ );
+
+ $object;
+}
+
+sub populate_object {
+ my ( $self, $object, $resp) = @_;
+
+ $object->_clear_links;
+ $object->exists(0);
+
+ if ( $resp->content && scalar (@{$resp->content}) > 1) {
+ my %seen;
+ my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content};
+ $object->siblings(\@vtags);
+ }
+
+ my $content = $resp->content ? $resp->content->[0] : undef;
+
+ return unless $content and $resp->vclock;
+
+ $object->vclock($resp->vclock);
+ $object->vtag($content->vtag);
+ $object->content_type($content->content_type);
+
+ if($content->links) {
+ $self->_populate_links($object, $content->links);
+ }
+
+ my $data = ($object->content_type eq 'application/json')
+ ? JSON::decode_json($content->value) : $content->value;
+
+ $object->exists(1);
+
+ $object->data($data);
+}
+
+
+# This emulates the behavior of the existing REST client.
+sub retrieve_sibling {
+ my ($self, $object, $params) = @_;
+
+ my $resp = $self->send_message(
+ GetReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ r => $params->{r},
+ }
+ );
+
+ # hack for loading 1 sibling
+ if ($params->{vtag}) {
+ $resp->{content} = [
+ first {
+ $_->vtag eq $params->{vtag}
+ } @{$resp->content}
+ ];
+ }
+
+ my $sibling = Net::Riak::Object->new(
+ client => $self,
+ bucket => $object->bucket,
+ key => $object->key
+ );
+
+ $sibling->_jsonize($object->_jsonize);
+
+ $self->populate_object($sibling, $resp);
+
+ $sibling;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm
index 136ea88..dfab5a0 100644
--- a/lib/Net/Riak/Role/REST.pm
+++ b/lib/Net/Riak/Role/REST.pm
@@ -3,12 +3,36 @@ package Net::Riak::Role::REST;
# ABSTRACT: role for REST operations
use URI;
-use HTTP::Request;
+
use Moose::Role;
+use MooseX::Types::Moose 'Bool';
+use Net::Riak::Types qw/HTTPResponse HTTPRequest/;
+use Data::Dump 'pp';
+with qw/Net::Riak::Role::REST::Bucket
+ Net::Riak::Role::REST::Object
+ Net::Riak::Role::REST::Link
+ Net::Riak::Role::REST::MapReduce
+ /;
+
+has http_request => (
+ is => 'rw',
+ isa => HTTPRequest,
+);
+
+has http_response => (
+ is => 'rw',
+ isa => HTTPResponse,
+ handles => {
+ is_success => 'is_success',
+ status => 'code',
+ }
+);
-requires 'http_request';
-requires 'http_response';
-requires 'useragent';
+has disable_return_body => (
+ is => 'rw',
+ isa => Bool,
+ default => 0
+);
sub _build_path {
my ($self, $path) = @_;
@@ -37,9 +61,40 @@ sub send_request {
$self->http_request($req);
my $r = $self->useragent->request($req);
+
$self->http_response($r);
+ if ($ENV{RIAK_VERBOSE}) {
+ print STDERR pp($r);
+ }
+
return $r;
}
+sub is_alive {
+ my $self = shift;
+ my $request = $self->new_request('HEAD', ['ping']);
+ my $response = $self->send_request($request);
+ $self->is_success ? return 1 : return 0;
+}
+
+sub all_buckets {
+ my $self = shift;
+ my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'});
+ my $response = $self->send_request($request);
+ die "Failed to fetch buckets.. are you running riak 0.14+?"
+ unless $response->is_success;
+ my $resp = JSON::decode_json($response->content);
+ return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : ();
+}
+
+sub server_info { die "->server_info not supported by the REST interface" }
+
+sub stats {
+ my $self = shift;
+ my $request = $self->new_request('GET', ["stats"]);
+ my $response = $self->send_request($request);
+ return JSON::decode_json($response->content);
+}
+
1;
diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm
new file mode 100644
index 0000000..8a037c0
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Bucket.pm
@@ -0,0 +1,73 @@
+package Net::Riak::Role::REST::Bucket;
+
+use Moose::Role;
+use JSON;
+
+sub get_properties {
+ my ($self, $name, $params) = @_;
+
+ # Callbacks require stream mode
+ $params->{keys} = 'stream' if $params->{cb};
+
+ $params->{props} = 'true' unless exists $params->{props};
+ $params->{keys} = 'false' unless exists $params->{keys};
+
+ my $request = $self->new_request(
+ 'GET', [$self->prefix, $name], $params
+ );
+
+ my $response = $self->send_request($request);
+
+ unless ($response->is_success) {
+ die "Error getting bucket properties: ".$response->status_line."\n";
+ }
+
+ if ($params->{keys} ne 'stream') {
+ return JSON::decode_json($response->content);
+ }
+
+ # In streaming mode, aggregate keys from the multiple returned chunk objects
+ else {
+ my $json = JSON->new;
+ my $props = $json->incr_parse($response->content);
+ if ($params->{cb}) {
+ while (defined(my $obj = $json->incr_parse)) {
+ $params->{cb}->($_) foreach @{$obj->{keys}};
+ }
+ return %$props ? { props => $props } : {};
+ }
+ else {
+ my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () }
+ $json->incr_parse;
+ return { props => $props, keys => \@keys };
+ }
+ }
+}
+
+sub set_properties {
+ my ($self, $bucket, $props) = @_;
+
+ my $request = $self->new_request(
+ 'PUT', [$self->prefix, $bucket->name]
+ );
+
+ $request->header('Content-Type' => $bucket->content_type);
+ $request->content(JSON::encode_json({props => $props}));
+
+ my $response = $self->send_request($request);
+ unless ($response->is_success) {
+ die "Error setting bucket properties: ".$response->status_line."\n";
+ }
+}
+
+sub get_keys {
+ my ($self, $bucket, $params) = @_;
+
+ my $key_mode = delete($params->{stream}) ? 'stream' : 'true';
+ $params = { props => 'false', keys => $key_mode, %$params };
+ my $properties = $self->get_properties($bucket, $params);
+
+ return $properties->{keys};
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm
new file mode 100644
index 0000000..fbead86
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Link.pm
@@ -0,0 +1,52 @@
+package Net::Riak::Role::REST::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+ my ($self, $object, $links) = @_;
+
+ for my $link (split(',', $links)) {
+ if ($link
+ =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/)
+ {
+ my $bucket = _uri_decode($2);
+ my $key = _uri_decode($3);
+ my $tag = _uri_decode($4);
+ my $l = Net::Riak::Link->new(
+ bucket => Net::Riak::Bucket->new(
+ name => $bucket,
+ client => $self
+ ),
+ key => $key,
+ tag => $tag
+ );
+ $object->add_link($l);
+ }
+ }
+}
+
+sub _uri_decode {
+ my $str = shift;
+ $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg;
+ return $str;
+}
+
+sub _links_to_header {
+ my ($self, $object) = @_;
+ join(', ', map { $self->link_to_header($_) } $object->links);
+}
+
+sub link_to_header {
+ my ($self, $link) = @_;
+
+ my $link_header = '';
+ $link_header .= '</';
+ $link_header .= $self->prefix . '/';
+ $link_header .= $link->bucket->name . '/';
+ $link_header .= $link->key . '>; riaktag="';
+ $link_header .= $link->tag . '"';
+ return $link_header;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm
new file mode 100644
index 0000000..e987a21
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/MapReduce.pm
@@ -0,0 +1,40 @@
+package Net::Riak::Role::REST::MapReduce;
+use Moose::Role;
+use JSON;
+use Data::Dumper;
+
+sub execute_job {
+ my ($self, $job, $timeout) = @_;
+
+ # save existing timeout value.
+ my $ua_timeout = $self->useragent->timeout();
+
+ if ($timeout) {
+ if ($ua_timeout < ($timeout/1000)) {
+ $self->useragent->timeout(int($timeout/1000));
+ }
+ $job->{timeout} = $timeout;
+ }
+
+ my $content = JSON::encode_json($job);
+
+ my $request = $self->new_request(
+ 'POST', [$self->mapred_prefix]
+ );
+ $request->content($content);
+
+ my $response = $self->send_request($request);
+
+ # restore time out value
+ if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) {
+ $self->useragent->timeout($ua_timeout);
+ }
+
+ unless ($response->is_success) {
+ die "MapReduce query failed: ".$response->status_line;
+ }
+
+ return JSON::decode_json($response->content);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm
new file mode 100644
index 0000000..d38315a
--- /dev/null
+++ b/lib/Net/Riak/Role/REST/Object.pm
@@ -0,0 +1,160 @@
+package Net::Riak::Role::REST::Object;
+
+use Moose::Role;
+use JSON;
+
+sub store_object {
+ my ($self, $w, $dw, $object) = @_;
+
+ my $params = {returnbody => 'true', w => $w, dw => $dw};
+
+ $params->{returnbody} = 'false'
+ if $self->disable_return_body;
+
+
+ my $request;
+ if ( defined $object->key ) {
+ $request = $self->new_request('PUT',
+ [$self->prefix, $object->bucket->name, $object->key], $params);
+ } else {
+ $request = $self->new_request('POST',
+ [$self->prefix, $object->bucket->name ], $params);
+ }
+
+ $request->header('X-Riak-ClientID' => $self->client_id);
+ $request->header('Content-Type' => $object->content_type);
+
+ if ($object->has_vclock) {
+ $request->header('X-Riak-Vclock' => $object->vclock);
+ }
+
+ if ($object->has_links) {
+ $request->header('link' => $self->_links_to_header($object));
+ }
+
+ if (ref $object->data && $object->content_type eq 'application/json') {
+ $request->content(JSON::encode_json($object->data));
+ }
+ else {
+ $request->content($object->data);
+ }
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [200, 201, 204, 300]);
+ return $object;
+}
+
+sub load_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $request =
+ $self->new_request( 'GET',
+ [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [ 200, 300, 404 ] );
+ $object;
+}
+
+sub delete_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $request =
+ $self->new_request( 'DELETE',
+ [ $self->prefix, $object->bucket->name, $object->key ], $params );
+
+ my $response = $self->send_request($request);
+ $self->populate_object($object, $response, [ 204, 404 ] );
+ $object;
+}
+
+sub populate_object {
+ my ($self, $obj, $http_response, $expected) = @_;
+
+ $obj->_clear_links;
+ $obj->exists(0);
+
+ return if (!$http_response);
+
+ my $status = $http_response->code;
+
+ $obj->data($http_response->content)
+ unless $self->disable_return_body;
+
+ if ( $http_response->header('location') ) {
+ $obj->key( $http_response->header('location') );
+ $obj->location( $http_response->header('location') );
+ }
+
+ if (!grep { $status == $_ } @$expected) {
+ confess "Expected status "
+ . (join(', ', @$expected))
+ . ", received $status"
+ }
+
+ if ($status == 404) {
+ $obj->clear;
+ return;
+ }
+
+ $obj->exists(1);
+
+ if ($http_response->header('link')) {
+ $self->_populate_links($obj, $http_response->header('link'));
+ }
+
+ if ($status == 300) {
+ my @siblings = split("\n", $obj->data);
+ shift @siblings;
+ my %seen; @siblings = grep { !$seen{$_}++ } @siblings;
+ $obj->siblings(\@siblings);
+ }
+
+ if ($status == 201) {
+ my $location = $http_response->header('location');
+ my ($key) = ($location =~ m!/([^/]+)$!);
+ $obj->key($key);
+ }
+
+
+ if ($status == 200 || $status == 201) {
+ $obj->content_type($http_response->content_type)
+ if $http_response->content_type;
+ $obj->data(JSON::decode_json($obj->data))
+ if $obj->content_type eq 'application/json';
+ $obj->vclock($http_response->header('X-Riak-Vclock'));
+ }
+}
+
+sub retrieve_sibling {
+ my ($self, $object, $params) = @_;
+
+ my $request = $self->new_request(
+ 'GET',
+ [$self->prefix, $object->bucket->name, $object->key],
+ $params
+ );
+
+ my $response = $self->send_request($request);
+
+ my $sibling = Net::Riak::Object->new(
+ client => $self,
+ bucket => $object->bucket,
+ key => $object->key
+ );
+
+ $sibling->_jsonize($object->_jsonize);
+ $self->populate_object($sibling, $response, [200]);
+ $sibling;
+}
+
+
+
+
+1;
+__END__
+
+=item populate_object
+
+Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library.
+
diff --git a/lib/Net/Riak/Role/UserAgent.pm b/lib/Net/Riak/Role/UserAgent.pm
index eaec209..9dacf96 100644
--- a/lib/Net/Riak/Role/UserAgent.pm
+++ b/lib/Net/Riak/Role/UserAgent.pm
@@ -10,6 +10,12 @@ our $CONN_CACHE;
sub connection_cache { $CONN_CACHE ||= LWP::ConnCache->new }
+has ua_timeout => (
+ is => 'rw',
+ isa => 'Int',
+ default => 120
+);
+
has useragent => (
is => 'rw',
isa => 'LWP::UserAgent',
@@ -24,7 +30,8 @@ has useragent => (
@LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts;
my $ua = LWP::UserAgent->new(
- timeout => $self->ua_timeout
+ timeout => $self->ua_timeout,
+ keep_alive => 1,
);
$ua->conn_cache(__PACKAGE__->connection_cache);
diff --git a/lib/Net/Riak/Transport/PBC.pm b/lib/Net/Riak/Transport/PBC.pm
new file mode 100644
index 0000000..e495663
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC.pm
@@ -0,0 +1,9 @@
+package Net::Riak::Transport::PBC;
+
+use Moose::Role;
+
+with qw/
+ Net::Riak::Role::PBC
+ /;
+
+1;
diff --git a/lib/Net/Riak/Transport/PBC/Code.pm b/lib/Net/Riak/Transport/PBC/Code.pm
new file mode 100644
index 0000000..9231540
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC/Code.pm
@@ -0,0 +1,90 @@
+package Net::Riak::Transport::PBC::Code;
+use strict;
+use warnings;
+use base 'Exporter';
+
+our @EXPORT_OK = qw/
+ REQ_CODE
+ RESP_CLASS
+ EXPECTED_RESP
+ RESP_DECODER
+/;
+
+sub EXPECTED_RESP {
+ my $code = shift;
+ return {
+ 1 => 2,
+ 3 => 4,
+ 5 => 6,
+ 7 => 8,
+ 9 => 10,
+ 11 => 12,
+ 13 => 14,
+ 15 => 16,
+ 17 => 18,
+ 19 => 20,
+ 21 => 22,
+ 23 => 24,
+ }->{$code};
+}
+sub RESP_CLASS {
+ my $code = shift;
+
+ return {
+ 0 => 'RpbErrorResp',
+ 2 => 'RpbPingResp',
+ 4 => 'RpbGetClientIdResp',
+ 6 => 'RpbSetClientIdResp',
+ 8 => 'RpbGetServerInfoResp',
+ 10 => 'RpbGetResp',
+ 12 => 'RpbPutResp',
+ 14 => 'RpbDelResp',
+ 16 => 'RpbListBucketsResp',
+ 18 => 'RpbListKeysResp',
+ 20 => 'RpbGetBucketResp',
+ 22 => 'RpbSetBucketResp',
+ 24 => 'RpbMapRedResp',
+ }->{$code};
+}
+
+sub RESP_DECODER {
+ my $code = shift;
+
+ return {
+ 0 => 'RpbErrorResp',
+ 2 => undef,
+ 4 => 'RpbGetClientIdResp',
+ 6 => undef,
+ 8 => 'RpbGetServerInfoResp',
+ 10 => 'RpbGetResp',
+ 12 => 'RpbPutResp',
+ 14 => undef,
+ 16 => 'RpbListBucketsResp',
+ 18 => 'RpbListKeysResp',
+ 20 => 'RpbGetBucketResp',
+ 22 => undef,
+ 24 => 'RpbMapRedResp'
+ }->{$code};
+};
+
+
+sub REQ_CODE {
+ my $class = shift;
+
+ return {
+ RpbPingReq => 1,
+ RpbGetClientIdReq => 3,
+ RpbSetClientIdReq => 5,
+ RpbGetServerInfoReq => 7,
+ RpbGetReq => 9,
+ RpbPutReq => 11,
+ RpbDelReq => 13,
+ RpbListBucketsReq => 15,
+ RpbListKeysReq => 17,
+ RpbGetBucketReq => 19,
+ RpbSetBucketReq => 21,
+ RpbMapRedReq => 23,
+ }->{$class};
+}
+
+1;
diff --git a/lib/Net/Riak/Transport/PBC/Message.pm b/lib/Net/Riak/Transport/PBC/Message.pm
new file mode 100644
index 0000000..75170de
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC/Message.pm
@@ -0,0 +1,121 @@
+package Net::Riak::Transport::PBC::Message;
+
+use Moose;
+use MooseX::Types::Moose qw/Str HashRef Int/;
+use Net::Riak::Types 'Socket';
+use Net::Riak::Transport::PBC::Code qw/
+ REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/;
+use Net::Riak::Transport::PBC::Transport;
+
+has socket => (
+ is => 'rw',
+ isa => Socket,
+ predicate => 'has_socket',
+);
+
+has request => (
+ isa => 'Str',
+ is => 'ro',
+ lazy_build => 1,
+);
+
+has request_code => (
+ required => 1,
+ isa => Int,
+ is => 'ro',
+ lazy_build => 1,
+);
+
+has message_type => (
+ required => 1,
+ isa => Str,
+ is => 'ro',
+ trigger => sub {
+ $_[0]->{message_type} = 'Rpb'.$_[1];
+ }
+);
+
+has params => (
+ is => 'ro',
+ isa => HashRef,
+);
+
+sub _build_request_code {
+ my $self = shift;
+ return REQ_CODE($self->message_type);
+}
+
+sub _build_request {
+ my $self = shift;
+ $self->_pack_request( $self->request_code, $self->encode );
+}
+
+sub _pack_request {
+ my ($self, $code, $req) = @_;
+ my $h = pack('c', $code) . $req;
+ use bytes;
+ my $len = length $h;
+ return pack('N',$len).$h;
+}
+
+sub encode {
+ my $self = shift;
+ return $self->message_type->can('encode')
+ ? $self->message_type->encode( $self->params )
+ : '';
+}
+
+sub decode {
+ my ($self, $type, $raw_content) = @_;
+ return 'Rpb'.$type->decode($raw_content);
+}
+
+sub send {
+ my ($self, $cb) = @_;
+
+ die "No socket? did you forget to ->connect?" unless $self->has_socket;
+
+ $self->socket->print($self->request);
+
+ my $resp = $self->handle_response;
+
+ return $resp unless $cb;
+
+ $cb->($resp);
+ while (!$resp->done) {
+ $resp = $self->handle_response;
+# use YAML::Syck; warn Dump $resp;
+ $cb->($resp);
+ }
+ return 1;
+}
+
+sub handle_response {
+ my $self = shift;
+ my ($code, $resp) = $self->_unpack_response;
+
+ my $expected_code = EXPECTED_RESP($self->request_code);
+
+ if ($expected_code != $code) {
+ # TODO throw object
+ die "Expecting response type "
+ . RESP_CLASS($expected_code)
+ . " got " . RESP_CLASS($code);
+ }
+
+ return 1 unless RESP_DECODER($code);
+ return RESP_DECODER($code)->decode($resp);
+}
+
+sub _unpack_response {
+ my $self = shift;
+ my ( $len, $code, $msg );
+ $self->socket->read( $len, 4 );
+ $len = unpack( 'N', $len );
+ $self->socket->read( $code, 1 );
+ $code = unpack( 'c', $code );
+ $self->socket->read( $msg, $len - 1 );
+ return ( $code, $msg );
+}
+
+1;
diff --git a/lib/Net/Riak/Transport/PBC/Transport.pm b/lib/Net/Riak/Transport/PBC/Transport.pm
new file mode 100644
index 0000000..768c32d
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC/Transport.pm
@@ -0,0 +1,483 @@
+package Net::Riak::Transport::PBC;
+
+##
+## This file was generated by Google::ProtocolBuffers (0.08)
+## on Mon Dec 13 11:30:34 2010
+##
+use strict;
+use warnings;
+use Google::ProtocolBuffers;
+{
+ unless (RpbSetClientIdReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbSetClientIdReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'client_id', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbSetBucketReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbSetBucketReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ 'RpbBucketProps',
+ 'props', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbPutReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbPutReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'vclock', 3, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ 'RpbContent',
+ 'content', 4, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'w', 5, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'dw', 6, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BOOL(),
+ 'return_body', 7, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbListBucketsResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbListBucketsResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'buckets', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetBucketResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetBucketResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ 'RpbBucketProps',
+ 'props', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'r', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetBucketReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetBucketReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbLink->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbLink',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'tag', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ 'RpbContent',
+ 'content', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'vclock', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbPair->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbPair',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'value', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbPutResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbPutResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ 'RpbContent',
+ 'content', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'vclock', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbDelReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbDelReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'key', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'rw', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbMapRedReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbMapRedReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'request', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'content_type', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbMapRedResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbMapRedResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'phase', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'response', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BOOL(),
+ 'done', 3, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetClientIdResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetClientIdResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'client_id', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbErrorResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbErrorResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'errmsg', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'errcode', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbBucketProps->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbBucketProps',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'n_val', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BOOL(),
+ 'allow_mult', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbGetServerInfoResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbGetServerInfoResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'node', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'server_version', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbListKeysReq->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbListKeysReq',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'bucket', 1, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbListKeysResp->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbListKeysResp',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'keys', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BOOL(),
+ 'done', 2, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+ unless (RpbContent->can('_pb_fields_list')) {
+ Google::ProtocolBuffers->create_message(
+ 'RpbContent',
+ [
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'value', 1, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'content_type', 2, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'charset', 3, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'content_encoding', 4, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_BYTES(),
+ 'vtag', 5, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ 'RpbLink',
+ 'links', 6, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'last_mod', 7, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
+ Google::ProtocolBuffers::Constants::TYPE_UINT32(),
+ 'last_mod_usecs', 8, undef
+ ],
+ [
+ Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
+ 'RpbPair',
+ 'usermeta', 9, undef
+ ],
+
+ ],
+ { 'create_accessors' => 1, }
+ );
+ }
+
+}
+
+1;
diff --git a/lib/Net/Riak/Transport/REST.pm b/lib/Net/Riak/Transport/REST.pm
new file mode 100644
index 0000000..434f4be
--- /dev/null
+++ b/lib/Net/Riak/Transport/REST.pm
@@ -0,0 +1,11 @@
+package Net::Riak::Transport::REST;
+
+use Moose::Role;
+
+with qw/
+ Net::Riak::Role::UserAgent
+ Net::Riak::Role::REST
+ Net::Riak::Role::Hosts
+ /;
+
+1;
diff --git a/lib/Net/Riak/Types.pm b/lib/Net/Riak/Types.pm
new file mode 100644
index 0000000..a1e28b4
--- /dev/null
+++ b/lib/Net/Riak/Types.pm
@@ -0,0 +1,38 @@
+package Net::Riak::Types;
+
+use MooseX::Types::Moose qw/Str ArrayRef HashRef/;
+use MooseX::Types::Structured qw(Tuple Optional Dict);
+use MooseX::Types -declare =>
+ [qw(Socket Client HTTPResponse HTTPRequest RiakHost)];
+
+class_type Socket, { class => 'IO::Socket::INET' };
+class_type Client, { class => 'Net::Riak::Client' };
+class_type HTTPRequest, { class => 'HTTP::Request' };
+class_type HTTPResponse, { class => 'HTTP::Response' };
+
+subtype RiakHost, as ArrayRef [HashRef];
+
+coerce RiakHost, from Str, via {
+ [ { node => $_, weight => 1 } ];
+};
+
+coerce RiakHost, from ArrayRef, via {
+ warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release.";
+ my $backends = $_;
+ my $weight = 1 / @$backends;
+ [ map { { node => $_, weight => $weight } } @$backends ];
+};
+
+coerce RiakHost, from HashRef, via {
+ warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release.";
+ my $backends = $_;
+ my $total = 0;
+ $total += $_ for values %$backends;
+ [
+ map { { node => $_, weight => $backends->{$_} / $total } }
+ keys %$backends
+ ];
+};
+
+1;
+