Last active
August 29, 2015 14:18
-
-
Save nnathan/c84e28299d0f8861ae91 to your computer and use it in GitHub Desktop.
Simple driver for Net::Async::AMQP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env perl | |
| # Inspired by one of my favourite Canadian cryptographer Tom St. Denis: | |
| # | |
| # This code is hereby placed in the Public Domain: | |
| # | |
| # DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE | |
| # Version 2, December 2004 | |
| # | |
| # -- nnathan | |
| # --------------------------------------------- | |
| # Prelude | |
| # --------------------------------------------- | |
| package NN::RabbitMQ; | |
| use strict; | |
| use warnings; | |
| # don't import functions because they will collide | |
| # on compress() and uncompress(), instead we'll | |
| # call using the fully qualified name | |
| use Compress::Snappy (); | |
| use Compress::Zlib (); | |
| use Net::Async::AMQP; | |
| use IO::Async::Loop; | |
| use Carp; | |
| #use Carp::Always; | |
| # --------------------------------------------- | |
| # Introduction & Usage Notes | |
| # --------------------------------------------- | |
| # | |
| # This is a very simple interface to RabbitMQ. | |
| # | |
| # Although it IO::Async asynchronous framework, to the outside world | |
| # it should be completely synchronous/blocking (with reasonable timeouts). | |
| # | |
| # For anyone hacking on this code, get familiar with `perldoc Future` | |
| # since the entire IO::Async framework including Net::Async::AMQP | |
| # are _all_ Future objects. | |
| # | |
| # The following is a brief example of how to use this class: | |
| # | |
| # Note: eval wraps publish/consume opereations which may call die() on failure | |
| # (e.g. unable to connect to queue server, setting $@ with error | |
| # message) | |
| # | |
| # # (1) [setup connection params and specify queue] | |
| # my $queue_name = 'test'; | |
| # my $queue_ctx = BP::RabbitMQ->new(queue_name => $queue_name); | |
| # | |
| # # (2) [publish to queue] | |
| # my $payload = "hello world"; | |
| # my $sent = eval { $queue_ctx->publish(payload => $payload); }; | |
| # | |
| # if (defined $sent) | |
| # { | |
| # print "[x] sent message '$payload'\n"; | |
| # } | |
| # else | |
| # { | |
| # print STDERR "publish failed, exception: $@"; | |
| # } | |
| # | |
| # # (3) [consume a single message from queue] | |
| # my $msg = eval { $queue_ctx->consume; }; | |
| # if (defined $msg) | |
| # { | |
| # print "[X] consumed '$msg'"; | |
| # } | |
| # else | |
| # { | |
| # print STDERR "consume from queue='$queue_name' failed, exception: $@"; | |
| # } | |
| # | |
| # (1) setup a queue context (see configure() subroutine for available options) | |
| # | |
| # (2) connect, declare durable queue 'test', publish $payload, disconnect | |
| # (if successful returns 1 otherwise undef) | |
| # | |
| # (3) connect, declare durable queue 'test', consume $msg, disconnect | |
| # (if successful returns $msg otherwise undef) | |
| # | |
| # --------------------------------------------- | |
| # Code | |
| # --------------------------------------------- | |
| # internal logger | |
| sub dumb_log | |
| { | |
| my $e = shift; | |
| chomp $e; | |
| print STDERR $e."\n"; | |
| } | |
| # compression / decompression routines | |
| sub compress_string | |
| { | |
| my $payload = shift; | |
| my $compression_type = shift; | |
| my $compression_level = shift; | |
| my $compressed_payload = undef; | |
| if (lc($compression_type) eq 'zlib') | |
| { | |
| $compressed_payload = Compress::Zlib::compress($payload, $compression_level); | |
| } | |
| elsif (lc($compression_type) eq 'snappy') | |
| { | |
| $compressed_payload = Compress::Snappy::compress($payload); | |
| } | |
| else | |
| { | |
| die("compress_string: bad compressioon type: $compression_type"); | |
| } | |
| return $compressed_payload; | |
| } | |
| sub decompress_string | |
| { | |
| my $payload = shift; | |
| my $compression_type = shift; | |
| my $decompressed_payload = undef; | |
| if (lc($compression_type) eq 'zlib') | |
| { | |
| $decompressed_payload = Compress::Zlib::uncompress($payload); | |
| } | |
| elsif (lc($compression_type) eq 'snappy') | |
| { | |
| $decompressed_payload = Compress::Snappy::decompress($payload); | |
| } | |
| else | |
| { | |
| die("compress_string: bad compressioon type: $compression_type"); | |
| } | |
| return $decompressed_payload; | |
| } | |
| # new(...) [constructor] | |
| # | |
| # Required Arguments: | |
| # | |
| # queue_name => <str> | |
| # | |
| # Arguments: | |
| # | |
| # See configure() subroutine for default arguments | |
| # | |
| # Usage: | |
| # | |
| # $queue_ctx = BP::RabbitMQ->new(queue_name => 'test', vhost => '/foo'); | |
| # | |
| sub new | |
| { | |
| my ($self, %args) = @_; | |
| my $log_prefix = (caller(0))[3]; | |
| my $context = {}; | |
| if (!defined $args{queue_name}) | |
| { | |
| dumb_log("$log_prefix: queue_name parameter not specified."); | |
| } | |
| # some perl OO magic nonsense: | |
| # for more info check out: http://stackoverflow.com/a/1703658 | |
| bless $context, $self; | |
| $context->configure(%args); | |
| return $context; | |
| } | |
| # configure(...): | |
| # | |
| # Description: | |
| # Configures connection parameters for connecting to rabbitmq. | |
| # | |
| # Default arguments: | |
| # queue_name => # [whatever was set by new()] | |
| # exchange => '' # exchange | |
| # durable => 1 # declares queue durable by default | |
| # confirm_mode => 1 # enable acknowledgements from server to client | |
| # # when publishing a message | |
| # expiration => undef # disables message expiration by default | |
| # # (can explicitly specify expiration to push()) | |
| # compression => 0 # enables (de)compression of payload when push/pop | |
| # # (useful for >128KB messages that might be fragmented by AMQP) | |
| # compression_type => 'zlib' # selects compression algorithm: choices are 'snappy' and 'zlib' | |
| # compression_level => 6 # selects compression level (if algorithm supports it) | |
| # host => 'localhost' | |
| # port => 5672 | |
| # user => 'guest' | |
| # pass => 'guest' | |
| # vhost => '/' | |
| # timeout => 20 | |
| # debug => 0 | |
| # | |
| # Usage: | |
| # $queue_ctx = BP::RabbitMQ->new; | |
| # $queue_ctx->configure(timeout => 5, debug => 1); | |
| # | |
| sub configure | |
| { | |
| my ($self, %args) = @_; | |
| if (!defined $self->{_is_configured}) | |
| { | |
| # if first time calling configure, then we set defaults | |
| $self->{_is_configured} = 1; | |
| # internal variables (look weird way to declare arrays but this is perl) | |
| $self->{_notifiers} = []; | |
| # defaults are specified here | |
| $self->{queue_name} = $args{queue_name} // $self->{queue_name}; | |
| $self->{exchange} = $args{exchange} // ''; | |
| $self->{durable} = $args{durable} // 1; | |
| $self->{confirm_mode} = $args{confirm_mode} // 1; | |
| $self->{expiration} = $args{expiration} // undef; | |
| $self->{compression} = $args{compression} // 0; | |
| $self->{compression_type} = $args{compression_type} // 'snappy'; | |
| $self->{compression_level} = $args{compression_level} // 6; | |
| $self->{host} = $args{host} // 'localhost'; | |
| $self->{port} = $args{port} // 5672; | |
| $self->{user} = $args{user} // 'guest'; | |
| $self->{pass} = $args{pass} // 'guest'; | |
| $self->{vhost} = $args{vhost} // '/'; | |
| $self->{timeout} = $args{timeout} // 20; | |
| $self->{debug} = $args{debug} // 0; | |
| } | |
| else | |
| { | |
| $self->{queue_name} = $args{queue_name} // $self->{queue_name}; | |
| $self->{exchange} = $args{exchange} // $self->{exchange}; | |
| $self->{durable} = $args{durable} // $self->{durable}; | |
| $self->{confirm_mode} = $args{confirm_mode} // $self->{confirm_mode}; | |
| $self->{expiration} = $args{expiration} // $self->{expiration}; | |
| $self->{compression} = $args{compression} // $self->{compression}; | |
| $self->{compression_type} = $args{compression_type} // $self->{compression_type}; | |
| $self->{compression_level} = $args{compression_level} // $self->{compression_level}; | |
| $self->{host} = $args{host} // $self->{host}; | |
| $self->{port} = $args{port} // $self->{port}; | |
| $self->{user} = $args{user} // $self->{user}; | |
| $self->{pass} = $args{pass} // $self->{pass}; | |
| $self->{vhost} = $args{vhost} // $self->{vhost}; | |
| $self->{timeout} = $args{timeout} // $self->{timeout}; | |
| $self->{debug} = $args{debug} // $self->{debug}; | |
| } | |
| } | |
| sub remove_notifiers | |
| { | |
| my $log_prefix = (caller(0))[3]; | |
| my $loop = IO::Async::Loop->new; | |
| dumb_log("$log_prefix: number of loop notifiers (before removes): ".scalar($loop->notifiers)); | |
| my @notifiers = $loop->notifiers; | |
| foreach my $notifier (@notifiers) | |
| { | |
| eval { $notifier->remove_from_parent; }; | |
| } | |
| dumb_log("$log_prefix: number of loop notifiers (after removes): ".scalar($loop->notifiers)); | |
| } | |
| # consume(...): | |
| # | |
| # Description: | |
| # 1. establish connection to rabbitmq broker | |
| # 2. establish channel | |
| # 3. set QOS and set prefetch_count=1 | |
| # 4. declare queue and options as specified by configure() | |
| # 5. consume from queue with no_ack=0 (i.e. require message acknowledgement) | |
| # * on receipt of first message | |
| # - acknowledge if delivery_tag==1 (i.e. first message received) | |
| # - close connection (any unacknowledged messages will return to queue for redelivery) | |
| # | |
| # (timeout if (1)-(5) doesn't complete in $conf->{timeout} seconds) | |
| # | |
| # returns message payload if succeeds, otherwise undef. | |
| # | |
| # Usage: | |
| # $queue_ctx = BP::RabbitMQ->new(queue_name => 'test'); | |
| # my $msg = eval { $queue_ctx->consume; }; | |
| # if (defined $msg) | |
| # { | |
| # print "[X] consumed '$msg'"; | |
| # } | |
| # else | |
| # { | |
| # print STDERR "consume from queue='$queue_name' failed, exception: $@"; | |
| # } | |
| sub consume | |
| { | |
| my ($self, %args) = @_; | |
| # perl builtin magic to get the packagename::functioname as a string | |
| my $log_prefix = (caller(0))[3]; | |
| # return value (populated once we receive a message from queue) | |
| my $payload = undef; | |
| # setup context | |
| my $loop = IO::Async::Loop->new; | |
| dumb_log("$log_prefix: number of loop notifiers (before add(amqp)): ".scalar($loop->notifiers)); | |
| # the following is an interim hack to get things working | |
| # we shouldn't really add/remove notifiers but instead | |
| # setp the loop once, and only do a publish/consume | |
| while (scalar(@{$self->{_notifiers}}) > 0) | |
| { | |
| my $notifier = pop @{$self->{_notifiers}}; | |
| $notifier->remove_from_parent; | |
| } | |
| dumb_log("$log_prefix: after removes ".scalar($loop->notifiers)); | |
| my $amqp = Net::Async::AMQP->new; | |
| $loop->add($amqp); | |
| push @{$self->{_notifiers}}, $amqp; | |
| dumb_log("$log_prefix: number of loop notifiers (after add(amqp)): ".scalar($loop->notifiers)); | |
| my $conn = $amqp->connect( | |
| # { host => $self->{host}, ..., etc. } | |
| (map {$_ => $self->{$_}} qw(host port user pass vhost)), | |
| )->get; | |
| my $ch = $amqp->open_channel->get; | |
| $ch->qos( | |
| prefetch_count => 1, | |
| )->get; | |
| my $q = $ch->queue_declare( | |
| queue => $self->{queue_name}, | |
| durable => $self->{durable}, | |
| )->get; | |
| my $received = $loop->new_future; | |
| $q->listen( | |
| channel => $ch, | |
| ack => 1, # ack=1 requires messages be ack'd otherwise they will be redelivered | |
| # by upstream queue when the consumer session closes | |
| )->on_ready( | |
| sub | |
| { | |
| $ch->bus->subscribe_to_event( | |
| message => sub | |
| { | |
| my ($ev, $type, $message_payload, $consumer_tag, $delivery_tag, $routing_key) = @_; | |
| dumb_log("$log_prefix: consumed message [queue=$self->{queue_name} delivery_tag=$delivery_tag]"); | |
| # only ack and cancel on first message, timeout will take care of the rest | |
| if ($delivery_tag == 1) | |
| { | |
| $ch->ack( | |
| delivery_tag => $delivery_tag, | |
| multiple => 0, | |
| ); | |
| # the following code to cancel consuming is no longer needed; | |
| # shutting down the entire connection will do an orderly | |
| # shutdown of the consumer and the channel. | |
| # $q->cancel( | |
| # consumer_tag => $consumer_tag, | |
| # )->on_ready( | |
| # sub | |
| # { | |
| # my ($q, $ctag) = @_; | |
| # print "Queue $q ctag $ctag cancelled\n"; | |
| # } | |
| # )->get; | |
| $amqp->close->get; | |
| if ($self->{compression}) | |
| { | |
| $payload = decompress_string($message_payload, $self->{compression_type}); | |
| } | |
| else | |
| { | |
| $payload = $message_payload; | |
| } | |
| $received->done; | |
| } | |
| } | |
| ); | |
| } | |
| )->get; | |
| my $timeout_future = $loop->timeout_future(after => $self->{timeout}); | |
| Future->wait_any($received, $timeout_future)->get; | |
| return $payload; | |
| } | |
| # publish(...): | |
| # | |
| # Description: | |
| # 1. establish connection to rabbitmq broker | |
| # 2. establish channel | |
| # 3. declare queue and options as specified by configure() | |
| # 4. publish to queue with options specified by configure() | |
| # 5. close connection | |
| # | |
| # (timeout if (1)-(5) doesn't complete in $conf->{timeout} seconds) | |
| # | |
| # returns 1 if succeeds, otherwise undef. | |
| # | |
| # Required arguments: | |
| # payload => "message..." | |
| # | |
| # Optional arguments: | |
| # confirm_mode => 1 # require queue to acknowledge message is published | |
| # | |
| # expiration => 0 # message expiry in queue (units are ms, e.g. 300s = 300000) | |
| # # [currently unsupported since Net::Async::AMQP doesn't support it yet] | |
| # | |
| # Usage: | |
| # my $queue_ctx = BP::RabbitMQ->new(queue_name => 'test'); | |
| # my $payload = "hello world"; | |
| # my $sent = eval { $queue_ctx->publish(payload => $payload); }; | |
| # | |
| # if (!defined $sent) | |
| # { | |
| # print STDERR "publish failed, exception: $@"; | |
| # } | |
| # else | |
| # { | |
| # print "[x] sent message '$payload'\n"; | |
| # } | |
| # | |
| sub publish | |
| { | |
| # perl builtin magic to get the packagename::functioname as a string | |
| my $log_prefix = (caller(0))[3]; | |
| # argument validation / sanity check | |
| my ($self, %args) = @_; | |
| if (!defined $args{payload}) | |
| { | |
| dumb_log("$log_prefix: payload parameter not defined."); | |
| return; | |
| } | |
| # return parameter (set to 1 if successful) | |
| my $payload_sent = undef; | |
| # parameters for publishing | |
| my $confirm_mode = $args{confirm_mode} || $self->{confirm_mode}; | |
| my $expiration = $args{expiration} || $self->{expiration}; | |
| # setup context | |
| my $loop = IO::Async::Loop->new; | |
| dumb_log("$log_prefix: number of loop notifiers (before add(amqp)): ".scalar($loop->notifiers)); | |
| # the following is an interim hack to get things working | |
| # we shouldn't really add/remove notifiers but instead | |
| # setp the loop once, and only do a publish/consume | |
| while (scalar(@{$self->{_notifiers}}) > 0) | |
| { | |
| my $notifier = pop @{$self->{_notifiers}}; | |
| $notifier->remove_from_parent; | |
| } | |
| dumb_log("$log_prefix: after removes ".scalar($loop->notifiers)); | |
| my $amqp = Net::Async::AMQP->new; | |
| $loop->add($amqp); | |
| push @{$self->{_notifiers}}, $amqp; | |
| my $conn = $amqp->connect( | |
| # { host => $self->{host}, ..., etc. } | |
| (map {$_ => $self->{$_}} qw(host port user pass vhost)), | |
| exchange => '', | |
| )->get; | |
| my $ch = $amqp->open_channel->get; | |
| my $q = $ch->queue_declare( | |
| queue => $self->{queue_name}, | |
| durable => $self->{durable}, | |
| )->get; | |
| if ($confirm_mode) | |
| { | |
| $ch->confirm_mode->get; | |
| } | |
| my $payload = undef; | |
| if ($self->{compression}) | |
| { | |
| $payload = compress_string($args{payload}, $self->{compression_type}, $self->{compression_level}); | |
| } | |
| else | |
| { | |
| $payload = $args{payload}; | |
| } | |
| my $sent = $loop->new_future; | |
| $ch->publish( | |
| exchange => $self->{exchange}, | |
| routing_key => $self->{queue_name}, | |
| payload => $payload, | |
| # expiration param not yet allowed by Net::Async::AMQP | |
| #expiration => $expiration | |
| )->on_ready( | |
| sub | |
| { | |
| $payload_sent = 1; | |
| $amqp->close->get; | |
| $sent->done; | |
| } | |
| ); | |
| my $timeout_future = $loop->timeout_future(after => $self->{timeout}); | |
| Future->wait_any($sent, $timeout_future)->get; | |
| return $payload_sent; | |
| } | |
| 1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment