diff options
| author | Robin Edwards <robin.ge@gmail.com> | 2011-04-20 14:38:43 +0100 |
|---|---|---|
| committer | Robin Edwards <robin.ge@gmail.com> | 2011-04-20 14:38:43 +0100 |
| commit | 79bea382fd2c0753ca9ace79a11bb74c9a1d722b (patch) | |
| tree | bde42a47792a27e0a863ee527b88c8c24258f7e9 /lib/Net/Riak | |
| parent | Merge remote branch 'simon/fix_link_encoding' (diff) | |
| download | net-riak-79bea382fd2c0753ca9ace79a11bb74c9a1d722b.tar.gz | |
merged pbc branch to master
Diffstat (limited to 'lib/Net/Riak')
24 files changed, 1547 insertions, 323 deletions
diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm index 2bc334e..19f5d94 100644 --- a/lib/Net/Riak/Bucket.pm +++ b/lib/Net/Riak/Bucket.pm @@ -1,16 +1,14 @@ package Net::Riak::Bucket; - -# ABSTRACT: Access and change information about a Riak bucket - -use JSON; use Moose; -use Carp; use Net::Riak::Object; - +use Net::Riak::Types Client => {-as => 'Client_T'}; with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]}; -with 'Net::Riak::Role::Base' => { - classes => [{ name => 'client', required => 1, }] -}; + +has client => ( + is => 'rw', + isa => Client_T, + required => 1, +); has name => ( is => 'ro', @@ -37,20 +35,17 @@ sub allow_multiples { my $self = shift; if (my $val = shift) { - my $bool = ($val == 1 ? JSON::true : JSON::false); + my $bool = ($val == 1 ? 1 : 0); $self->set_property('allow_mult', $bool); } else { - return $self->get_property('allow_mult'); + return $self->get_property('allow_mult') ? 1 : 0; } } sub get_keys { my ($self, $params) = @_; - my $key_mode = delete($params->{stream}) ? 'stream' : 'true'; - $params = { props => 'false', keys => $key_mode, %$params }; - my $properties = $self->get_properties($params); - return $properties->{keys}; + $self->client->get_keys($self->name, $params); } sub get { @@ -66,12 +61,12 @@ sub get { } sub delete_object { - my ($self, $key) = @_; + my ($self, $key, $dw) = @_; Net::Riak::Object->new( client => $self->client, bucket => $self, key => $key - )->delete; + )->delete($dw); } sub set_property { @@ -87,59 +82,12 @@ sub get_property { sub get_properties { my ($self, $params) = @_; - - # Callbacks require stream mode - $params->{keys} = 'stream' if $params->{cb}; - - $params->{props} = 'true' unless exists $params->{props}; - $params->{keys} = 'false' unless exists $params->{keys}; - - my $request = $self->client->new_request( - 'GET', [$self->client->prefix, $self->name], $params - ); - - my $response = $self->client->send_request($request); - - unless ($response->is_success) { - die "Error getting bucket properties: ".$response->status_line."\n"; - } - - if ($params->{keys} ne 'stream') { - return JSON::decode_json($response->content); - } - - # In streaming mode, aggregate keys from the multiple returned chunk objects - else { - my $json = JSON->new; - my $props = $json->incr_parse($response->content); - if ($params->{cb}) { - while (defined(my $obj = $json->incr_parse)) { - $params->{cb}->($_) foreach @{$obj->{keys}}; - } - return %$props ? { props => $props } : {}; - } - else { - my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () } - $json->incr_parse; - return { props => $props, keys => \@keys }; - } - } + $self->client->get_properties($self->name, $params); } sub set_properties { my ($self, $props) = @_; - - my $request = $self->client->new_request( - 'PUT', [$self->client->prefix, $self->name] - ); - - $request->header('Content-Type' => $self->content_type); - $request->content(JSON::encode_json({props => $props})); - - my $response = $self->client->send_request($request); - unless ($response->is_success) { - die "Error setting bucket properties: ".$response->status_line."\n"; - } + $self->client->set_properties($self, $props); } sub new_object { @@ -169,7 +117,7 @@ sub new_object { my $obj2 = $bucket->new_object('foo2', {...}); $object->store; - $bucket->delete_object($key); + $bucket->delete_object($key, 3); # optional w val =head1 DESCRIPTION diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm index 4d14338..f38bec6 100644 --- a/lib/Net/Riak/Client.pm +++ b/lib/Net/Riak/Client.pm @@ -2,10 +2,8 @@ package Net::Riak::Client; use Moose; use MIME::Base64; -use Moose::Util::TypeConstraints; -class_type 'HTTP::Request'; -class_type 'HTTP::Response'; +with 'MooseX::Traits'; has prefix => ( is => 'rw', @@ -27,40 +25,10 @@ has client_id => ( isa => 'Str', lazy_build => 1, ); -has http_request => ( - is => 'rw', - isa => 'HTTP::Request', -); - -has http_response => ( - is => 'rw', - isa => 'HTTP::Response', - handles => ['is_success'] -); - -has ua_timeout => ( - is => 'rw', - isa => 'Int', - default => 3 -); - -with 'Net::Riak::Role::UserAgent'; -with qw/ - Net::Riak::Role::REST - Net::Riak::Role::Hosts - /; - - sub _build_client_id { "perl_net_riak" . encode_base64(int(rand(10737411824)), ''); } -sub is_alive { - my $self = shift; - my $request = $self->new_request('GET', ['ping']); - my $response = $self->send_request($request); - $self->is_success ? return 1 : return 0; -} 1; diff --git a/lib/Net/Riak/Link.pm b/lib/Net/Riak/Link.pm index 980aabb..57881a0 100644 --- a/lib/Net/Riak/Link.pm +++ b/lib/Net/Riak/Link.pm @@ -20,19 +20,4 @@ has tag => ( default => sub {(shift)->bucket->name} ); -sub to_link_header { - my ($self, $client) = @_; - - $client ||= $self->client; - - my $link = ''; - $link .= '</'; - $link .= $client->prefix . '/'; - $link .= $self->bucket->name . '/'; - $link .= $self->key . '>; riaktag="'; - $link .= $self->tag . '"'; - return $link; -} - 1; - diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm index d05c30a..10a7c98 100644 --- a/lib/Net/Riak/MapReduce.pm +++ b/lib/Net/Riak/MapReduce.pm @@ -6,6 +6,8 @@ use JSON; use Moose; use Scalar::Util; +use Data::Dumper; + use Net::Riak::LinkPhase; use Net::Riak::MapReducePhase; @@ -156,35 +158,12 @@ sub run { $inputs = $self->inputs; } - my $ua_timeout = $self->client->useragent->timeout(); - my $job = {inputs => $inputs, query => $query}; - if ($timeout) { - if ($ua_timeout < ($timeout/1000)) { - $self->client->useragent->timeout(int($timeout/1000)); - } - $job->{timeout} = $timeout; - } - - - my $content = JSON::encode_json($job); - my $request = $self->client->new_request( - 'POST', [$self->client->mapred_prefix] - ); - $request->content($content); - - my $response = $self->client->send_request($request); - - unless ($response->is_success) { - die "MapReduce query failed: ".$response->status_line; - } - - my $result = JSON::decode_json($response->content); + # how phases set to 'keep'. + my $p = scalar ( grep { $_->keep } $self->phases); - if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) { - $self->client->useragent->timeout($ua_timeout); - } + my $result = $self->client->execute_job($job, $timeout, $p); my @phases = $self->phases; if (ref $phases[-1] ne 'Net::Riak::LinkPhase') { diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm index f40031b..7148d4f 100644 --- a/lib/Net/Riak/Object.pm +++ b/lib/Net/Riak/Object.pm @@ -2,24 +2,27 @@ package Net::Riak::Object; # ABSTRACT: holds meta information about a Riak object -use Carp; -use JSON; use Moose; use Scalar::Util; use Net::Riak::Link; with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]}; with 'Net::Riak::Role::Base' => {classes => - [{name => 'bucket', required => 1}, {name => 'client', required => 1}]}; - + [{name => 'bucket', required => 1}]}; +use Net::Riak::Types Client => {-as => 'Client_T'}; +has client => ( + is => 'rw', + isa => Client_T, + required => 1, +); has key => (is => 'rw', isa => 'Str', required => 0); -has status => (is => 'rw', isa => 'Int'); has exists => (is => 'rw', isa => 'Bool', default => 0,); has data => (is => 'rw', isa => 'Any', clearer => '_clear_data'); -has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock',); +has vclock => (is => 'rw', isa => 'Str', predicate => 'has_vclock'); +has vtag => (is => 'rw', isa => 'Str'); has content_type => (is => 'rw', isa => 'Str', default => 'application/json'); -has _headers => (is => 'rw', isa => 'HTTP::Response',); -has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1,); +has location => ( is => 'rw', isa => 'Str' ); +has _jsonize => (is => 'rw', isa => 'Bool', lazy => 1, default => 1); has links => ( traits => ['Array'], is => 'rw', @@ -31,6 +34,7 @@ has links => ( count_links => 'elements', append_link => 'push', has_links => 'count', + all_links => 'elements', }, clearer => '_clear_links', ); @@ -52,62 +56,31 @@ has siblings => ( clearer => '_clear_siblings', ); +after count_links => sub { + warn "DEPRECATED: count_links method will be removed in the 0.17 release, please use has_links."; +}; + sub store { my ($self, $w, $dw) = @_; $w ||= $self->w; $dw ||= $self->dw; - my $params = {returnbody => 'true', w => $w, dw => $dw}; - my $path = [$self->client->prefix, $self->bucket->name]; - my $method = 'POST'; - if (defined $self->key) { - push @$path, $self->key; - $method = 'PUT'; - } - - my $request = $self->client->new_request($method, $path, $params); - - $request->header('X-Riak-ClientID' => $self->client->client_id); - $request->header('Content-Type' => $self->content_type); - - if ($self->has_vclock) { - $request->header('X-Riak-Vclock' => $self->vclock); - } - - if ($self->has_links) { - $request->header('link' => $self->_links_to_header); - } - - if (ref $self->data && $self->content_type eq 'application/json') { - $request->content(JSON::encode_json($self->data)); - } - else { - $request->content($self->data); - } - - my $response = $self->client->send_request($request); - $self->populate($response, [200, 201, 204, 300]); - $self; + $self->client->store_object($w, $dw, $self); } -sub _links_to_header { - my $self = shift; - join(', ', map { $_->to_link_header($self->client) } $self->links); -} +sub status { + my ($self) = @_; + warn "DEPRECATED: status method will be removed in the 0.17 release, please use ->client->status."; + $self->client->status; +} sub load { my $self = shift; my $params = {r => $self->r}; - my $request = - $self->client->new_request('GET', - [$self->client->prefix, $self->bucket->name, $self->key], $params); - - my $response = $self->client->send_request($request); - $self->populate($response, [200, 300, 404]); - $self; + $self->client->load_object($params, $self); } sub delete { @@ -116,13 +89,7 @@ sub delete { $dw ||= $self->bucket->dw; my $params = {dw => $dw}; - my $request = - $self->client->new_request('DELETE', - [$self->client->prefix, $self->bucket->name, $self->key], $params); - - my $response = $self->client->send_request($request); - $self->populate($response, [204, 404]); - $self; + $self->client->delete_object($params, $self); } sub clear { @@ -133,109 +100,17 @@ sub clear { $self; } -sub populate { - my ($self, $http_response, $expected) = @_; - - $self->clear; - - return if (!$http_response); - - my $status = $http_response->code; - $self->_headers($http_response); - $self->status($status); - - $self->data($http_response->content); - - if (!grep { $status == $_ } @$expected) { - confess "Expected status " - . (join(', ', @$expected)) - . ", received $status" - } - - if ($status == 404) { - $self->clear; - return; - } - - $self->exists(1); - - if ($http_response->header('link')) { - $self->_populate_links($http_response->header('link')); - } - - if ($status == 300) { - my @siblings = split("\n", $self->data); - shift @siblings; - $self->siblings(\@siblings); - } - - if ($status == 201) { - my $location = $http_response->header('location'); - my ($key) = ($location =~ m!/([^/]+)$!); - $self->key($key); - } - - - if ($status == 200 || $status == 201) { - $self->content_type($http_response->content_type) - if $http_response->content_type; - $self->data(JSON::decode_json($self->data)) - if $self->content_type eq 'application/json'; - $self->vclock($http_response->header('X-Riak-Vclock')); - } -} - -sub _uri_decode { - my $str = shift; - $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg; - return $str; -} - -sub _populate_links { - my ($self, $links) = @_; - for my $link (split(',', $links)) { - if ($link - =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/) - { - my $bucket = _uri_decode($2); - my $key = _uri_decode($3); - my $tag = _uri_decode($4); - my $l = Net::Riak::Link->new( - bucket => Net::Riak::Bucket->new( - name => $bucket, - client => $self->client - ), - key => $key, - tag => $tag - ); - $self->add_link($l); - } - } -} - sub sibling { my ($self, $id, $r) = @_; $r ||= $self->bucket->r; my $vtag = $self->get_sibling($id); - my $params = {r => $r, vtag => $vtag}; - my $request = - $self->client->new_request('GET', - [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->send_request($request); - - my $obj = Net::Riak::Object->new( - client => $self->client, - bucket => $self->bucket, - key => $self->key + return $self->client->retrieve_sibling( + $self, {r => $r, vtag => $vtag} ); - $obj->_jsonize($self->_jsonize); - $obj->populate($response, [200]); - $obj; } - sub _build_link { my ($self,$obj,$tag) = @_; blessed $obj && $obj->isa('Net::Riak::Link') @@ -337,10 +212,6 @@ Get or set the data stored in this object. =item B<content_type> -=item B<status> - -Get the HTTP status from the last operation on this object. - =item B<links> Get an array of L<Net::Riak::Link> objects @@ -359,7 +230,11 @@ Return an array of Siblings =over 4 -=item count_links +=item all_links + +Return the number of links + +=item has_links Return the number of links @@ -445,7 +320,7 @@ Return true if this object has siblings Return true if this object has no siblings -=item populate +=item populate_object Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library. diff --git a/lib/Net/Riak/Role/Hosts.pm b/lib/Net/Riak/Role/Hosts.pm index 34d9273..639d472 100644 --- a/lib/Net/Riak/Role/Hosts.pm +++ b/lib/Net/Riak/Role/Hosts.pm @@ -1,29 +1,11 @@ package Net::Riak::Role::Hosts; use Moose::Role; -use Moose::Util::TypeConstraints; - -subtype 'RiakHost' => as 'ArrayRef[HashRef]'; - -coerce 'RiakHost' => from 'Str' => via { - [{node => $_, weight => 1}]; -}; -coerce 'RiakHost' => from 'ArrayRef' => via { - my $backends = $_; - my $weight = 1 / @$backends; - [map { {node => $_, weight => $weight} } @$backends]; -}; -coerce 'RiakHost' => from 'HashRef' => via { - my $backends = $_; - my $total = 0; - $total += $_ for values %$backends; - [map { {node => $_, weight => $backends->{$_} / $total} } - keys %$backends]; -}; +use Net::Riak::Types qw(RiakHost); has host => ( is => 'rw', - isa => 'RiakHost', + isa => RiakHost, coerce => 1, default => 'http://127.0.0.1:8098', ); diff --git a/lib/Net/Riak/Role/PBC.pm b/lib/Net/Riak/Role/PBC.pm new file mode 100644 index 0000000..605f032 --- /dev/null +++ b/lib/Net/Riak/Role/PBC.pm @@ -0,0 +1,78 @@ +package Net::Riak::Role::PBC; + +use Moose::Role; +use MooseX::Types::Moose qw/Str Int/; + +with qw( + Net::Riak::Role::PBC::Message + Net::Riak::Role::PBC::Bucket + Net::Riak::Role::PBC::MapReduce + Net::Riak::Role::PBC::Link + Net::Riak::Role::PBC::Object); + +use Net::Riak::Types 'Socket'; +use IO::Socket::INET; + +has [qw/r w dw/] => ( + is => 'rw', + isa => Int, + default => 2 +); + +has host => ( + is => 'ro', + isa => Str, + required => 1, +); + +has port => ( + is => 'ro', + isa => Int, + required => 1, +); + +has socket => ( + is => 'rw', + isa => Socket, + predicate => 'has_socket', +); + +sub is_alive { + my $self = shift; + return $self->send_message('PingReq'); +} + +sub connected { + my $self = shift; + return $self->has_socket && $self->socket->connected ? 1 : 0; +} + +sub connect { + my $self = shift; + return if $self->has_socket && $self->connected; + + $self->socket( + IO::Socket::INET->new( + PeerAddr => $self->host, + PeerPort => $self->port, + Proto => 'tcp', + Timeout => 30, + ) + ); +} + +sub all_buckets { + my $self = shift; + my $resp = $self->send_message('ListBucketsReq'); + return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : (); +} + +sub server_info { + my $self = shift; + my $resp = $self->send_message('GetServerInfoReq'); + return $resp; +} + +sub stats { die "->stats is only avaliable through the REST interface" } + +1; diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm new file mode 100644 index 0000000..aa7d7fa --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Bucket.pm @@ -0,0 +1,46 @@ +package Net::Riak::Role::PBC::Bucket; + +use Moose::Role; +use Data::Dumper; + +sub get_properties { + my ( $self, $name, $params ) = @_; + my $resp = $self->send_message( GetBucketReq => { bucket => $name } ); + return { props => { %{ $resp->props } } }; +} + +sub set_properties { + my ( $self, $bucket, $props ) = @_; + return $self->send_message( + SetBucketReq => { + bucket => $bucket->name, + props => $props + } + ); +} + +sub get_keys { + my ( $self, $name, $params) = @_; + my $keys = []; + + my $res = $self->send_message( + ListKeysReq => { bucket => $name, }, + sub { + if ( defined $_[0]->keys ) { + if ($params->{cb}) { + $params->{cb}->($_) for @{ $_[0]->keys }; + } + else { + push @$keys, @{ $_[0]->keys }; + } + } + } + ); + + return $params->{cb} ? undef : $keys; +} + + + +1; + diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm new file mode 100644 index 0000000..5e6a336 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Link.pm @@ -0,0 +1,35 @@ +package Net::Riak::Role::PBC::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (@$links) { + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $link->bucket, + client => $self + ), + key => $link->key, + tag => $link->tag + ); + $object->add_link($l); + } +} + +sub _links_for_message { + my ($self, $object) = @_; + + return [ + map { { + tag => $_->tag, + key => $_->key, + bucket => $_->bucket->name + } + } $object->all_links + ] +} + +1; diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm new file mode 100644 index 0000000..afeabe8 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/MapReduce.pm @@ -0,0 +1,37 @@ +package Net::Riak::Role::PBC::MapReduce; +use Moose::Role; +use JSON; +use List::Util 'sum'; +use Data::Dump 'pp'; + +sub execute_job { + my ($self, $job, $timeout, $returned_phases) = @_; + + $job->{timeout} = $timeout; + + my $job_request = JSON::encode_json($job); + + my $results; + + my $resp = $self->send_message( MapRedReq => { + request => $job_request, + content_type => 'application/json' + }, sub { push @$results, $self->decode_phase(shift) }) + or + die "MapReduce query failed!"; + + + return $returned_phases == 1 ? $results->[0] : $results; +} + +sub decode_phase { + my ($self, $resp) = @_; + + if (defined $resp->response && length($resp->response)) { + return JSON::decode_json($resp->response); + } + + return; +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm new file mode 100644 index 0000000..0c2fbf3 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Message.pm @@ -0,0 +1,21 @@ +package Net::Riak::Role::PBC::Message; + +use Moose::Role; +use Net::Riak::Transport::PBC::Message; + +sub send_message { + my ( $self, $type, $params, $cb ) = @_; + + $self->connect unless $self->connected; + + my $message = Net::Riak::Transport::PBC::Message->new( + message_type => $type, + params => $params || {}, + ); + + $message->socket( $self->socket ); + + return $message->send($cb); +} + +1; diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm new file mode 100644 index 0000000..847cac2 --- /dev/null +++ b/lib/Net/Riak/Role/PBC/Object.pm @@ -0,0 +1,131 @@ +package Net::Riak::Role::PBC::Object; + +use JSON; +use Moose::Role; +use Data::Dumper; +use List::Util 'first'; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $value = (ref $object->data && $object->content_type eq 'application/json') + ? JSON::encode_json($object->data) : $object->data; + + my $content = { + content_type => $object->content_type, + value => $value, + usermeta => undef + }; + + if ($object->has_links) { + $content->{links} = $self->_links_for_message($object); + } + + $self->send_message( + PutReq => { + bucket => $object->bucket->name, + key => $object->key, + content => $content, + } + ); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + $self->populate_object($object, $resp); + + return $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $resp = $self->send_message( + DelReq => { + bucket => $object->bucket->name, + key => $object->key, + rw => $params->{dw}, + } + ); + + $object; +} + +sub populate_object { + my ( $self, $object, $resp) = @_; + + $object->_clear_links; + $object->exists(0); + + if ( $resp->content && scalar (@{$resp->content}) > 1) { + my %seen; + my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content}; + $object->siblings(\@vtags); + } + + my $content = $resp->content ? $resp->content->[0] : undef; + + return unless $content and $resp->vclock; + + $object->vclock($resp->vclock); + $object->vtag($content->vtag); + $object->content_type($content->content_type); + + if($content->links) { + $self->_populate_links($object, $content->links); + } + + my $data = ($object->content_type eq 'application/json') + ? JSON::decode_json($content->value) : $content->value; + + $object->exists(1); + + $object->data($data); +} + + +# This emulates the behavior of the existing REST client. +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $resp = $self->send_message( + GetReq => { + bucket => $object->bucket->name, + key => $object->key, + r => $params->{r}, + } + ); + + # hack for loading 1 sibling + if ($params->{vtag}) { + $resp->{content} = [ + first { + $_->vtag eq $params->{vtag} + } @{$resp->content} + ]; + } + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + + $self->populate_object($sibling, $resp); + + $sibling; +} + +1; diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm index 136ea88..dfab5a0 100644 --- a/lib/Net/Riak/Role/REST.pm +++ b/lib/Net/Riak/Role/REST.pm @@ -3,12 +3,36 @@ package Net::Riak::Role::REST; # ABSTRACT: role for REST operations use URI; -use HTTP::Request; + use Moose::Role; +use MooseX::Types::Moose 'Bool'; +use Net::Riak::Types qw/HTTPResponse HTTPRequest/; +use Data::Dump 'pp'; +with qw/Net::Riak::Role::REST::Bucket + Net::Riak::Role::REST::Object + Net::Riak::Role::REST::Link + Net::Riak::Role::REST::MapReduce + /; + +has http_request => ( + is => 'rw', + isa => HTTPRequest, +); + +has http_response => ( + is => 'rw', + isa => HTTPResponse, + handles => { + is_success => 'is_success', + status => 'code', + } +); -requires 'http_request'; -requires 'http_response'; -requires 'useragent'; +has disable_return_body => ( + is => 'rw', + isa => Bool, + default => 0 +); sub _build_path { my ($self, $path) = @_; @@ -37,9 +61,40 @@ sub send_request { $self->http_request($req); my $r = $self->useragent->request($req); + $self->http_response($r); + if ($ENV{RIAK_VERBOSE}) { + print STDERR pp($r); + } + return $r; } +sub is_alive { + my $self = shift; + my $request = $self->new_request('HEAD', ['ping']); + my $response = $self->send_request($request); + $self->is_success ? return 1 : return 0; +} + +sub all_buckets { + my $self = shift; + my $request = $self->new_request('GET', [$self->prefix], {buckets => 'true'}); + my $response = $self->send_request($request); + die "Failed to fetch buckets.. are you running riak 0.14+?" + unless $response->is_success; + my $resp = JSON::decode_json($response->content); + return ref ($resp->{buckets}) eq 'ARRAY' ? @{$resp->{buckets}} : (); +} + +sub server_info { die "->server_info not supported by the REST interface" } + +sub stats { + my $self = shift; + my $request = $self->new_request('GET', ["stats"]); + my $response = $self->send_request($request); + return JSON::decode_json($response->content); +} + 1; diff --git a/lib/Net/Riak/Role/REST/Bucket.pm b/lib/Net/Riak/Role/REST/Bucket.pm new file mode 100644 index 0000000..8a037c0 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Bucket.pm @@ -0,0 +1,73 @@ +package Net::Riak::Role::REST::Bucket; + +use Moose::Role; +use JSON; + +sub get_properties { + my ($self, $name, $params) = @_; + + # Callbacks require stream mode + $params->{keys} = 'stream' if $params->{cb}; + + $params->{props} = 'true' unless exists $params->{props}; + $params->{keys} = 'false' unless exists $params->{keys}; + + my $request = $self->new_request( + 'GET', [$self->prefix, $name], $params + ); + + my $response = $self->send_request($request); + + unless ($response->is_success) { + die "Error getting bucket properties: ".$response->status_line."\n"; + } + + if ($params->{keys} ne 'stream') { + return JSON::decode_json($response->content); + } + + # In streaming mode, aggregate keys from the multiple returned chunk objects + else { + my $json = JSON->new; + my $props = $json->incr_parse($response->content); + if ($params->{cb}) { + while (defined(my $obj = $json->incr_parse)) { + $params->{cb}->($_) foreach @{$obj->{keys}}; + } + return %$props ? { props => $props } : {}; + } + else { + my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () } + $json->incr_parse; + return { props => $props, keys => \@keys }; + } + } +} + +sub set_properties { + my ($self, $bucket, $props) = @_; + + my $request = $self->new_request( + 'PUT', [$self->prefix, $bucket->name] + ); + + $request->header('Content-Type' => $bucket->content_type); + $request->content(JSON::encode_json({props => $props})); + + my $response = $self->send_request($request); + unless ($response->is_success) { + die "Error setting bucket properties: ".$response->status_line."\n"; + } +} + +sub get_keys { + my ($self, $bucket, $params) = @_; + + my $key_mode = delete($params->{stream}) ? 'stream' : 'true'; + $params = { props => 'false', keys => $key_mode, %$params }; + my $properties = $self->get_properties($bucket, $params); + + return $properties->{keys}; +} + +1; diff --git a/lib/Net/Riak/Role/REST/Link.pm b/lib/Net/Riak/Role/REST/Link.pm new file mode 100644 index 0000000..fbead86 --- /dev/null +++ b/lib/Net/Riak/Role/REST/Link.pm @@ -0,0 +1,52 @@ +package Net::Riak::Role::REST::Link; +use Moose::Role; +use Net::Riak::Link; +use Net::Riak::Bucket; + +sub _populate_links { + my ($self, $object, $links) = @_; + + for my $link (split(',', $links)) { + if ($link + =~ /\<\/([^\/]+)\/([^\/]+)\/([^\/]+)\>; ?riaktag=\"([^\']+)\"/) + { + my $bucket = _uri_decode($2); + my $key = _uri_decode($3); + my $tag = _uri_decode($4); + my $l = Net::Riak::Link->new( + bucket => Net::Riak::Bucket->new( + name => $bucket, + client => $self + ), + key => $key, + tag => $tag + ); + $object->add_link($l); + } + } +} + +sub _uri_decode { + my $str = shift; + $str =~ s/%([a-fA-F0-9]{2,2})/chr(hex($1))/eg; + return $str; +} + +sub _links_to_header { + my ($self, $object) = @_; + join(', ', map { $self->link_to_header($_) } $object->links); +} + +sub link_to_header { + my ($self, $link) = @_; + + my $link_header = ''; + $link_header .= '</'; + $link_header .= $self->prefix . '/'; + $link_header .= $link->bucket->name . '/'; + $link_header .= $link->key . '>; riaktag="'; + $link_header .= $link->tag . '"'; + return $link_header; +} + +1; diff --git a/lib/Net/Riak/Role/REST/MapReduce.pm b/lib/Net/Riak/Role/REST/MapReduce.pm new file mode 100644 index 0000000..e987a21 --- /dev/null +++ b/lib/Net/Riak/Role/REST/MapReduce.pm @@ -0,0 +1,40 @@ +package Net::Riak::Role::REST::MapReduce; +use Moose::Role; +use JSON; +use Data::Dumper; + +sub execute_job { + my ($self, $job, $timeout) = @_; + + # save existing timeout value. + my $ua_timeout = $self->useragent->timeout(); + + if ($timeout) { + if ($ua_timeout < ($timeout/1000)) { + $self->useragent->timeout(int($timeout/1000)); + } + $job->{timeout} = $timeout; + } + + my $content = JSON::encode_json($job); + + my $request = $self->new_request( + 'POST', [$self->mapred_prefix] + ); + $request->content($content); + + my $response = $self->send_request($request); + + # restore time out value + if ( $timeout && ( $ua_timeout != $self->useragent->timeout() ) ) { + $self->useragent->timeout($ua_timeout); + } + + unless ($response->is_success) { + die "MapReduce query failed: ".$response->status_line; + } + + return JSON::decode_json($response->content); +} + +1; diff --git a/lib/Net/Riak/Role/REST/Object.pm b/lib/Net/Riak/Role/REST/Object.pm new file mode 100644 index 0000000..d38315a --- /dev/null +++ b/lib/Net/Riak/Role/REST/Object.pm @@ -0,0 +1,160 @@ +package Net::Riak::Role::REST::Object; + +use Moose::Role; +use JSON; + +sub store_object { + my ($self, $w, $dw, $object) = @_; + + my $params = {returnbody => 'true', w => $w, dw => $dw}; + + $params->{returnbody} = 'false' + if $self->disable_return_body; + + + my $request; + if ( defined $object->key ) { + $request = $self->new_request('PUT', + [$self->prefix, $object->bucket->name, $object->key], $params); + } else { + $request = $self->new_request('POST', + [$self->prefix, $object->bucket->name ], $params); + } + + $request->header('X-Riak-ClientID' => $self->client_id); + $request->header('Content-Type' => $object->content_type); + + if ($object->has_vclock) { + $request->header('X-Riak-Vclock' => $object->vclock); + } + + if ($object->has_links) { + $request->header('link' => $self->_links_to_header($object)); + } + + if (ref $object->data && $object->content_type eq 'application/json') { + $request->content(JSON::encode_json($object->data)); + } + else { + $request->content($object->data); + } + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [200, 201, 204, 300]); + return $object; +} + +sub load_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'GET', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 200, 300, 404 ] ); + $object; +} + +sub delete_object { + my ( $self, $params, $object ) = @_; + + my $request = + $self->new_request( 'DELETE', + [ $self->prefix, $object->bucket->name, $object->key ], $params ); + + my $response = $self->send_request($request); + $self->populate_object($object, $response, [ 204, 404 ] ); + $object; +} + +sub populate_object { + my ($self, $obj, $http_response, $expected) = @_; + + $obj->_clear_links; + $obj->exists(0); + + return if (!$http_response); + + my $status = $http_response->code; + + $obj->data($http_response->content) + unless $self->disable_return_body; + + if ( $http_response->header('location') ) { + $obj->key( $http_response->header('location') ); + $obj->location( $http_response->header('location') ); + } + + if (!grep { $status == $_ } @$expected) { + confess "Expected status " + . (join(', ', @$expected)) + . ", received $status" + } + + if ($status == 404) { + $obj->clear; + return; + } + + $obj->exists(1); + + if ($http_response->header('link')) { + $self->_populate_links($obj, $http_response->header('link')); + } + + if ($status == 300) { + my @siblings = split("\n", $obj->data); + shift @siblings; + my %seen; @siblings = grep { !$seen{$_}++ } @siblings; + $obj->siblings(\@siblings); + } + + if ($status == 201) { + my $location = $http_response->header('location'); + my ($key) = ($location =~ m!/([^/]+)$!); + $obj->key($key); + } + + + if ($status == 200 || $status == 201) { + $obj->content_type($http_response->content_type) + if $http_response->content_type; + $obj->data(JSON::decode_json($obj->data)) + if $obj->content_type eq 'application/json'; + $obj->vclock($http_response->header('X-Riak-Vclock')); + } +} + +sub retrieve_sibling { + my ($self, $object, $params) = @_; + + my $request = $self->new_request( + 'GET', + [$self->prefix, $object->bucket->name, $object->key], + $params + ); + + my $response = $self->send_request($request); + + my $sibling = Net::Riak::Object->new( + client => $self, + bucket => $object->bucket, + key => $object->key + ); + + $sibling->_jsonize($object->_jsonize); + $self->populate_object($sibling, $response, [200]); + $sibling; +} + + + + +1; +__END__ + +=item populate_object + +Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library. + diff --git a/lib/Net/Riak/Role/UserAgent.pm b/lib/Net/Riak/Role/UserAgent.pm index eaec209..9dacf96 100644 --- a/lib/Net/Riak/Role/UserAgent.pm +++ b/lib/Net/Riak/Role/UserAgent.pm @@ -10,6 +10,12 @@ our $CONN_CACHE; sub connection_cache { $CONN_CACHE ||= LWP::ConnCache->new } +has ua_timeout => ( + is => 'rw', + isa => 'Int', + default => 120 +); + has useragent => ( is => 'rw', isa => 'LWP::UserAgent', @@ -24,7 +30,8 @@ has useragent => ( @LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts; my $ua = LWP::UserAgent->new( - timeout => $self->ua_timeout + timeout => $self->ua_timeout, + keep_alive => 1, ); $ua->conn_cache(__PACKAGE__->connection_cache); diff --git a/lib/Net/Riak/Transport/PBC.pm b/lib/Net/Riak/Transport/PBC.pm new file mode 100644 index 0000000..e495663 --- /dev/null +++ b/lib/Net/Riak/Transport/PBC.pm @@ -0,0 +1,9 @@ +package Net::Riak::Transport::PBC; + +use Moose::Role; + +with qw/ + Net::Riak::Role::PBC + /; + +1; diff --git a/lib/Net/Riak/Transport/PBC/Code.pm b/lib/Net/Riak/Transport/PBC/Code.pm new file mode 100644 index 0000000..9231540 --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Code.pm @@ -0,0 +1,90 @@ +package Net::Riak::Transport::PBC::Code; +use strict; +use warnings; +use base 'Exporter'; + +our @EXPORT_OK = qw/ + REQ_CODE + RESP_CLASS + EXPECTED_RESP + RESP_DECODER +/; + +sub EXPECTED_RESP { + my $code = shift; + return { + 1 => 2, + 3 => 4, + 5 => 6, + 7 => 8, + 9 => 10, + 11 => 12, + 13 => 14, + 15 => 16, + 17 => 18, + 19 => 20, + 21 => 22, + 23 => 24, + }->{$code}; +} +sub RESP_CLASS { + my $code = shift; + + return { + 0 => 'RpbErrorResp', + 2 => 'RpbPingResp', + 4 => 'RpbGetClientIdResp', + 6 => 'RpbSetClientIdResp', + 8 => 'RpbGetServerInfoResp', + 10 => 'RpbGetResp', + 12 => 'RpbPutResp', + 14 => 'RpbDelResp', + 16 => 'RpbListBucketsResp', + 18 => 'RpbListKeysResp', + 20 => 'RpbGetBucketResp', + 22 => 'RpbSetBucketResp', + 24 => 'RpbMapRedResp', + }->{$code}; +} + +sub RESP_DECODER { + my $code = shift; + + return { + 0 => 'RpbErrorResp', + 2 => undef, + 4 => 'RpbGetClientIdResp', + 6 => undef, + 8 => 'RpbGetServerInfoResp', + 10 => 'RpbGetResp', + 12 => 'RpbPutResp', + 14 => undef, + 16 => 'RpbListBucketsResp', + 18 => 'RpbListKeysResp', + 20 => 'RpbGetBucketResp', + 22 => undef, + 24 => 'RpbMapRedResp' + }->{$code}; +}; + + +sub REQ_CODE { + my $class = shift; + + return { + RpbPingReq => 1, + RpbGetClientIdReq => 3, + RpbSetClientIdReq => 5, + RpbGetServerInfoReq => 7, + RpbGetReq => 9, + RpbPutReq => 11, + RpbDelReq => 13, + RpbListBucketsReq => 15, + RpbListKeysReq => 17, + RpbGetBucketReq => 19, + RpbSetBucketReq => 21, + RpbMapRedReq => 23, + }->{$class}; +} + +1; diff --git a/lib/Net/Riak/Transport/PBC/Message.pm b/lib/Net/Riak/Transport/PBC/Message.pm new file mode 100644 index 0000000..75170de --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Message.pm @@ -0,0 +1,121 @@ +package Net::Riak::Transport::PBC::Message; + +use Moose; +use MooseX::Types::Moose qw/Str HashRef Int/; +use Net::Riak::Types 'Socket'; +use Net::Riak::Transport::PBC::Code qw/ + REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/; +use Net::Riak::Transport::PBC::Transport; + +has socket => ( + is => 'rw', + isa => Socket, + predicate => 'has_socket', +); + +has request => ( + isa => 'Str', + is => 'ro', + lazy_build => 1, +); + +has request_code => ( + required => 1, + isa => Int, + is => 'ro', + lazy_build => 1, +); + +has message_type => ( + required => 1, + isa => Str, + is => 'ro', + trigger => sub { + $_[0]->{message_type} = 'Rpb'.$_[1]; + } +); + +has params => ( + is => 'ro', + isa => HashRef, +); + +sub _build_request_code { + my $self = shift; + return REQ_CODE($self->message_type); +} + +sub _build_request { + my $self = shift; + $self->_pack_request( $self->request_code, $self->encode ); +} + +sub _pack_request { + my ($self, $code, $req) = @_; + my $h = pack('c', $code) . $req; + use bytes; + my $len = length $h; + return pack('N',$len).$h; +} + +sub encode { + my $self = shift; + return $self->message_type->can('encode') + ? $self->message_type->encode( $self->params ) + : ''; +} + +sub decode { + my ($self, $type, $raw_content) = @_; + return 'Rpb'.$type->decode($raw_content); +} + +sub send { + my ($self, $cb) = @_; + + die "No socket? did you forget to ->connect?" unless $self->has_socket; + + $self->socket->print($self->request); + + my $resp = $self->handle_response; + + return $resp unless $cb; + + $cb->($resp); + while (!$resp->done) { + $resp = $self->handle_response; +# use YAML::Syck; warn Dump $resp; + $cb->($resp); + } + return 1; +} + +sub handle_response { + my $self = shift; + my ($code, $resp) = $self->_unpack_response; + + my $expected_code = EXPECTED_RESP($self->request_code); + + if ($expected_code != $code) { + # TODO throw object + die "Expecting response type " + . RESP_CLASS($expected_code) + . " got " . RESP_CLASS($code); + } + + return 1 unless RESP_DECODER($code); + return RESP_DECODER($code)->decode($resp); +} + +sub _unpack_response { + my $self = shift; + my ( $len, $code, $msg ); + $self->socket->read( $len, 4 ); + $len = unpack( 'N', $len ); + $self->socket->read( $code, 1 ); + $code = unpack( 'c', $code ); + $self->socket->read( $msg, $len - 1 ); + return ( $code, $msg ); +} + +1; diff --git a/lib/Net/Riak/Transport/PBC/Transport.pm b/lib/Net/Riak/Transport/PBC/Transport.pm new file mode 100644 index 0000000..768c32d --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Transport.pm @@ -0,0 +1,483 @@ +package Net::Riak::Transport::PBC; + +## +## This file was generated by Google::ProtocolBuffers (0.08) +## on Mon Dec 13 11:30:34 2010 +## +use strict; +use warnings; +use Google::ProtocolBuffers; +{ + unless (RpbSetClientIdReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbSetClientIdReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'client_id', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbSetBucketReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbSetBucketReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbBucketProps', + 'props', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPutReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPutReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 3, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbContent', + 'content', 4, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'w', 5, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'dw', 6, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'return_body', 7, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListBucketsResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListBucketsResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'buckets', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetBucketResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetBucketResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + 'RpbBucketProps', + 'props', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'r', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetBucketReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetBucketReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbLink->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbLink', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'tag', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbContent', + 'content', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPair->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPair', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'value', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbPutResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbPutResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbContent', + 'content', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vclock', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbDelReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbDelReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'key', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'rw', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbMapRedReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbMapRedReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'request', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_type', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbMapRedResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbMapRedResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'phase', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'response', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'done', 3, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetClientIdResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetClientIdResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'client_id', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbErrorResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbErrorResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'errmsg', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'errcode', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbBucketProps->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbBucketProps', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'n_val', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'allow_mult', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbGetServerInfoResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbGetServerInfoResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'node', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'server_version', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListKeysReq->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListKeysReq', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'bucket', 1, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbListKeysResp->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbListKeysResp', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'keys', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BOOL(), + 'done', 2, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + + unless (RpbContent->can('_pb_fields_list')) { + Google::ProtocolBuffers->create_message( + 'RpbContent', + [ + [ + Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'value', 1, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_type', 2, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'charset', 3, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'content_encoding', 4, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_BYTES(), + 'vtag', 5, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbLink', + 'links', 6, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'last_mod', 7, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), + Google::ProtocolBuffers::Constants::TYPE_UINT32(), + 'last_mod_usecs', 8, undef + ], + [ + Google::ProtocolBuffers::Constants::LABEL_REPEATED(), + 'RpbPair', + 'usermeta', 9, undef + ], + + ], + { 'create_accessors' => 1, } + ); + } + +} + +1; diff --git a/lib/Net/Riak/Transport/REST.pm b/lib/Net/Riak/Transport/REST.pm new file mode 100644 index 0000000..434f4be --- /dev/null +++ b/lib/Net/Riak/Transport/REST.pm @@ -0,0 +1,11 @@ +package Net::Riak::Transport::REST; + +use Moose::Role; + +with qw/ + Net::Riak::Role::UserAgent + Net::Riak::Role::REST + Net::Riak::Role::Hosts + /; + +1; diff --git a/lib/Net/Riak/Types.pm b/lib/Net/Riak/Types.pm new file mode 100644 index 0000000..a1e28b4 --- /dev/null +++ b/lib/Net/Riak/Types.pm @@ -0,0 +1,38 @@ +package Net::Riak::Types; + +use MooseX::Types::Moose qw/Str ArrayRef HashRef/; +use MooseX::Types::Structured qw(Tuple Optional Dict); +use MooseX::Types -declare => + [qw(Socket Client HTTPResponse HTTPRequest RiakHost)]; + +class_type Socket, { class => 'IO::Socket::INET' }; +class_type Client, { class => 'Net::Riak::Client' }; +class_type HTTPRequest, { class => 'HTTP::Request' }; +class_type HTTPResponse, { class => 'HTTP::Response' }; + +subtype RiakHost, as ArrayRef [HashRef]; + +coerce RiakHost, from Str, via { + [ { node => $_, weight => 1 } ]; +}; + +coerce RiakHost, from ArrayRef, via { + warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release."; + my $backends = $_; + my $weight = 1 / @$backends; + [ map { { node => $_, weight => $weight } } @$backends ]; +}; + +coerce RiakHost, from HashRef, via { + warn "DEPRECATED: Support for multiple hosts will be removed in the 0.17 release."; + my $backends = $_; + my $total = 0; + $total += $_ for values %$backends; + [ + map { { node => $_, weight => $backends->{$_} / $total } } + keys %$backends + ]; +}; + +1; + |
