summaryrefslogtreecommitdiff
path: root/lib/Net/Riak/MapReduce.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net/Riak/MapReduce.pm')
-rw-r--r--lib/Net/Riak/MapReduce.pm107
1 files changed, 71 insertions, 36 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index 632d484..14e4007 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -10,7 +10,7 @@ use Net::Riak::LinkPhase;
use Net::Riak::MapReducePhase;
with 'Net::Riak::Role::Base' =>
- {classes => [{name => 'client', required => 0}]};
+ {classes => [{name => 'client', required => 1}]};
has phases => (
traits => ['Array'],
@@ -110,7 +110,7 @@ sub map {
my $map_reduce = Net::Riak::MapReducePhase->new(
type => 'map',
function => $function,
- keep => $options{keep} || JSON::false,
+ keep => $options{keep} ? JSON::true : JSON::false,
arg => $options{arg} || [],
);
$self->add_phase($map_reduce);
@@ -166,13 +166,18 @@ sub run {
my $content = JSON::encode_json($job);
- my $request =
- $self->client->request('POST', [$self->client->mapred_prefix]);
+ my $request = $self->client->new_request(
+ 'POST', [$self->client->mapred_prefix]
+ );
$request->content($content);
- my $response = $self->client->useragent->request($request);
+ my $response = $self->client->send_request($request);
+
+ unless ($response->is_success) {
+ die "MapReduce query failed: ".$response->status_line;
+ }
- my $result = JSON::decode_json($response->content);
+ my $result = JSON::decode_json($response->content);
if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
$self->client->useragent->timeout($ua_timeout);
@@ -200,17 +205,35 @@ sub run {
=head1 SYNOPSIS
+ use Net::Riak;
+
my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
- my $bucket = $riak->bucket("mybucket");
+ my $bucket = $riak->bucket("Cats");
+
+ my $query = $riak->add("Cats");
+ $query->map(
+ 'function(v, d, a) { return [v]; }',
+ arg => [qw/some params to your function/]
+ );
+
+ $query->reduce("function(v) { return [v];}");
+ my $json = $query->run(10000);
+
+ # can also be used like:
- my $mapred = $riak->add("mybucket");
- $mapred->map('function(v) { return [v]; }');
- $mapred->reduce("function(v) { return v;}");
- my $res = $mapred->run(10000);
+ my $query = Net::Riak::MapReduce->new(
+ client => $riak->client
+ );
+
+ # named functions
+ my $json = $query->add_bucket('Dogs')
+ ->map('Riak.mapValuesJson')
+ ->reduce('Your.SortFunction')
+ ->run;
=head1 DESCRIPTION
-The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak.
+The MapReduce object allows you to build up and run a map/reduce operations on Riak.
=head2 ATTRIBUTES
@@ -226,11 +249,9 @@ The RiakMapReduce object allows you to build up and run a map/reduce operation o
=back
-=head2 METHODS
+=head1 METHODS
-=over 4
-
-=item add
+=head2 add
arguments: bucketname or arrays or L<Net::Riak::Object>
@@ -248,17 +269,17 @@ Add your inputs to a MapReduce job
$mapred->add( "alice", "p5" );
$mapred->add( $riak->bucket("alice")->get("p6") );
-=item add_object
+=head2 add_object
-=item add_bucket_key_data
+=head2 add_bucket_key_data
-=item add_bucket
+=head2 add_bucket
-=item link
+=head2 link
arguments: bucketname, tag, keep
-return: self
+return: $self
Add a link phase to the map/reduce operation.
@@ -268,43 +289,57 @@ The default value for tag is '_'.
The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase)
-=item map
+=head2 map
-arguments: function, options
+arguments: $function, %options
return: self
+ ->map("function () {..}", keep => 0, args => ['foo', 'bar']);
+ ->map('Riak.mapValuesJson'); # de-serializes data into JSON
+
Add a map phase to the map/reduce operation.
functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
-options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+%options is an optional associative array containing:
-=item reduce
+ language
+ keep - flag
+ arg - an arrayref of parameterss for the JavaScript function
-arguments: function, options
+=head2 reduce
-return: self
+arguments: $function, %options
+
+return: $self
+
+ ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']);
Add a reduce phase to the map/reduce operation.
functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
-options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+=head2 run
-=item run
+arguments: $function, %options
-arguments: function, options
+arguments: $timeout
-arguments: timeout
+return: arrayref
-return: array
+Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. rayref of RiakLink objects if the last phase is a link phase.
-Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase.
+Timeout in milliseconds,
-Timeout in milliseconds.
+=head2 SEE ALSO
-=back
+REST API
-=cut
+https://wiki.basho.com/display/RIAK/MapReduce
+
+List of built-in named functions for map / reduce phases
+http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496
+
+=cut