summaryrefslogtreecommitdiff
path: root/lib/Net/Riak/MapReduce.pm
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/Net/Riak/MapReduce.pm219
1 files changed, 219 insertions, 0 deletions
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
new file mode 100644
index 0000000..5f76723
--- /dev/null
+++ b/lib/Net/Riak/MapReduce.pm
@@ -0,0 +1,219 @@
+package Net::Riak::MapReduce;
+
+# ABSTRACT: Allows you to build up and run a map/reduce operation on Riak
+
+use JSON;
+use Moose;
+use Scalar::Util;
+
+use Net::Riak::LinkPhase;
+use Net::Riak::MapReducePhase;
+
+has client => (
+ is => 'rw',
+ isa => 'Net::Riak',
+ required => 1,
+);
+has phases => (
+ traits => ['Array'],
+ is => 'rw',
+ isa => 'ArrayRef[Object]',
+ auto_deref => 1,
+ lazy => 1,
+ default => sub { [] },
+ handles => {
+ get_phases => 'elements',
+ add_phase => 'push',
+ num_phases => 'count',
+ get_phase => 'get',
+ },
+);
+has inputs_bucket => (
+ is => 'rw',
+ isa => 'Str',
+ predicate => 'has_inputs_bucket',
+);
+has inputs => (
+ traits => ['Array'],
+ is => 'rw',
+ isa => 'ArrayRef[ArrayRef]',
+ handles => {add_input => 'push',},
+ default => sub { [] },
+);
+has input_mode => (
+ is => 'rw',
+ isa => 'Str',
+ predicate => 'has_input_mode',
+);
+
+sub add {
+ my $self = shift;
+ my $arg = shift;
+
+ if (!scalar @_) {
+ if (blessed($arg)) {
+ $self->add_object($arg);
+ } else {
+ $self->add_bucket($arg);
+ }
+ }
+ else {
+ $self->add_bucket_key_data($arg, @_);
+ }
+}
+
+sub add_object {
+ my ($self, $obj) = @_;
+ $self->add_bucket_key_data($obj->bucket->name, $obj->key);
+}
+
+sub add_bucket_key_data {
+ my ($self, $bucket, $key, $data) = @_;
+ if ($self->has_input_mode && $self->input_mode eq 'bucket') {
+ croak("Already added a bucket, can't add an object");
+ }
+ else {
+ $self->add_input([$bucket, $key, $data]);
+ }
+}
+
+sub add_bucket {
+ my ($self, $bucket) = @_;
+ $self->input_mode('bucket');
+ $self->inputs_bucket($bucket);
+}
+
+sub link {
+ my ($self, $bucket, $tag, $keep) = @_;
+ $bucket ||= '_';
+ $tag ||= '_';
+ $keep ||= JSON::false;
+
+ $self->add_phase(
+ Net::Riak::LinkPhase->new(
+ bucket => $bucket,
+ tag => $tag,
+ keep => $keep
+ )
+ );
+}
+
+sub map {
+ my ($self, $function, %options) = @_;
+
+ my $map_reduce = Net::Riak::MapReducePhase->new(
+ type => 'map',
+ function => $function,
+ keep => $options{keep} || JSON::false,
+ arg => $options{arg} || [],
+ );
+ $self->add_phase($map_reduce);
+}
+
+sub reduce {
+ my ($self, $function, %options) = @_;
+
+ my $map_reduce = Net::Riak::MapReducePhase->new(
+ type => 'reduce',
+ function => $function,
+ keep => $options{keep} || JSON::false,
+ arg => $options{arg} || [],
+ );
+ $self->add_phase($map_reduce);
+}
+
+sub run {
+ my ($self, $timeout) = @_;
+
+ my $num_phases = $self->num_phases;
+ my $keep_flag = 0;
+ my $query = [];
+
+ my $total_phase = $self->num_phases;
+ foreach my $i (0 .. ($total_phase - 1)) {
+ my $phase = $self->get_phase($i);
+ if ($i == ($total_phase - 1) && !$keep_flag) {
+ $phase->keep(JSON::true);
+ }
+ $keep_flag = 1 if ($phase->{keep}->isa(JSON::true));
+ push @$query, $phase->to_array;
+ }
+
+ my $inputs;
+ if ($self->has_input_mode && $self->input_mode eq 'bucket' && $self->has_inputs_bucket) {
+ $inputs = $self->inputs_bucket;
+ }else{
+ $inputs = $self->inputs;
+ }
+
+ my $job = {inputs => $inputs, query => $query};
+ if ($timeout) {
+ $job->{$timeout} = $timeout;
+ }
+
+ my $content = JSON::encode_json($job);
+
+ my $request =
+ $self->client->request('POST', [$self->client->mapred_prefix]);
+ $request->content($content);
+ my $response = $self->client->useragent->request($request);
+
+ my $result = JSON::decode_json($response->content);
+
+ my @phases = $self->phases;
+ if (ref $phases[-1] ne 'Net::Riak::LinkPhase') {
+ return $result;
+ }
+
+ my $a = [];
+ foreach (@$result) {
+ my $l = Net::Riak::Link->new(
+ bucket => $_->[0],
+ key => $_->[1],
+ tag => $_->[2],
+ client => $self->client
+ );
+ push @$a, $l;
+ }
+ return $a;
+}
+
+1;
+
+=head1 SYNOPSIS
+
+=head1 DESCRIPTION
+
+=head2 ATTRIBUTES
+
+=over 4
+
+=item B<phases>
+
+=item B<inputs_bucket>
+
+=item B<inputs>
+
+=item B<input_mode>
+
+=back
+
+=head2 METHODS
+
+=method add
+
+Add inputs to a map/reduce operation. This method takes three different forms, depending on the provided inputs. You can specify either a RiakObject, a string bucket name, or a bucket, key, and additional arg.
+
+=method add_object
+
+=method add_bucket_key_data
+
+=method add_bucket
+
+=method link
+
+=method map
+
+=method reduce
+
+=method run