summaryrefslogtreecommitdiff
path: root/lib/Net/Riak/Transport/PBC/Message.pm
diff options
context:
space:
mode:
authorRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
committerRobin Edwards <robin.ge@gmail.com>2011-04-20 14:38:43 +0100
commit79bea382fd2c0753ca9ace79a11bb74c9a1d722b (patch)
treebde42a47792a27e0a863ee527b88c8c24258f7e9 /lib/Net/Riak/Transport/PBC/Message.pm
parentMerge remote branch 'simon/fix_link_encoding' (diff)
downloadnet-riak-79bea382fd2c0753ca9ace79a11bb74c9a1d722b.tar.gz
merged pbc branch to master
Diffstat (limited to 'lib/Net/Riak/Transport/PBC/Message.pm')
-rw-r--r--lib/Net/Riak/Transport/PBC/Message.pm121
1 files changed, 121 insertions, 0 deletions
diff --git a/lib/Net/Riak/Transport/PBC/Message.pm b/lib/Net/Riak/Transport/PBC/Message.pm
new file mode 100644
index 0000000..75170de
--- /dev/null
+++ b/lib/Net/Riak/Transport/PBC/Message.pm
@@ -0,0 +1,121 @@
+package Net::Riak::Transport::PBC::Message;
+
+use Moose;
+use MooseX::Types::Moose qw/Str HashRef Int/;
+use Net::Riak::Types 'Socket';
+use Net::Riak::Transport::PBC::Code qw/
+ REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/;
+use Net::Riak::Transport::PBC::Transport;
+
+has socket => (
+ is => 'rw',
+ isa => Socket,
+ predicate => 'has_socket',
+);
+
+has request => (
+ isa => 'Str',
+ is => 'ro',
+ lazy_build => 1,
+);
+
+has request_code => (
+ required => 1,
+ isa => Int,
+ is => 'ro',
+ lazy_build => 1,
+);
+
+has message_type => (
+ required => 1,
+ isa => Str,
+ is => 'ro',
+ trigger => sub {
+ $_[0]->{message_type} = 'Rpb'.$_[1];
+ }
+);
+
+has params => (
+ is => 'ro',
+ isa => HashRef,
+);
+
+sub _build_request_code {
+ my $self = shift;
+ return REQ_CODE($self->message_type);
+}
+
+sub _build_request {
+ my $self = shift;
+ $self->_pack_request( $self->request_code, $self->encode );
+}
+
+sub _pack_request {
+ my ($self, $code, $req) = @_;
+ my $h = pack('c', $code) . $req;
+ use bytes;
+ my $len = length $h;
+ return pack('N',$len).$h;
+}
+
+sub encode {
+ my $self = shift;
+ return $self->message_type->can('encode')
+ ? $self->message_type->encode( $self->params )
+ : '';
+}
+
+sub decode {
+ my ($self, $type, $raw_content) = @_;
+ return 'Rpb'.$type->decode($raw_content);
+}
+
+sub send {
+ my ($self, $cb) = @_;
+
+ die "No socket? did you forget to ->connect?" unless $self->has_socket;
+
+ $self->socket->print($self->request);
+
+ my $resp = $self->handle_response;
+
+ return $resp unless $cb;
+
+ $cb->($resp);
+ while (!$resp->done) {
+ $resp = $self->handle_response;
+# use YAML::Syck; warn Dump $resp;
+ $cb->($resp);
+ }
+ return 1;
+}
+
+sub handle_response {
+ my $self = shift;
+ my ($code, $resp) = $self->_unpack_response;
+
+ my $expected_code = EXPECTED_RESP($self->request_code);
+
+ if ($expected_code != $code) {
+ # TODO throw object
+ die "Expecting response type "
+ . RESP_CLASS($expected_code)
+ . " got " . RESP_CLASS($code);
+ }
+
+ return 1 unless RESP_DECODER($code);
+ return RESP_DECODER($code)->decode($resp);
+}
+
+sub _unpack_response {
+ my $self = shift;
+ my ( $len, $code, $msg );
+ $self->socket->read( $len, 4 );
+ $len = unpack( 'N', $len );
+ $self->socket->read( $code, 1 );
+ $code = unpack( 'c', $code );
+ $self->socket->read( $msg, $len - 1 );
+ return ( $code, $msg );
+}
+
+1;