diff options
| author | Robin Edwards <robin.ge@gmail.com> | 2011-04-20 14:38:43 +0100 |
|---|---|---|
| committer | Robin Edwards <robin.ge@gmail.com> | 2011-04-20 14:38:43 +0100 |
| commit | 79bea382fd2c0753ca9ace79a11bb74c9a1d722b (patch) | |
| tree | bde42a47792a27e0a863ee527b88c8c24258f7e9 /lib/Net/Riak/Transport/PBC/Message.pm | |
| parent | Merge remote branch 'simon/fix_link_encoding' (diff) | |
| download | net-riak-79bea382fd2c0753ca9ace79a11bb74c9a1d722b.tar.gz | |
merged pbc branch to master
Diffstat (limited to 'lib/Net/Riak/Transport/PBC/Message.pm')
| -rw-r--r-- | lib/Net/Riak/Transport/PBC/Message.pm | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/lib/Net/Riak/Transport/PBC/Message.pm b/lib/Net/Riak/Transport/PBC/Message.pm new file mode 100644 index 0000000..75170de --- /dev/null +++ b/lib/Net/Riak/Transport/PBC/Message.pm @@ -0,0 +1,121 @@ +package Net::Riak::Transport::PBC::Message; + +use Moose; +use MooseX::Types::Moose qw/Str HashRef Int/; +use Net::Riak::Types 'Socket'; +use Net::Riak::Transport::PBC::Code qw/ + REQ_CODE EXPECTED_RESP RESP_CLASS RESP_DECODER/; +use Net::Riak::Transport::PBC::Transport; + +has socket => ( + is => 'rw', + isa => Socket, + predicate => 'has_socket', +); + +has request => ( + isa => 'Str', + is => 'ro', + lazy_build => 1, +); + +has request_code => ( + required => 1, + isa => Int, + is => 'ro', + lazy_build => 1, +); + +has message_type => ( + required => 1, + isa => Str, + is => 'ro', + trigger => sub { + $_[0]->{message_type} = 'Rpb'.$_[1]; + } +); + +has params => ( + is => 'ro', + isa => HashRef, +); + +sub _build_request_code { + my $self = shift; + return REQ_CODE($self->message_type); +} + +sub _build_request { + my $self = shift; + $self->_pack_request( $self->request_code, $self->encode ); +} + +sub _pack_request { + my ($self, $code, $req) = @_; + my $h = pack('c', $code) . $req; + use bytes; + my $len = length $h; + return pack('N',$len).$h; +} + +sub encode { + my $self = shift; + return $self->message_type->can('encode') + ? $self->message_type->encode( $self->params ) + : ''; +} + +sub decode { + my ($self, $type, $raw_content) = @_; + return 'Rpb'.$type->decode($raw_content); +} + +sub send { + my ($self, $cb) = @_; + + die "No socket? did you forget to ->connect?" unless $self->has_socket; + + $self->socket->print($self->request); + + my $resp = $self->handle_response; + + return $resp unless $cb; + + $cb->($resp); + while (!$resp->done) { + $resp = $self->handle_response; +# use YAML::Syck; warn Dump $resp; + $cb->($resp); + } + return 1; +} + +sub handle_response { + my $self = shift; + my ($code, $resp) = $self->_unpack_response; + + my $expected_code = EXPECTED_RESP($self->request_code); + + if ($expected_code != $code) { + # TODO throw object + die "Expecting response type " + . RESP_CLASS($expected_code) + . " got " . RESP_CLASS($code); + } + + return 1 unless RESP_DECODER($code); + return RESP_DECODER($code)->decode($resp); +} + +sub _unpack_response { + my $self = shift; + my ( $len, $code, $msg ); + $self->socket->read( $len, 4 ); + $len = unpack( 'N', $len ); + $self->socket->read( $code, 1 ); + $code = unpack( 'c', $code ); + $self->socket->read( $msg, $len - 1 ); + return ( $code, $msg ); +} + +1; |
