summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/Net/Riak/MapReduce.pm68
1 files changed, 67 insertions, 1 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index f1a50a6..499335a 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -147,9 +147,14 @@ sub run {
$inputs = $self->inputs;
}
+ my $ua_timeout = $self->client->useragent->timeout();
+
my $job = {inputs => $inputs, query => $query};
if ($timeout) {
- $job->{$timeout} = $timeout;
+ if ($ua_timeout < ($timeout/1000)) {
+ $self->client->useragent->timeout(int($timeout/1000));
+ }
+ $job->{timeout} = $timeout;
}
my $content = JSON::encode_json($job);
@@ -157,10 +162,15 @@ sub run {
my $request =
$self->client->request('POST', [$self->client->mapred_prefix]);
$request->content($content);
+
my $response = $self->client->useragent->request($request);
my $result = JSON::decode_json($response->content);
+ if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
+ $self->client->useragent->timeout($ua_timeout);
+ }
+
my @phases = $self->phases;
if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
return $result;
@@ -183,8 +193,18 @@ sub run {
=head1 SYNOPSIS
+ my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
+ my $bucket = $riak->bucket("mybucket");
+
+ my $mapred = $riak->add("mybucket");
+ $mapred->map('function(v) { return [v]; }');
+ $mapred->reduce("function(v) { return v;}");
+ my $res = $mapred->run(10000);
+
=head1 DESCRIPTION
+The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak.
+
=head2 ATTRIBUTES
=over 4
@@ -205,6 +225,10 @@ sub run {
=item add
+arguments: bucketname
+
+return: a Net::Riak::MapReduce object
+
Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can specify either a RiakObject, a string bucket name, or a bucket, key, and additional arg.
=item add_object
@@ -215,12 +239,54 @@ Add inputs to a map/reduce operation. This method takes three different forms, d
=item link
+arguments: bucketname, tag, keep
+
+return: self
+
+Add a link phase to the map/reduce operation.
+
+The default value for bucket name is '_', which means all buckets.
+
+The default value for tag is '_'.
+
+The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase)
+
=item map
+arguments: function, options
+
+return: self
+
+Add a map phase to the map/reduce operation.
+
+functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
+
+options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+
=item reduce
+arguments: function, options
+
+return: self
+
+Add a reduce phase to the map/reduce operation.
+
+functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
+
+options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+
=item run
+arguments: function, options
+
+arguments: timeout
+
+return: array
+
+Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase.
+
+Timeout in milliseconds.
+
=back
=cut