diff options
| -rw-r--r-- | Changes | 8 | ||||
| -rw-r--r-- | dist.ini | 2 | ||||
| -rw-r--r-- | lib/Net/Riak.pm | 28 | ||||
| -rw-r--r-- | lib/Net/Riak/Bucket.pm | 122 | ||||
| -rw-r--r-- | lib/Net/Riak/Client.pm | 32 | ||||
| -rw-r--r-- | lib/Net/Riak/MapReduce.pm | 134 | ||||
| -rw-r--r-- | lib/Net/Riak/Object.pm | 60 | ||||
| -rw-r--r-- | lib/Net/Riak/Role/MapReduce.pm | 2 | ||||
| -rw-r--r-- | lib/Net/Riak/Role/REST.pm | 20 | ||||
| -rw-r--r-- | lib/Net/Riak/Role/UserAgent.pm | 7 | ||||
| -rw-r--r-- | t/08_stream.t | 37 |
11 files changed, 325 insertions, 127 deletions
@@ -1,3 +1,11 @@ +0.09 Tue 05 Oct 2010 12:03:59 PM CEST + - typo + - when executing a map/reduce, if the timeout of the map/reduce is higher + than the value for the useragent's timeout, change for this query the + value of the useragent timeout to be the same as the one for the job. + - add key callbacks to Bucket get_properties/get_keys stream mode + (Gavin Carr) + 0.08 Mon 06 Sep 2010 10:52:15 AM CEST - allow to store non-json object (Gavin Carr) - increase LWP's MaxLineLength (Gavin Carr) @@ -3,7 +3,7 @@ author = franck cuny <franck@lumberjaph.net> license = Perl_5 copyright_holder = linkfluence copyright_year = 2010 -version = 0.08 +version = 0.09 [@Filter] bundle = @Basic diff --git a/lib/Net/Riak.pm b/lib/Net/Riak.pm index 5399ff3..77bd773 100644 --- a/lib/Net/Riak.pm +++ b/lib/Net/Riak.pm @@ -13,7 +13,7 @@ has client => ( is => 'rw', isa => 'Net::Riak::Client', required => 1, - handles => [qw/request useragent is_alive/] + handles => [qw/is_alive http_request http_response/] ); sub BUILDARGS { @@ -39,6 +39,8 @@ sub bucket { $obj->store; my $obj = $bucket->get('new_post'); + my $req = $client->http_request; # last request + $client->http_response # last response =head1 DESCRIPTION @@ -100,15 +102,15 @@ client_id for this client =back -=head2 METHODS +=head1 METHODS -=method bucket +=head2 bucket my $bucket = $client->bucket($name); Get the bucket by the specified name. Since buckets always exist, this will always return a L<Net::Riak::Bucket> -=method is_alive +=head2 is_alive if (!$client->is_alive) { ... @@ -116,30 +118,40 @@ Get the bucket by the specified name. Since buckets always exist, this will alwa Check if the Riak server for this client is alive -=method add +=head2 add my $map_reduce = $client->add('bucket_name', 'key'); Start assembling a Map/Reduce operation -=method link +=head2 link my $map_reduce = $client->link(); Start assembling a Map/Reduce operation -=method map +=head2 map my $map_reduce = $client->add('bucket_name', 'key')->map("function ..."); Start assembling a Map/Reduce operation -=method reduce +=head2 reduce my $map_reduce = $client->add(..)->map(..)->reduce("function ..."); Start assembling a Map/Reduce operation +=method http_request + +Returns the HTTP::Request object from the last request + +=method http_response + +Returns a HTTP::Response object from the last request + =head2 SEE ALSO Net::Riak::MapReduce + +=cut diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm index cd42316..8f263cf 100644 --- a/lib/Net/Riak/Bucket.pm +++ b/lib/Net/Riak/Bucket.pm @@ -8,8 +8,9 @@ use Carp; use Net::Riak::Object; with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]}; -with 'Net::Riak::Role::Base' => - {classes => [{name => 'client', required => 1}]}; +with 'Net::Riak::Role::Base' => { + classes => [{ name => 'client', required => 1, }] +}; has name => ( is => 'ro', @@ -45,8 +46,10 @@ sub allow_multiples { } sub get_keys { - my $self = shift; - my $properties = $self->get_properties({keys => 'true', props => 'false'}); + my ($self, $params) = @_; + my $key_mode = delete($params->{stream}) ? 'stream' : 'true'; + $params = { props => 'false', keys => $key_mode, %$params }; + my $properties = $self->get_properties($params); return $properties->{keys}; } @@ -76,32 +79,57 @@ sub get_property { sub get_properties { my ($self, $params) = @_; + # Callbacks require stream mode + $params->{keys} = 'stream' if $params->{cb}; + $params->{props} = 'true' unless exists $params->{props}; $params->{keys} = 'false' unless exists $params->{keys}; - my $request = - $self->client->request('GET', [$self->client->prefix, $self->name], - $params); + my $request = $self->client->new_request( + 'GET', [$self->client->prefix, $self->name], $params + ); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); - if (!$response->is_success) { - die "Error getting bucket properties: " . $response->status_line . "\n"; + unless ($response->is_success) { + die "Error getting bucket properties: ".$response->status_line."\n"; } - return JSON::decode_json($response->content); + if ($params->{keys} ne 'stream') { + return JSON::decode_json($response->content); + } + + # In streaming mode, aggregate keys from the multiple returned chunk objects + else { + my $json = JSON->new; + my $props = $json->incr_parse($response->content); + if ($params->{cb}) { + while (defined(my $obj = $json->incr_parse)) { + $params->{cb}->($_) foreach @{$obj->{keys}}; + } + return %$props ? { props => $props } : {}; + } + else { + my @keys = map { $_->{keys} && ref $_->{keys} eq 'ARRAY' ? @{$_->{keys}} : () } + $json->incr_parse; + return { props => $props, keys => \@keys }; + } + } } sub set_properties { my ($self, $props) = @_; - my $request = $self->client->request('PUT', [$self->client->prefix, $self->name]); + my $request = $self->client->new_request( + 'PUT', [$self->client->prefix, $self->name] + ); + $request->header('Content-Type' => $self->content_type); $request->content(JSON::encode_json({props => $props})); - my $response = $self->client->useragent->request($request); - if (!$response->is_success) { - die "Error setting bucket properties: " . $response->status_line . "\n"; + my $response = $self->client->send_request($request); + unless ($response->is_success) { + die "Error setting bucket properties: ".$response->status_line."\n"; } } @@ -123,9 +151,13 @@ sub new_object { my $client = Net::Riak->new(...); my $bucket = $client->bucket('foo'); - my $object = $bucket->new_object('foo', {...}); + + # retrieve an existing object + my $obj1 = $bucket->get('foo'); + + # create/store a new object + my $obj2 = $bucket->new_object('foo2', {...}); $object->store; - $object->get('foo2'); =head1 DESCRIPTION @@ -163,55 +195,87 @@ DW value setting for this client (default 2) =head2 METHODS -=method new_object +=over 4 + +=item new_object my $obj = $bucket->new_object($key, $data, @args); Create a new L<Net::Riak::Object> object. Additional Object constructor arguments can be passed after $data. If $data is a reference and no explicit Object content_type is given in @args, the data will be serialised and stored as JSON. -=method get +=item get my $obj = $bucket->get($key, [$r]); Retrieve an object from Riak. -=method n_val +=item n_val my $n_val = $bucket->n_val; Get/set the N-value for this bucket, which is the number of replicas that will be written of each object in the bucket. Set this once before you write any data to the bucket, and never change it again, otherwise unpredictable things could happen. This should only be used if you know what you are doing. -=method allow_multiples +=item allow_multiples $bucket->allow_multiples(1|0); If set to True, then writes with conflicting data will be stored and returned to the client. This situation can be detected by calling has_siblings() and get_siblings(). This should only be used if you know what you are doing. -=method get_keys +=item get_keys my $keys = $bucket->get_keys; + my $keys = $bucket->get_keys($args); + +Return an arrayref of the list of keys for a bucket. Optionally takes a hashref of named parameters. Supported parameters are: + +=over 4 -Return an arrayref of the list of keys for a bucket. +=item stream => 1 -=method set_property +Uses key streaming mode to fetch the list of keys, which may be faster for large keyspaces. + +=item cb => sub { } + +A callback subroutine to be called for each key found (passed in as the only parameter). get_keys() returns nothing in callback mode. + +=back + +=item set_property $bucket->set_property({n_val => 2}); Set a bucket property. This should only be used if you know what you are doing. -=method get_property +=item get_property my $prop = $bucket->get_property('n_val'); Retrieve a bucket property. -=method set_properties +=item set_properties Set multiple bucket properties in one call. This should only be used if you know what you are doing. -=method get_properties +=item get_properties + +Retrieve an associative array of all bucket properties, containing 'props' and 'keys' elements. + +Accepts a hashref of parameters. Supported parameters are: + +=over 4 + +=item props => 'true'|'false' + +Whether to return bucket properties. Defaults to 'true' if no parameters are given. + +=item keys => 'true'|'false'|'stream' -Retrieve an associative array of all bucket properties. By default, 'props' is set to true and 'keys' to false. You can change this default: +Whether to return bucket keys. If set to 'stream', uses key streaming mode, which may be faster for large keyspaces. - my $properties = $bucket->get_properties({keys=>'true'}); +=item cb => sub { } +A callback subroutine to be called for each key found (passed in as the only parameter). Implies keys => 'stream'. Keys are omitted from the results hashref in callback mode. + +=back + +=back diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm index 19d172f..e76a0ef 100644 --- a/lib/Net/Riak/Client.pm +++ b/lib/Net/Riak/Client.pm @@ -2,12 +2,10 @@ package Net::Riak::Client; use Moose; use MIME::Base64; +use Moose::Util::TypeConstraints; -with qw/ - Net::Riak::Role::REST - Net::Riak::Role::UserAgent - Net::Riak::Role::Hosts - /; +class_type 'HTTP::Request'; +class_type 'HTTP::Response'; has prefix => ( is => 'rw', @@ -29,6 +27,24 @@ has client_id => ( isa => 'Str', lazy_build => 1, ); +has http_request => ( + is => 'rw', + isa => 'HTTP::Request', +); + +has http_response => ( + is => 'rw', + isa => 'HTTP::Response', + handles => ['is_success'] +); + +with 'Net::Riak::Role::UserAgent'; +with qw/ + Net::Riak::Role::REST + Net::Riak::Role::Hosts + /; + + sub _build_client_id { "perl_net_riak" . encode_base64(int(rand(10737411824)), ''); @@ -36,9 +52,9 @@ sub _build_client_id { sub is_alive { my $self = shift; - my $request = $self->request('GET', ['ping']); - my $response = $self->useragent->request($request); - $response->is_success ? return 1 : return 0; + my $request = $self->new_request('GET', ['ping']); + my $response = $self->send_request($request); + $self->is_success ? return 1 : return 0; } 1; diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm index 6ed8631..03a3dd3 100644 --- a/lib/Net/Riak/MapReduce.pm +++ b/lib/Net/Riak/MapReduce.pm @@ -147,23 +147,34 @@ sub run { $inputs = $self->inputs; } + my $ua_timeout = $self->client->useragent->timeout(); + my $job = {inputs => $inputs, query => $query}; if ($timeout) { - $job->{$timeout} = $timeout; + if ($ua_timeout < ($timeout/1000)) { + $self->client->useragent->timeout(int($timeout/1000)); + } + $job->{timeout} = $timeout; } my $content = JSON::encode_json($job); - my $request = - $self->client->request('POST', [$self->client->mapred_prefix]); + my $request = $self->client->new_request( + 'POST', [$self->client->mapred_prefix] + ); $request->content($content); - my $response = $self->client->useragent->request($request); + + my $response = $self->client->send_request($request); unless ($response->is_success) { - die $response->content; + die "MapReduce query failed: ".$response->status_line; } - my $result = JSON::decode_json($response->content); + my $result = JSON::decode_json($response->content); + + if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) { + $self->client->useragent->timeout($ua_timeout); + } my @phases = $self->phases; if (ref $phases[-1] ne 'Net::Riak::LinkPhase') { @@ -187,31 +198,35 @@ sub run { =head1 SYNOPSIS - use Net::Riak; + use Net::Riak; - my $client = Net::Riak->new(..); + my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" ); + my $bucket = $riak->bucket("Cats"); - $client->add('Cats') - ->map(qq/ function (value) { - return [value.toSource()] - } - /); + my $query = $riak->add("Cats"); + $query->map( + 'function(v, d, a) { return [v]; }', + arg => [qw/some params to your function/] + ); - my $json = $query->run; + $query->reduce("function(v) { return [v];}"); + my $json = $query->run(10000); - # OR + # can also be used like: - my $query = Net::Riak::MapReduce->new( - client => $client - ); + my $query = Net::Riak::MapReduce->new( + client => $riak->client + ); - my $json = $query->add_bucket('')->map('Riak.mapValuesJson') - ->reduce('..') - ->run; + # named functions + my $json = $query->add_bucket('Dogs') + ->map('Riak.mapValuesJson') + ->reduce('Your.SortFunction') + ->run; =head1 DESCRIPTION -Used to construct map/reduce querys. +The MapReduce object allows you to build up and run a map/reduce operations on Riak. =head2 ATTRIBUTES @@ -227,62 +242,87 @@ Used to construct map/reduce querys. =back -=head2 METHODS +=head1 METHODS -=over 4 +=head2 add -=method add +arguments: bucketname or object + +return: a Net::Riak::MapReduce object Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can specify either a RiakObject, a string bucket name, or a bucket, key, and additional arg. -=method add_object +=head2 add_object -Add a Net::Riak::Object as an input to a map/reduce query. +=head2 add_bucket_key_data -=method add_bucket_key_data +=head2 add_bucket -=method add_bucket +=head2 link -Add a bucket by name, as an input to a map/reduce query. +arguments: bucketname, tag, keep -=method link +return: $self -=method map +Add a link phase to the map/reduce operation. -Adds a map phase to the current query. +The default value for bucket name is '_', which means all buckets. - keep - determines if the output of the function should be kept - args - passed as arguments to the JavaScript function +The default value for tag is '_'. - ->map("function () {..}", keep => 0, args => ['foo', 'bar']); +The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase) -Named functions can also be used. +=head2 map - -map('Riak.mapValuesJson'); # de-serializes data into JSON +arguments: $function, %options -=method reduce +return: self -Adds a reduce phase to the current query. + ->map("function () {..}", keep => 0, args => ['foo', 'bar']); + ->map('Riak.mapValuesJson'); # de-serializes data into JSON - ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']); +Add a map phase to the map/reduce operation. -=method run +functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') -Executes the query, preforming a HTTP request via Riak's REST API. +%options is an optional associative array containing: -It will attempt to de-serialize the JSON response to a perl structure. + language + keep - flag + arg - an arrayref of parameterss for the JavaScript function -=back +=head2 reduce + +arguments: $function, %options + +return: $self + + ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']); + +Add a reduce phase to the map/reduce operation. + +functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') + +=head2 run + +arguments: $function, %options + +arguments: $timeout + +return: arrayref + +Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. rayref of RiakLink objects if the last phase is a link phase. + +Timeout in milliseconds, =head2 SEE ALSO REST API -https://wiki.basho.com/display/RIAK/MapReduce#MapReduce-MapReduceviatheRESTAPI +https://wiki.basho.com/display/RIAK/MapReduce List of built-in named functions for map / reduce phases http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496 =cut - diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm index 59ecde1..656d71a 100644 --- a/lib/Net/Riak/Object.pm +++ b/lib/Net/Riak/Object.pm @@ -61,7 +61,7 @@ sub store { my $params = {returnbody => 'true', w => $w, dw => $dw}; my $request = - $self->client->request('PUT', + $self->client->new_request('PUT', [$self->client->prefix, $self->bucket->name, $self->key], $params); $request->header('X-Riak-ClientID' => $self->client->client_id); @@ -82,7 +82,7 @@ sub store { $request->content($self->data); } - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); $self->populate($response, [200, 300]); $self; } @@ -98,10 +98,10 @@ sub load { my $params = {r => $self->r}; my $request = - $self->client->request('GET', + $self->client->new_request('GET', [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); $self->populate($response, [200, 300, 404]); $self; } @@ -113,10 +113,10 @@ sub delete { my $params = {dw => $dw}; my $request = - $self->client->request('DELETE', + $self->client->new_request('DELETE', [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); $self->populate($response, [204, 404]); $self; } @@ -205,9 +205,9 @@ sub sibling { my $params = {r => $r, vtag => $vtag}; my $request = - $self->client->request('GET', + $self->client->new_request('GET', [$self->client->prefix, $self->bucket->name, $self->key], $params); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); my $obj = Net::Riak::Object->new( client => $self->client, @@ -343,29 +343,31 @@ Return an array of Siblings =head2 METHODS -=method count_links +=over 4 + +=item count_links Return the number of links -=method append_link +=item append_link Add a new link -=method get_siblings +=item get_siblings Return the number of siblings -=method add_sibling +=item add_sibling Add a new sibling -=method count_siblings +=item count_siblings -=method get_sibling +=item get_sibling Return a sibling -=method store +=item store $obj->store($w, $dw); @@ -383,7 +385,7 @@ DW-value, wait for this many partitions to confirm the write before returning to =back -=method load +=item load $obj->load($w); @@ -397,7 +399,7 @@ R-Value, wait for this many partitions to respond before returning to client. =back -=method delete +=item delete $obj->delete($dw); @@ -411,52 +413,56 @@ DW-value. Wait until this many partitions have deleted the object before respond =back -=method clear +=item clear $obj->reset; Reset this object -=method has_siblings +=item has_siblings if ($obj->has_siblings) { ... } Return true if this object has siblings -=method has_no_siblings +=item has_no_siblings if ($obj->has_no_siblings) { ... } Return true if this object has no siblings -=method populate +=item populate Given the output of RiakUtils.http_request and a list of statuses, populate the object. Only for use by the Riak client library. -=method add_link +=item add_link $obj->add_link($obj2, "tag"); Add a link to a L<Net::Riak::Object> -=method remove_link +=item remove_link $obj->remove_link($obj2, "tag"); Remove a link to a L<Net::Riak::Object> -=method add +=item add Start assembling a Map/Reduce operation -=method link +=item link Start assembling a Map/Reduce operation -=method map +=item map Start assembling a Map/Reduce operation -=method reduce +=item reduce Start assembling a Map/Reduce operation + +=back + +=cut diff --git a/lib/Net/Riak/Role/MapReduce.pm b/lib/Net/Riak/Role/MapReduce.pm index d80e242..48f3805 100644 --- a/lib/Net/Riak/Role/MapReduce.pm +++ b/lib/Net/Riak/Role/MapReduce.pm @@ -20,7 +20,7 @@ sub link { sub map { my ($self, @args) = @_; my $mr = Net::Riak::MapReduce->new(client => $self->client); - $mr->mapd(@args); + $mr->map(@args); $mr; } diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm index 1a18ff7..136ea88 100644 --- a/lib/Net/Riak/Role/REST.pm +++ b/lib/Net/Riak/Role/REST.pm @@ -6,6 +6,10 @@ use URI; use HTTP::Request; use Moose::Role; +requires 'http_request'; +requires 'http_response'; +requires 'useragent'; + sub _build_path { my ($self, $path) = @_; $path = join('/', @$path); @@ -20,10 +24,22 @@ sub _build_uri { $uri; } -sub request { +# constructs a HTTP::Request +sub new_request { my ($self, $method, $path, $params) = @_; my $uri = $self->_build_uri($path, $params); - HTTP::Request->new($method => $uri); + return HTTP::Request->new($method => $uri); +} + +# makes a HTTP::Request returns and stores a HTTP::Response +sub send_request { + my ($self, $req) = @_; + + $self->http_request($req); + my $r = $self->useragent->request($req); + $self->http_response($r); + + return $r; } 1; diff --git a/lib/Net/Riak/Role/UserAgent.pm b/lib/Net/Riak/Role/UserAgent.pm index ecc412f..123a378 100644 --- a/lib/Net/Riak/Role/UserAgent.pm +++ b/lib/Net/Riak/Role/UserAgent.pm @@ -6,9 +6,9 @@ use Moose::Role; use LWP::UserAgent; has useragent => ( - is => 'rw', - isa => 'LWP::UserAgent', - lazy => 1, + is => 'rw', + isa => 'LWP::UserAgent', + lazy => 1, default => sub { my $self = shift; @@ -19,7 +19,6 @@ has useragent => ( @LWP::Protocol::http::EXTRA_SOCK_OPTS = %opts; my $ua = LWP::UserAgent->new; - $ua->timeout(3); $ua; } ); diff --git a/t/08_stream.t b/t/08_stream.t new file mode 100644 index 0000000..becc600 --- /dev/null +++ b/t/08_stream.t @@ -0,0 +1,37 @@ +use strict; +use warnings; +use Test::More; + +use Net::Riak; +use HTTP::Response; + +my $client = Net::Riak::Client->new; +ok my $bucket = Net::Riak::Bucket->new(name => 'bar', client => $client), + 'bucket created'; + +$bucket->client->useragent->add_handler( + request_send => sub { + my $response = HTTP::Response->new(200); + $response->content( + '{}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":["apple"]}{"keys":[]}{"keys":["pear","peach"]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}{"keys":[]}' + ); + $response; + } +); + +ok my $props = $bucket->get_properties({props => 'false', keys => 'stream'}), 'get_properties'; +is_deeply $props, { keys => [ qw(apple pear peach) ], props => {} }, 'keys ok'; + +ok my $keys = $bucket->get_keys({stream => 1}), 'get_keys'; +is_deeply $keys, [qw/apple pear peach/], 'keys ok'; + +my $result = ''; +ok $bucket->get_properties({props => 'false', cb => sub { $result .= "** $_[0] " }}), 'get_properties with callback'; +is $result, '** apple ** pear ** peach ', 'result ok'; + +$result = ''; +ok ! defined $bucket->get_keys({cb => sub { $result .= "--> $_[0] " }}), 'get_keys with callback'; +is $result, '--> apple --> pear --> peach ', 'result ok'; + +done_testing; + |
