summaryrefslogtreecommitdiff
path: root/lib/Net/Riak/MapReduce.pm
diff options
context:
space:
mode:
authorRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
committerRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
commit79bea382fd2c0753ca9ace79a11bb74c9a1d722b (patch)
treebde42a47792a27e0a863ee527b88c8c24258f7e9 /lib/Net/Riak/MapReduce.pm
parentMerge remote branch 'simon/fix_link_encoding' (diff)
downloadnet-riak-79bea382fd2c0753ca9ace79a11bb74c9a1d722b.tar.gz
merged pbc branch to master
Diffstat (limited to 'lib/Net/Riak/MapReduce.pm')
-rw-r--r--lib/Net/Riak/MapReduce.pm31
1 files changed, 5 insertions, 26 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index d05c30a..10a7c98 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -6,6 +6,8 @@ use JSON;
use Moose;
use Scalar::Util;
+use Data::Dumper;
+
use Net::Riak::LinkPhase;
use Net::Riak::MapReducePhase;
@@ -156,35 +158,12 @@ sub run {
$inputs = $self->inputs;
}
- my $ua_timeout = $self->client->useragent->timeout();
-
my $job = {inputs => $inputs, query => $query};
- if ($timeout) {
- if ($ua_timeout < ($timeout/1000)) {
- $self->client->useragent->timeout(int($timeout/1000));
- }
- $job->{timeout} = $timeout;
- }
-
-
- my $content = JSON::encode_json($job);
- my $request = $self->client->new_request(
- 'POST', [$self->client->mapred_prefix]
- );
- $request->content($content);
-
- my $response = $self->client->send_request($request);
-
- unless ($response->is_success) {
- die "MapReduce query failed: ".$response->status_line;
- }
-
- my $result = JSON::decode_json($response->content);
+ # how phases set to 'keep'.
+ my $p = scalar ( grep { $_->keep } $self->phases);
- if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
- $self->client->useragent->timeout($ua_timeout);
- }
+ my $result = $self->client->execute_job($job, $timeout, $p);
my @phases = $self->phases;
if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {