Created
April 7, 2011 22:42
-
-
Save stevan/908941 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!perl
use 5.10.0;
use strict;
use warnings;

use Data::Dumper;
use ZeroMQ qw/:all/;

# Interactive client for a two-socket ZeroMQ exchange.
#
# A REQ socket (the "coordinator") carries the command/acknowledgement
# handshake; a SUB socket (the "subscriber") receives the published data.
# For every command read from input the client:
#   1. sends the command on the coordinator socket,
#   2. reads back a subscription filter (or the text 'unknown command'),
#   3. subscribes with that filter and sends an empty acknowledgement,
#   4. prints published messages until one arrives whose payload, after the
#      "<filter> " prefix is removed, is empty,
#   5. drops the subscription and reads the final acknowledgement.
# The loop ends when input reaches end-of-file.

my $coord_addr     = 'tcp://127.0.0.1:5566';
my $publisher_addr = 'tcp://127.0.0.1:5567';

my $cxt = ZeroMQ::Context->new;

say "welcome to my client ...";

# Both sockets connect() to the addresses above; nothing is bound here.
my $coordinator = $cxt->socket(ZMQ_REQ);
$coordinator->connect($coord_addr);
say "coordinator connected to $coord_addr";

my $subscriber = $cxt->socket(ZMQ_SUB);
$subscriber->connect($publisher_addr);
say "subscriber connected to $publisher_addr";

# Receive one message from $socket and return its payload.
# Dies with a descriptive message when recv yields no message, instead of
# failing later with "Can't call method data on an undefined value".
#   $socket - socket object to read from
#   $what   - short description of the expected message, used in the error
sub _recv_data {
    my ( $socket, $what ) = @_;
    my $msg = $socket->recv
        or die "failed to receive $what\n";
    return $msg->data;
}

COMMAND:
while (1) {
    print "enter a command:";
    my $command = <>;

    # <> returns undef at end-of-file; stop instead of looping forever on it.
    last COMMAND unless defined $command;
    chomp $command;

    say "sending $command to server";
    $coordinator->send($command);

    say "waiting for subscription filter ...";
    my $filter = _recv_data( $coordinator, 'subscription filter' );
    say "Got back subscription filter=($filter)";

    # Normalise the received value, not the literal it is compared against.
    if ( lc $filter eq 'unknown command' ) {
        say "Got 'unknown command' error";
        next COMMAND;
    }

    $subscriber->setsockopt( ZMQ_SUBSCRIBE, $filter );
    say "subscription filter applied to subscriber";

    say "sending acknowledgement to coordinator to start";
    $coordinator->send('');

    say "starting subscription ...";
    while (1) {
        my $data = _recv_data( $subscriber, 'published message' );
        say "got $data";

        # Strip the "<filter> " prefix. \Q...\E keeps any regex
        # metacharacters in the filter from being interpreted as a pattern.
        $data =~ s/^\Q$filter\E //;

        # A message with nothing after the prefix ends the stream.
        last if $data eq '';

        # One-second pause between messages, kept from the original script.
        sleep(1);
    }
    say "subscription done ...";

    # Remove this command's filter so prefixes do not pile up on the
    # subscriber socket over the lifetime of the client.
    $subscriber->setsockopt( ZMQ_UNSUBSCRIBE, $filter );

    say "grabbing acknowledgement from coordinator that subscription is complete";
    $coordinator->recv
        or warn "no completion acknowledgement received from coordinator\n";
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!perl
use 5.10.0;
use strict;
use warnings;

use Data::Dumper;
use Data::UUID;
use ZeroMQ qw/:all/;

# Server side of a two-socket ZeroMQ exchange.
#
# A REP socket (the "coordinator") answers commands; a PUB socket (the
# "publisher") carries the data. For a 'select' command (case-insensitive)
# the server:
#   1. replies with a fresh UUID string to be used as subscription filter,
#   2. waits for the requester's confirmation message,
#   3. publishes "<uuid> [ 0 ]" .. "<uuid> [ 10 ]" followed by "<uuid> "
#      (prefix only) as the end-of-stream marker,
#   4. replies to the confirmation with an empty message.
# Any other command is answered with the text 'unknown command'.

my $coord_addr     = 'tcp://127.0.0.1:5566';
my $publisher_addr = 'tcp://127.0.0.1:5567';

say "Welcome to my server ....";

my $cxt = ZeroMQ::Context->new;

my $coordinator = $cxt->socket(ZMQ_REP);
$coordinator->bind($coord_addr);
say "coordinator bound to $coord_addr";

my $publisher = $cxt->socket(ZMQ_PUB);
$publisher->bind($publisher_addr);
say "publisher bound to $publisher_addr";

# One generator serves every request; each create_str call yields a new UUID.
my $uuid_gen = Data::UUID->new;

REQUEST:
while (1) {
    # Nothing received: go back to waiting, as the original loop did.
    my $request = $coordinator->recv or next REQUEST;

    my $value = $request->data;
    say "Got command=($value) on coordinator ...";

    if ( lc $value ne 'select' ) {
        say "unrecognized command ...";
        $coordinator->send("unknown command");
        next REQUEST;
    }

    say "processing select ...";
    my $uuid = $uuid_gen->create_str;
    say "sending subscription filter=($uuid)";
    $coordinator->send($uuid);

    say "awaiting confirmation from subscriber ...";
    unless ( $coordinator->recv ) {
        # Without a received request there is nothing for the REP socket to
        # reply to, so skip both the publishing and the completion reply.
        warn "no confirmation received from subscriber, abandoning select\n";
        next REQUEST;
    }

    # NOTE(review): publishing begins as soon as the confirmation arrives;
    # whether the subscriber's filter is already active on the publisher at
    # that moment is not guaranteed by anything in this script - confirm.
    say "got confirmation from subscriber, starting to publish ...";
    foreach my $i ( 0 .. 10 ) {
        my $payload = "$uuid [ $i ]";
        say "sending $payload";
        $publisher->send($payload);
    }

    # The filter prefix followed by nothing marks the end of the stream.
    say "publishing completed, sending empty value to subscriber ...";
    $publisher->send("$uuid ");

    say "select command completed, notify the coordinator";
    $coordinator->send('');
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment