summaryrefslogtreecommitdiff
path: root/lib/Net/Riak/Role/PBC
diff options
context:
space:
mode:
authorRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
committerRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
commit79bea382fd2c0753ca9ace79a11bb74c9a1d722b (patch)
treebde42a47792a27e0a863ee527b88c8c24258f7e9 /lib/Net/Riak/Role/PBC
parentMerge remote branch 'simon/fix_link_encoding' (diff)
downloadnet-riak-79bea382fd2c0753ca9ace79a11bb74c9a1d722b.tar.gz
merged pbc branch to master
Diffstat (limited to '')
-rw-r--r--lib/Net/Riak/Role/PBC.pm78
-rw-r--r--lib/Net/Riak/Role/PBC/Bucket.pm46
-rw-r--r--lib/Net/Riak/Role/PBC/Link.pm35
-rw-r--r--lib/Net/Riak/Role/PBC/MapReduce.pm37
-rw-r--r--lib/Net/Riak/Role/PBC/Message.pm21
-rw-r--r--lib/Net/Riak/Role/PBC/Object.pm131
6 files changed, 348 insertions, 0 deletions
diff --git a/lib/Net/Riak/Role/PBC.pm b/lib/Net/Riak/Role/PBC.pm
new file mode 100644
index 0000000..605f032
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC.pm
@@ -0,0 +1,78 @@
+package Net::Riak::Role::PBC;
+
+use Moose::Role;
+use MooseX::Types::Moose qw/Str Int/;
+
+with qw(
+ Net::Riak::Role::PBC::Message
+ Net::Riak::Role::PBC::Bucket
+ Net::Riak::Role::PBC::MapReduce
+ Net::Riak::Role::PBC::Link
+ Net::Riak::Role::PBC::Object);
+
+use Net::Riak::Types 'Socket';
+use IO::Socket::INET;
+
+has [qw/r w dw/] => (
+ is => 'rw',
+ isa => Int,
+ default => 2
+);
+
+has host => (
+ is => 'ro',
+ isa => Str,
+ required => 1,
+);
+
+has port => (
+ is => 'ro',
+ isa => Int,
+ required => 1,
+);
+
+has socket => (
+ is => 'rw',
+ isa => Socket,
+ predicate => 'has_socket',
+);
+
+sub is_alive {
+ my $self = shift;
+ return $self->send_message('PingReq');
+}
+
+sub connected {
+ my $self = shift;
+ return $self->has_socket && $self->socket->connected ? 1 : 0;
+}
+
+sub connect {
+ my $self = shift;
+ return if $self->has_socket && $self->connected;
+
+ $self->socket(
+ IO::Socket::INET->new(
+ PeerAddr => $self->host,
+ PeerPort => $self->port,
+ Proto => 'tcp',
+ Timeout => 30,
+ )
+ );
+}
+
+sub all_buckets {
+ my $self = shift;
+ my $resp = $self->send_message('ListBucketsReq');
+ return ref ($resp->buckets) eq 'ARRAY' ? @{$resp->buckets} : ();
+}
+
+sub server_info {
+ my $self = shift;
+ my $resp = $self->send_message('GetServerInfoReq');
+ return $resp;
+}
+
+sub stats { die "->stats is only avaliable through the REST interface" }
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Bucket.pm b/lib/Net/Riak/Role/PBC/Bucket.pm
new file mode 100644
index 0000000..aa7d7fa
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Bucket.pm
@@ -0,0 +1,46 @@
+package Net::Riak::Role::PBC::Bucket;
+
+use Moose::Role;
+use Data::Dumper;
+
+sub get_properties {
+ my ( $self, $name, $params ) = @_;
+ my $resp = $self->send_message( GetBucketReq => { bucket => $name } );
+ return { props => { %{ $resp->props } } };
+}
+
+sub set_properties {
+ my ( $self, $bucket, $props ) = @_;
+ return $self->send_message(
+ SetBucketReq => {
+ bucket => $bucket->name,
+ props => $props
+ }
+ );
+}
+
+sub get_keys {
+ my ( $self, $name, $params) = @_;
+ my $keys = [];
+
+ my $res = $self->send_message(
+ ListKeysReq => { bucket => $name, },
+ sub {
+ if ( defined $_[0]->keys ) {
+ if ($params->{cb}) {
+ $params->{cb}->($_) for @{ $_[0]->keys };
+ }
+ else {
+ push @$keys, @{ $_[0]->keys };
+ }
+ }
+ }
+ );
+
+ return $params->{cb} ? undef : $keys;
+}
+
+
+
+1;
+
diff --git a/lib/Net/Riak/Role/PBC/Link.pm b/lib/Net/Riak/Role/PBC/Link.pm
new file mode 100644
index 0000000..5e6a336
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Link.pm
@@ -0,0 +1,35 @@
+package Net::Riak::Role::PBC::Link;
+use Moose::Role;
+use Net::Riak::Link;
+use Net::Riak::Bucket;
+
+sub _populate_links {
+ my ($self, $object, $links) = @_;
+
+ for my $link (@$links) {
+ my $l = Net::Riak::Link->new(
+ bucket => Net::Riak::Bucket->new(
+ name => $link->bucket,
+ client => $self
+ ),
+ key => $link->key,
+ tag => $link->tag
+ );
+ $object->add_link($l);
+ }
+}
+
+sub _links_for_message {
+ my ($self, $object) = @_;
+
+ return [
+ map { {
+ tag => $_->tag,
+ key => $_->key,
+ bucket => $_->bucket->name
+ }
+ } $object->all_links
+ ]
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/MapReduce.pm b/lib/Net/Riak/Role/PBC/MapReduce.pm
new file mode 100644
index 0000000..afeabe8
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/MapReduce.pm
@@ -0,0 +1,37 @@
+package Net::Riak::Role::PBC::MapReduce;
+use Moose::Role;
+use JSON;
+use List::Util 'sum';
+use Data::Dump 'pp';
+
+sub execute_job {
+ my ($self, $job, $timeout, $returned_phases) = @_;
+
+ $job->{timeout} = $timeout;
+
+ my $job_request = JSON::encode_json($job);
+
+ my $results;
+
+ my $resp = $self->send_message( MapRedReq => {
+ request => $job_request,
+ content_type => 'application/json'
+ }, sub { push @$results, $self->decode_phase(shift) })
+ or
+ die "MapReduce query failed!";
+
+
+ return $returned_phases == 1 ? $results->[0] : $results;
+}
+
+sub decode_phase {
+ my ($self, $resp) = @_;
+
+ if (defined $resp->response && length($resp->response)) {
+ return JSON::decode_json($resp->response);
+ }
+
+ return;
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Message.pm b/lib/Net/Riak/Role/PBC/Message.pm
new file mode 100644
index 0000000..0c2fbf3
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Message.pm
@@ -0,0 +1,21 @@
+package Net::Riak::Role::PBC::Message;
+
+use Moose::Role;
+use Net::Riak::Transport::PBC::Message;
+
+sub send_message {
+ my ( $self, $type, $params, $cb ) = @_;
+
+ $self->connect unless $self->connected;
+
+ my $message = Net::Riak::Transport::PBC::Message->new(
+ message_type => $type,
+ params => $params || {},
+ );
+
+ $message->socket( $self->socket );
+
+ return $message->send($cb);
+}
+
+1;
diff --git a/lib/Net/Riak/Role/PBC/Object.pm b/lib/Net/Riak/Role/PBC/Object.pm
new file mode 100644
index 0000000..847cac2
--- /dev/null
+++ b/lib/Net/Riak/Role/PBC/Object.pm
@@ -0,0 +1,131 @@
+package Net::Riak::Role::PBC::Object;
+
+use JSON;
+use Moose::Role;
+use Data::Dumper;
+use List::Util 'first';
+
+sub store_object {
+ my ($self, $w, $dw, $object) = @_;
+
+ my $value = (ref $object->data && $object->content_type eq 'application/json')
+ ? JSON::encode_json($object->data) : $object->data;
+
+ my $content = {
+ content_type => $object->content_type,
+ value => $value,
+ usermeta => undef
+ };
+
+ if ($object->has_links) {
+ $content->{links} = $self->_links_for_message($object);
+ }
+
+ $self->send_message(
+ PutReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ content => $content,
+ }
+ );
+ return $object;
+}
+
+sub load_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $resp = $self->send_message(
+ GetReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ r => $params->{r},
+ }
+ );
+
+ $self->populate_object($object, $resp);
+
+ return $object;
+}
+
+sub delete_object {
+ my ( $self, $params, $object ) = @_;
+
+ my $resp = $self->send_message(
+ DelReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ rw => $params->{dw},
+ }
+ );
+
+ $object;
+}
+
+sub populate_object {
+ my ( $self, $object, $resp) = @_;
+
+ $object->_clear_links;
+ $object->exists(0);
+
+ if ( $resp->content && scalar (@{$resp->content}) > 1) {
+ my %seen;
+ my @vtags = grep { !$seen{$_}++ } map { $_->vtag } @{$resp->content};
+ $object->siblings(\@vtags);
+ }
+
+ my $content = $resp->content ? $resp->content->[0] : undef;
+
+ return unless $content and $resp->vclock;
+
+ $object->vclock($resp->vclock);
+ $object->vtag($content->vtag);
+ $object->content_type($content->content_type);
+
+ if($content->links) {
+ $self->_populate_links($object, $content->links);
+ }
+
+ my $data = ($object->content_type eq 'application/json')
+ ? JSON::decode_json($content->value) : $content->value;
+
+ $object->exists(1);
+
+ $object->data($data);
+}
+
+
+# This emulates the behavior of the existing REST client.
+sub retrieve_sibling {
+ my ($self, $object, $params) = @_;
+
+ my $resp = $self->send_message(
+ GetReq => {
+ bucket => $object->bucket->name,
+ key => $object->key,
+ r => $params->{r},
+ }
+ );
+
+ # hack for loading 1 sibling
+ if ($params->{vtag}) {
+ $resp->{content} = [
+ first {
+ $_->vtag eq $params->{vtag}
+ } @{$resp->content}
+ ];
+ }
+
+ my $sibling = Net::Riak::Object->new(
+ client => $self,
+ bucket => $object->bucket,
+ key => $object->key
+ );
+
+ $sibling->_jsonize($object->_jsonize);
+
+ $self->populate_object($sibling, $resp);
+
+ $sibling;
+}
+
+1;