summaryrefslogtreecommitdiff
path: root/lib/Net/Riak/MapReduce.pm
diff options
context:
space:
mode:
Diffstat (limited to 'lib/Net/Riak/MapReduce.pm')
-rw-r--r--lib/Net/Riak/MapReduce.pm134
1 files changed, 87 insertions, 47 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index 6ed8631..03a3dd3 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -147,23 +147,34 @@ sub run {
$inputs = $self->inputs;
}
+ my $ua_timeout = $self->client->useragent->timeout();
+
my $job = {inputs => $inputs, query => $query};
if ($timeout) {
- $job->{$timeout} = $timeout;
+ if ($ua_timeout < ($timeout/1000)) {
+ $self->client->useragent->timeout(int($timeout/1000));
+ }
+ $job->{timeout} = $timeout;
}
my $content = JSON::encode_json($job);
- my $request =
- $self->client->request('POST', [$self->client->mapred_prefix]);
+ my $request = $self->client->new_request(
+ 'POST', [$self->client->mapred_prefix]
+ );
$request->content($content);
- my $response = $self->client->useragent->request($request);
+
+ my $response = $self->client->send_request($request);
unless ($response->is_success) {
- die $response->content;
+ die "MapReduce query failed: ".$response->status_line;
}
- my $result = JSON::decode_json($response->content);
+ my $result = JSON::decode_json($response->content);
+
+ if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
+ $self->client->useragent->timeout($ua_timeout);
+ }
my @phases = $self->phases;
if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
@@ -187,31 +198,35 @@ sub run {
=head1 SYNOPSIS
- use Net::Riak;
+ use Net::Riak;
- my $client = Net::Riak->new(..);
+ my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
+ my $bucket = $riak->bucket("Cats");
- $client->add('Cats')
- ->map(qq/ function (value) {
- return [value.toSource()]
- }
- /);
+ my $query = $riak->add("Cats");
+ $query->map(
+ 'function(v, d, a) { return [v]; }',
+ arg => [qw/some params to your function/]
+ );
- my $json = $query->run;
+ $query->reduce("function(v) { return [v];}");
+ my $json = $query->run(10000);
- # OR
+ # can also be used like:
- my $query = Net::Riak::MapReduce->new(
- client => $client
- );
+ my $query = Net::Riak::MapReduce->new(
+ client => $riak->client
+ );
- my $json = $query->add_bucket('')->map('Riak.mapValuesJson')
- ->reduce('..')
- ->run;
+ # named functions
+ my $json = $query->add_bucket('Dogs')
+ ->map('Riak.mapValuesJson')
+ ->reduce('Your.SortFunction')
+ ->run;
=head1 DESCRIPTION
-Used to construct map/reduce querys.
+The MapReduce object allows you to build up and run a map/reduce operations on Riak.
=head2 ATTRIBUTES
@@ -227,62 +242,87 @@ Used to construct map/reduce querys.
=back
-=head2 METHODS
+=head1 METHODS
-=over 4
+=head2 add
-=method add
+arguments: bucketname or object
+
+return: a Net::Riak::MapReduce object
Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can specify either a RiakObject, a string bucket name, or a bucket, key, and additional arg.
-=method add_object
+=head2 add_object
-Add a Net::Riak::Object as an input to a map/reduce query.
+=head2 add_bucket_key_data
-=method add_bucket_key_data
+=head2 add_bucket
-=method add_bucket
+=head2 link
-Add a bucket by name, as an input to a map/reduce query.
+arguments: bucketname, tag, keep
-=method link
+return: $self
-=method map
+Add a link phase to the map/reduce operation.
-Adds a map phase to the current query.
+The default value for bucket name is '_', which means all buckets.
- keep - determines if the output of the function should be kept
- args - passed as arguments to the JavaScript function
+The default value for tag is '_'.
- ->map("function () {..}", keep => 0, args => ['foo', 'bar']);
+The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase)
-Named functions can also be used.
+=head2 map
- -map('Riak.mapValuesJson'); # de-serializes data into JSON
+arguments: $function, %options
-=method reduce
+return: self
-Adds a reduce phase to the current query.
+ ->map("function () {..}", keep => 0, args => ['foo', 'bar']);
+ ->map('Riak.mapValuesJson'); # de-serializes data into JSON
- ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']);
+Add a map phase to the map/reduce operation.
-=method run
+functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
-Executes the query, preforming a HTTP request via Riak's REST API.
+%options is an optional associative array containing:
-It will attempt to de-serialize the JSON response to a perl structure.
+ language
+ keep - flag
+ arg - an arrayref of parameterss for the JavaScript function
-=back
+=head2 reduce
+
+arguments: $function, %options
+
+return: $self
+
+ ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']);
+
+Add a reduce phase to the map/reduce operation.
+
+functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
+
+=head2 run
+
+arguments: $function, %options
+
+arguments: $timeout
+
+return: arrayref
+
+Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. rayref of RiakLink objects if the last phase is a link phase.
+
+Timeout in milliseconds,
=head2 SEE ALSO
REST API
-https://wiki.basho.com/display/RIAK/MapReduce#MapReduce-MapReduceviatheRESTAPI
+https://wiki.basho.com/display/RIAK/MapReduce
List of built-in named functions for map / reduce phases
http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496
=cut
-