Created
December 10, 2013 13:04
-
-
Save wesyoung/7890281 to your computer and use it in GitHub Desktop.
CIF::Client::Plugin::ZeroMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package CIF::Client::Plugin::ZeroMQ; | |
| use 5.011; | |
| use warnings; | |
| use strict; | |
| use namespace::autoclean; | |
| use Moose; | |
| use MooseX::FollowPBP; | |
| use MooseX::Aliases; | |
| use ZMQx::Class; | |
| use ZMQ::LibZMQ3; | |
| use Try::Tiny; | |
| use Carp; | |
| use AnyEvent; | |
| with 'CIF::Client::Plugin'; | |
| use constant RE_URI => qr/^(zeromq|zmq)(\+(tcp|inproc|ipc|proc))?\:\/{2}([[\S]+|\*])(\:(\d+))?$/; | |
| has 'socket' => ( | |
| is => 'rw', | |
| isa => 'ZMQx::Class::Socket', | |
| ); | |
| sub understands { | |
| my $self = shift; | |
| my $args = shift; | |
| return 1 if($args->{'driver'} && $args->{'driver'} =~ RE_URI()); | |
| return 1 if($args->{'uri'} && $args->{'uri'} =~ RE_URI()); | |
| return 0; | |
| } | |
| around BUILDARGS => sub { | |
| my $orig = shift; | |
| my $self = shift; | |
| my %args = @_; | |
| if($args{'uri'}){ | |
| $args{'uri'} =~ s/^(zeromq|zmq)\+?//g; | |
| } | |
| return $self->$orig(%args); | |
| }; | |
| sub BUILD { | |
| my $self = shift; | |
| $self->set_socket( | |
| ZMQx::Class->socket( | |
| 'REQ', | |
| connect => $self->get_uri(), | |
| ) | |
| ); | |
| } | |
| sub _register { } | |
| sub _unregister { } | |
| # this should already be a string by the time it hits us | |
| sub send { | |
| my $self = shift; | |
| my $data = shift; | |
| my $ret = $self->get_socket->send($data); | |
| return 0 unless($ret); | |
| # setup this function in a test script | |
| # now wait for reply | |
| say 'waiting on reply...'; | |
| $ret = $self->get_socket->receive('blocking'); | |
| return $ret; | |
| } | |
| sub receieve { | |
| my $self = shift; | |
| my $args = shift; | |
| return $self->get_socket->receive($args->{'blocking'}); | |
| } | |
| sub shutdown { | |
| my $self = shift; | |
| $self->{'socket'} = undef; | |
| return 1; | |
| } | |
| __PACKAGE__->meta->make_immutable(); | |
| 1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment