diff options
Diffstat (limited to 'lib/Net/Riak/MapReduce.pm')
| -rw-r--r-- | lib/Net/Riak/MapReduce.pm | 107 |
1 files changed, 71 insertions, 36 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm index 632d484..14e4007 100644 --- a/lib/Net/Riak/MapReduce.pm +++ b/lib/Net/Riak/MapReduce.pm @@ -10,7 +10,7 @@ use Net::Riak::LinkPhase; use Net::Riak::MapReducePhase; with 'Net::Riak::Role::Base' => - {classes => [{name => 'client', required => 0}]}; + {classes => [{name => 'client', required => 1}]}; has phases => ( traits => ['Array'], @@ -110,7 +110,7 @@ sub map { my $map_reduce = Net::Riak::MapReducePhase->new( type => 'map', function => $function, - keep => $options{keep} || JSON::false, + keep => $options{keep} ? JSON::true : JSON::false, arg => $options{arg} || [], ); $self->add_phase($map_reduce); @@ -166,13 +166,18 @@ sub run { my $content = JSON::encode_json($job); - my $request = - $self->client->request('POST', [$self->client->mapred_prefix]); + my $request = $self->client->new_request( + 'POST', [$self->client->mapred_prefix] + ); $request->content($content); - my $response = $self->client->useragent->request($request); + my $response = $self->client->send_request($request); + + unless ($response->is_success) { + die "MapReduce query failed: ".$response->status_line; + } - my $result = JSON::decode_json($response->content); + my $result = JSON::decode_json($response->content); if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) { $self->client->useragent->timeout($ua_timeout); @@ -200,17 +205,35 @@ sub run { =head1 SYNOPSIS + use Net::Riak; + my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" ); - my $bucket = $riak->bucket("mybucket"); + my $bucket = $riak->bucket("Cats"); + + my $query = $riak->add("Cats"); + $query->map( + 'function(v, d, a) { return [v]; }', + arg => [qw/some params to your function/] + ); + + $query->reduce("function(v) { return [v];}"); + my $json = $query->run(10000); + + # can also be used like: - my $mapred = $riak->add("mybucket"); - $mapred->map('function(v) { return [v]; }'); - $mapred->reduce("function(v) { return v;}"); - my $res = $mapred->run(10000); + my $query = Net::Riak::MapReduce->new( + client => $riak->client + ); + + # named functions + my $json = $query->add_bucket('Dogs') + ->map('Riak.mapValuesJson') + ->reduce('Your.SortFunction') + ->run; =head1 DESCRIPTION -The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak. +The MapReduce object allows you to build up and run a map/reduce operations on Riak. =head2 ATTRIBUTES @@ -226,11 +249,9 @@ The RiakMapReduce object allows you to build up and run a map/reduce operation o =back -=head2 METHODS +=head1 METHODS -=over 4 - -=item add +=head2 add arguments: bucketname or arrays or L<Net::Riak::Object> @@ -248,17 +269,17 @@ Add your inputs to a MapReduce job $mapred->add( "alice", "p5" ); $mapred->add( $riak->bucket("alice")->get("p6") ); -=item add_object +=head2 add_object -=item add_bucket_key_data +=head2 add_bucket_key_data -=item add_bucket +=head2 add_bucket -=item link +=head2 link arguments: bucketname, tag, keep -return: self +return: $self Add a link phase to the map/reduce operation. @@ -268,43 +289,57 @@ The default value for tag is '_'. The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase) -=item map +=head2 map -arguments: function, options +arguments: $function, %options return: self + ->map("function () {..}", keep => 0, args => ['foo', 'bar']); + ->map('Riak.mapValuesJson'); # de-serializes data into JSON + Add a map phase to the map/reduce operation. functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') -options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg' +%options is an optional associative array containing: -=item reduce + language + keep - flag + arg - an arrayref of parameterss for the JavaScript function -arguments: function, options +=head2 reduce -return: self +arguments: $function, %options + +return: $self + + ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']); Add a reduce phase to the map/reduce operation. functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....') -options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg' +=head2 run -=item run +arguments: $function, %options -arguments: function, options +arguments: $timeout -arguments: timeout +return: arrayref -return: array +Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. rayref of RiakLink objects if the last phase is a link phase. -Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase. +Timeout in milliseconds, -Timeout in milliseconds. +=head2 SEE ALSO -=back +REST API -=cut +https://wiki.basho.com/display/RIAK/MapReduce + +List of built-in named functions for map / reduce phases +http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496 + +=cut |
