summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--lib/Net/Riak.pm33
-rw-r--r--lib/Net/Riak/Bucket.pm28
-rw-r--r--lib/Net/Riak/Client.pm32
-rw-r--r--lib/Net/Riak/MapReduce.pm107
-rw-r--r--lib/Net/Riak/Object.pm16
-rw-r--r--lib/Net/Riak/Role/REST.pm20
6 files changed, 158 insertions, 78 deletions
diff --git a/lib/Net/Riak.pm b/lib/Net/Riak.pm
index 156779b..77bd773 100644
--- a/lib/Net/Riak.pm
+++ b/lib/Net/Riak.pm
@@ -13,7 +13,7 @@ has client => (
is => 'rw',
isa => 'Net::Riak::Client',
required => 1,
- handles => [qw/request useragent is_alive/]
+ handles => [qw/is_alive http_request http_response/]
);
sub BUILDARGS {
@@ -39,6 +39,8 @@ sub bucket {
$obj->store;
my $obj = $bucket->get('new_post');
+ my $req = $client->http_request; # last request
+ $client->http_response # last response
=head1 DESCRIPTION
@@ -100,17 +102,15 @@ client_id for this client
=back
-=head2 METHODS
+=head1 METHODS
-=over 4
-
-=item bucket
+=head2 bucket
my $bucket = $client->bucket($name);
Get the bucket by the specified name. Since buckets always exist, this will always return a L<Net::Riak::Bucket>
-=item is_alive
+=head2 is_alive
if (!$client->is_alive) {
...
@@ -118,31 +118,40 @@ Get the bucket by the specified name. Since buckets always exist, this will alwa
Check if the Riak server for this client is alive
-=item add
+=head2 add
my $map_reduce = $client->add('bucket_name', 'key');
Start assembling a Map/Reduce operation
-=item link
+=head2 link
my $map_reduce = $client->link();
Start assembling a Map/Reduce operation
-=item map
+=head2 map
my $map_reduce = $client->add('bucket_name', 'key')->map("function ...");
Start assembling a Map/Reduce operation
-=item reduce
+=head2 reduce
my $map_reduce = $client->add(..)->map(..)->reduce("function ...");
Start assembling a Map/Reduce operation
-=back
+=method http_request
-=cut
+Returns the HTTP::Request object from the last request
+
+=method http_response
+Returns a HTTP::Response object from the last request
+
+=head2 SEE ALSO
+
+Net::Riak::MapReduce
+
+=cut
diff --git a/lib/Net/Riak/Bucket.pm b/lib/Net/Riak/Bucket.pm
index 66359d3..8f263cf 100644
--- a/lib/Net/Riak/Bucket.pm
+++ b/lib/Net/Riak/Bucket.pm
@@ -8,8 +8,9 @@ use Carp;
use Net::Riak::Object;
with 'Net::Riak::Role::Replica' => {keys => [qw/r w dw/]};
-with 'Net::Riak::Role::Base' =>
- {classes => [{name => 'client', required => 1}]};
+with 'Net::Riak::Role::Base' => {
+ classes => [{ name => 'client', required => 1, }]
+};
has name => (
is => 'ro',
@@ -84,14 +85,14 @@ sub get_properties {
$params->{props} = 'true' unless exists $params->{props};
$params->{keys} = 'false' unless exists $params->{keys};
- my $request =
- $self->client->request('GET', [$self->client->prefix, $self->name],
- $params);
+ my $request = $self->client->new_request(
+ 'GET', [$self->client->prefix, $self->name], $params
+ );
- my $response = $self->client->useragent->request($request);
+ my $response = $self->client->send_request($request);
- if (!$response->is_success) {
- die "Error getting bucket properties: " . $response->status_line . "\n";
+ unless ($response->is_success) {
+ die "Error getting bucket properties: ".$response->status_line."\n";
}
if ($params->{keys} ne 'stream') {
@@ -119,13 +120,16 @@ sub get_properties {
sub set_properties {
my ($self, $props) = @_;
- my $request = $self->client->request('PUT', [$self->client->prefix, $self->name]);
+ my $request = $self->client->new_request(
+ 'PUT', [$self->client->prefix, $self->name]
+ );
+
$request->header('Content-Type' => $self->content_type);
$request->content(JSON::encode_json({props => $props}));
- my $response = $self->client->useragent->request($request);
- if (!$response->is_success) {
- die "Error setting bucket properties: " . $response->status_line . "\n";
+ my $response = $self->client->send_request($request);
+ unless ($response->is_success) {
+ die "Error setting bucket properties: ".$response->status_line."\n";
}
}
diff --git a/lib/Net/Riak/Client.pm b/lib/Net/Riak/Client.pm
index 19d172f..e76a0ef 100644
--- a/lib/Net/Riak/Client.pm
+++ b/lib/Net/Riak/Client.pm
@@ -2,12 +2,10 @@ package Net::Riak::Client;
use Moose;
use MIME::Base64;
+use Moose::Util::TypeConstraints;
-with qw/
- Net::Riak::Role::REST
- Net::Riak::Role::UserAgent
- Net::Riak::Role::Hosts
- /;
+class_type 'HTTP::Request';
+class_type 'HTTP::Response';
has prefix => (
is => 'rw',
@@ -29,6 +27,24 @@ has client_id => (
isa => 'Str',
lazy_build => 1,
);
+has http_request => (
+ is => 'rw',
+ isa => 'HTTP::Request',
+);
+
+has http_response => (
+ is => 'rw',
+ isa => 'HTTP::Response',
+ handles => ['is_success']
+);
+
+with 'Net::Riak::Role::UserAgent';
+with qw/
+ Net::Riak::Role::REST
+ Net::Riak::Role::Hosts
+ /;
+
+
sub _build_client_id {
"perl_net_riak" . encode_base64(int(rand(10737411824)), '');
@@ -36,9 +52,9 @@ sub _build_client_id {
sub is_alive {
my $self = shift;
- my $request = $self->request('GET', ['ping']);
- my $response = $self->useragent->request($request);
- $response->is_success ? return 1 : return 0;
+ my $request = $self->new_request('GET', ['ping']);
+ my $response = $self->send_request($request);
+ $self->is_success ? return 1 : return 0;
}
1;
diff --git a/lib/Net/Riak/MapReduce.pm b/lib/Net/Riak/MapReduce.pm
index 632d484..14e4007 100644
--- a/lib/Net/Riak/MapReduce.pm
+++ b/lib/Net/Riak/MapReduce.pm
@@ -10,7 +10,7 @@ use Net::Riak::LinkPhase;
use Net::Riak::MapReducePhase;
with 'Net::Riak::Role::Base' =>
- {classes => [{name => 'client', required => 0}]};
+ {classes => [{name => 'client', required => 1}]};
has phases => (
traits => ['Array'],
@@ -110,7 +110,7 @@ sub map {
my $map_reduce = Net::Riak::MapReducePhase->new(
type => 'map',
function => $function,
- keep => $options{keep} || JSON::false,
+ keep => $options{keep} ? JSON::true : JSON::false,
arg => $options{arg} || [],
);
$self->add_phase($map_reduce);
@@ -166,13 +166,18 @@ sub run {
my $content = JSON::encode_json($job);
- my $request =
- $self->client->request('POST', [$self->client->mapred_prefix]);
+ my $request = $self->client->new_request(
+ 'POST', [$self->client->mapred_prefix]
+ );
$request->content($content);
- my $response = $self->client->useragent->request($request);
+ my $response = $self->client->send_request($request);
+
+ unless ($response->is_success) {
+ die "MapReduce query failed: ".$response->status_line;
+ }
- my $result = JSON::decode_json($response->content);
+ my $result = JSON::decode_json($response->content);
if ( $timeout && ( $ua_timeout != $self->client->useragent->timeout() ) ) {
$self->client->useragent->timeout($ua_timeout);
@@ -200,17 +205,35 @@ sub run {
=head1 SYNOPSIS
+ use Net::Riak;
+
my $riak = Net::Riak->new( host => "http://10.0.0.127:8098/" );
- my $bucket = $riak->bucket("mybucket");
+ my $bucket = $riak->bucket("Cats");
+
+ my $query = $riak->add("Cats");
+ $query->map(
+ 'function(v, d, a) { return [v]; }',
+ arg => [qw/some params to your function/]
+ );
+
+ $query->reduce("function(v) { return [v];}");
+ my $json = $query->run(10000);
+
+ # can also be used like:
- my $mapred = $riak->add("mybucket");
- $mapred->map('function(v) { return [v]; }');
- $mapred->reduce("function(v) { return v;}");
- my $res = $mapred->run(10000);
+ my $query = Net::Riak::MapReduce->new(
+ client => $riak->client
+ );
+
+ # named functions
+ my $json = $query->add_bucket('Dogs')
+ ->map('Riak.mapValuesJson')
+ ->reduce('Your.SortFunction')
+ ->run;
=head1 DESCRIPTION
-The RiakMapReduce object allows you to build up and run a map/reduce operation on Riak.
+The MapReduce object allows you to build up and run a map/reduce operations on Riak.
=head2 ATTRIBUTES
@@ -226,11 +249,9 @@ The RiakMapReduce object allows you to build up and run a map/reduce operation o
=back
-=head2 METHODS
+=head1 METHODS
-=over 4
-
-=item add
+=head2 add
arguments: bucketname or arrays or L<Net::Riak::Object>
@@ -248,17 +269,17 @@ Add your inputs to a MapReduce job
$mapred->add( "alice", "p5" );
$mapred->add( $riak->bucket("alice")->get("p6") );
-=item add_object
+=head2 add_object
-=item add_bucket_key_data
+=head2 add_bucket_key_data
-=item add_bucket
+=head2 add_bucket
-=item link
+=head2 link
arguments: bucketname, tag, keep
-return: self
+return: $self
Add a link phase to the map/reduce operation.
@@ -268,43 +289,57 @@ The default value for tag is '_'.
The flag argument means to flag whether to keep results from this stage in the map/reduce. (default False, unless this is the last step in the phase)
-=item map
+=head2 map
-arguments: function, options
+arguments: $function, %options
return: self
+ ->map("function () {..}", keep => 0, args => ['foo', 'bar']);
+ ->map('Riak.mapValuesJson'); # de-serializes data into JSON
+
Add a map phase to the map/reduce operation.
functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
-options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+%options is an optional associative array containing:
-=item reduce
+ language
+ keep - flag
+ arg - an arrayref of parameterss for the JavaScript function
-arguments: function, options
+=head2 reduce
-return: self
+arguments: $function, %options
+
+return: $self
+
+ ->reduce("function () {..}", keep => 1, args => ['foo', 'bar']);
Add a reduce phase to the map/reduce operation.
functions is either a named javascript function (i: 'Riak.mapValues'), or an anonymous javascript function (ie: 'function(...) ....')
-options is an optional associative array containing 'languaga', 'keep' flag, and/or 'arg'
+=head2 run
-=item run
+arguments: $function, %options
-arguments: function, options
+arguments: $timeout
-arguments: timeout
+return: arrayref
-return: array
+Run the map/reduce operation and attempt to de-serialize the JSON response to a perl structure. rayref of RiakLink objects if the last phase is a link phase.
-Run the map/reduce operation. Returns an array of results, or an array of RiakLink objects if the last phase is a link phase.
+Timeout in milliseconds,
-Timeout in milliseconds.
+=head2 SEE ALSO
-=back
+REST API
-=cut
+https://wiki.basho.com/display/RIAK/MapReduce
+
+List of built-in named functions for map / reduce phases
+http://hg.basho.com/riak/src/tip/doc/js-mapreduce.org#cl-496
+
+=cut
diff --git a/lib/Net/Riak/Object.pm b/lib/Net/Riak/Object.pm
index 8b1d5d4..a012fbf 100644
--- a/lib/Net/Riak/Object.pm
+++ b/lib/Net/Riak/Object.pm
@@ -61,7 +61,7 @@ sub store {
my $params = {returnbody => 'true', w => $w, dw => $dw};
my $request =
- $self->client->request('PUT',
+ $self->client->new_request('PUT',
[$self->client->prefix, $self->bucket->name, $self->key], $params);
$request->header('X-Riak-ClientID' => $self->client->client_id);
@@ -82,7 +82,7 @@ sub store {
$request->content($self->data);
}
- my $response = $self->client->useragent->request($request);
+ my $response = $self->client->send_request($request);
$self->populate($response, [200, 204, 300]);
$self;
}
@@ -98,10 +98,10 @@ sub load {
my $params = {r => $self->r};
my $request =
- $self->client->request('GET',
+ $self->client->new_request('GET',
[$self->client->prefix, $self->bucket->name, $self->key], $params);
- my $response = $self->client->useragent->request($request);
+ my $response = $self->client->send_request($request);
$self->populate($response, [200, 300, 404]);
$self;
}
@@ -113,10 +113,10 @@ sub delete {
my $params = {dw => $dw};
my $request =
- $self->client->request('DELETE',
+ $self->client->new_request('DELETE',
[$self->client->prefix, $self->bucket->name, $self->key], $params);
- my $response = $self->client->useragent->request($request);
+ my $response = $self->client->send_request($request);
$self->populate($response, [204, 404]);
$self;
}
@@ -205,9 +205,9 @@ sub sibling {
my $params = {r => $r, vtag => $vtag};
my $request =
- $self->client->request('GET',
+ $self->client->new_request('GET',
[$self->client->prefix, $self->bucket->name, $self->key], $params);
- my $response = $self->client->useragent->request($request);
+ my $response = $self->client->send_request($request);
my $obj = Net::Riak::Object->new(
client => $self->client,
diff --git a/lib/Net/Riak/Role/REST.pm b/lib/Net/Riak/Role/REST.pm
index 1a18ff7..136ea88 100644
--- a/lib/Net/Riak/Role/REST.pm
+++ b/lib/Net/Riak/Role/REST.pm
@@ -6,6 +6,10 @@ use URI;
use HTTP::Request;
use Moose::Role;
+requires 'http_request';
+requires 'http_response';
+requires 'useragent';
+
sub _build_path {
my ($self, $path) = @_;
$path = join('/', @$path);
@@ -20,10 +24,22 @@ sub _build_uri {
$uri;
}
-sub request {
+# constructs a HTTP::Request
+sub new_request {
my ($self, $method, $path, $params) = @_;
my $uri = $self->_build_uri($path, $params);
- HTTP::Request->new($method => $uri);
+ return HTTP::Request->new($method => $uri);
+}
+
+# makes a HTTP::Request returns and stores a HTTP::Response
+sub send_request {
+ my ($self, $req) = @_;
+
+ $self->http_request($req);
+ my $r = $self->useragent->request($req);
+ $self->http_response($r);
+
+ return $r;
}
1;