Skip to content

Instantly share code, notes, and snippets.

@nnathan
Last active August 29, 2015 14:18
Show Gist options
  • Select an option

  • Save nnathan/c84e28299d0f8861ae91 to your computer and use it in GitHub Desktop.

Select an option

Save nnathan/c84e28299d0f8861ae91 to your computer and use it in GitHub Desktop.
Simple driver for Net::Async::AMQP
#!/usr/bin/env perl
# Inspired by one of my favourite Canadian cryptographers, Tom St. Denis:
#
# This code is hereby placed in the Public Domain:
#
# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
# Version 2, December 2004
#
# -- nnathan
# ---------------------------------------------
# Prelude
# ---------------------------------------------
package NN::RabbitMQ;
use strict;
use warnings;
# don't import functions because they will collide
# on compress() and uncompress(), instead we'll
# call using the fully qualified name
use Compress::Snappy ();
use Compress::Zlib ();
use Net::Async::AMQP;
use IO::Async::Loop;
use Carp;
#use Carp::Always;
# ---------------------------------------------
# Introduction & Usage Notes
# ---------------------------------------------
#
# This is a very simple interface to RabbitMQ.
#
# Although it uses the IO::Async asynchronous framework, to the outside world
# it should be completely synchronous/blocking (with reasonable timeouts).
#
# For anyone hacking on this code, get familiar with `perldoc Future`
# since the entire IO::Async framework including Net::Async::AMQP
# are _all_ Future objects.
#
# The following is a brief example of how to use this class:
#
# Note: eval wraps publish/consume operations which may call die() on failure
# (e.g. unable to connect to queue server, setting $@ with error
# message)
#
# # (1) [setup connection params and specify queue]
# my $queue_name = 'test';
# my $queue_ctx = NN::RabbitMQ->new(queue_name => $queue_name);
#
# # (2) [publish to queue]
# my $payload = "hello world";
# my $sent = eval { $queue_ctx->publish(payload => $payload); };
#
# if (defined $sent)
# {
# print "[x] sent message '$payload'\n";
# }
# else
# {
# print STDERR "publish failed, exception: $@";
# }
#
# # (3) [consume a single message from queue]
# my $msg = eval { $queue_ctx->consume; };
# if (defined $msg)
# {
# print "[X] consumed '$msg'";
# }
# else
# {
# print STDERR "consume from queue='$queue_name' failed, exception: $@";
# }
#
# (1) setup a queue context (see configure() subroutine for available options)
#
# (2) connect, declare durable queue 'test', publish $payload, disconnect
# (if successful returns 1 otherwise undef)
#
# (3) connect, declare durable queue 'test', consume $msg, disconnect
# (if successful returns $msg otherwise undef)
#
# ---------------------------------------------
# Code
# ---------------------------------------------
# internal logger
# dumb_log($message):
#
# Internal logger. Writes $message to STDERR followed by exactly one
# newline: a trailing newline already present on the message is stripped
# first so it is never doubled.
sub dumb_log
{
    chomp(my $line = shift);
    print STDERR "$line\n";
}
# compression / decompression routines
# compress_string($payload, $compression_type, $compression_level):
#
# Description:
#   Compresses $payload with the algorithm named by $compression_type
#   ('zlib' or 'snappy', case-insensitive).
#
# Arguments:
#   $payload           - string to compress
#   $compression_type  - 'zlib' or 'snappy'
#   $compression_level - zlib level; only used for 'zlib'. When undefined
#                        the library's own default level is used.
#
# Returns the compressed string.
#
# Dies if the compression type is unknown/undefined, or if the underlying
# library reports a failure (returns undef), so that an undefined payload
# is never handed on to the caller.
sub compress_string
{
    my ($payload, $compression_type, $compression_level) = @_;

    # Normalise once. An undefined type falls through to the "bad type"
    # error instead of raising an "uninitialized value" warning in lc().
    my $type = lc($compression_type // '');

    my $compressed_payload;
    if ($type eq 'zlib')
    {
        # Only pass a level when we have one: Compress::Zlib::compress
        # picks its default level solely when called with one argument.
        $compressed_payload = defined $compression_level
            ? Compress::Zlib::compress($payload, $compression_level)
            : Compress::Zlib::compress($payload);
    }
    elsif ($type eq 'snappy')
    {
        $compressed_payload = Compress::Snappy::compress($payload);
    }
    else
    {
        die("compress_string: bad compression type: "
            . ($compression_type // '(undef)'));
    }

    die("compress_string: $type compression failed")
        unless defined $compressed_payload;

    return $compressed_payload;
}
# decompress_string($payload, $compression_type):
#
# Description:
#   Reverses compress_string(): decompresses $payload with the algorithm
#   named by $compression_type ('zlib' or 'snappy', case-insensitive).
#
# Returns the decompressed string. Whatever the underlying library returns
# for a payload it cannot decode is passed through unchanged.
#
# Dies if the compression type is unknown or undefined.
sub decompress_string
{
    my ($payload, $compression_type) = @_;

    # Normalise once. An undefined type falls through to the "bad type"
    # error instead of raising an "uninitialized value" warning in lc().
    my $type = lc($compression_type // '');

    my $decompressed_payload;
    if ($type eq 'zlib')
    {
        $decompressed_payload = Compress::Zlib::uncompress($payload);
    }
    elsif ($type eq 'snappy')
    {
        $decompressed_payload = Compress::Snappy::decompress($payload);
    }
    else
    {
        # The message names this function (it used to say compress_string).
        die("decompress_string: bad compression type: "
            . ($compression_type // '(undef)'));
    }
    return $decompressed_payload;
}
# new(...) [constructor]
#
# Required Arguments:
#
# queue_name => <str>
#
# Arguments:
#
# See configure() subroutine for default arguments
#
# Usage:
#
# $queue_ctx = NN::RabbitMQ->new(queue_name => 'test', vhost => '/foo');
#
# Constructor: blesses an empty hash into the invoking class and hands all
# named arguments to configure(). A missing queue_name is only logged; the
# object is still created and returned.
sub new
{
    my $class   = shift;
    my %options = @_;

    # fully qualified name of this subroutine, used to tag the log line
    my $whoami = (caller(0))[3];

    dumb_log("$whoami: queue_name parameter not specified.")
        unless defined $options{queue_name};

    my $instance = bless {}, $class;
    $instance->configure(%options);

    return $instance;
}
# configure(...):
#
# Description:
# Configures connection parameters for connecting to rabbitmq.
#
# Default arguments:
# queue_name => # [whatever was set by new()]
# exchange => '' # exchange
# durable => 1 # declares queue durable by default
# confirm_mode => 1 # enable acknowledgements from server to client
# # when publishing a message
# expiration => undef # disables message expiration by default
# # (can explicitly specify expiration to publish())
# compression => 0 # enables (de)compression of payload when publishing/consuming
# # (useful for >128KB messages that might be fragmented by AMQP)
# compression_type => 'snappy' # selects compression algorithm: choices are 'snappy' and 'zlib'
# compression_level => 6 # selects compression level (if algorithm supports it)
# host => 'localhost'
# port => 5672
# user => 'guest'
# pass => 'guest'
# vhost => '/'
# timeout => 20
# debug => 0
#
# Usage:
# $queue_ctx = NN::RabbitMQ->new;
# $queue_ctx->configure(timeout => 5, debug => 1);
#
# Applies connection/queue settings to the object.
#
# On the first call every option is initialised: a supplied (defined)
# argument wins, otherwise the built-in default is used. On later calls
# only the options actually supplied (with a defined value) change; all
# others keep their current value.
#
# Returns the resulting value of the debug option (the value the last
# assignment produces).
sub configure
{
    my ($self, %args) = @_;

    # every user-settable option, in the order the settings are applied
    my @option_names = qw(
        queue_name exchange durable confirm_mode expiration
        compression compression_type compression_level
        host port user pass vhost timeout debug
    );

    # Built-in defaults used on the first call. queue_name is deliberately
    # absent: it has no default and falls back to whatever the object
    # already holds.
    my %builtin_default = (
        exchange          => '',
        durable           => 1,
        confirm_mode      => 1,
        expiration        => undef,
        compression       => 0,
        compression_type  => 'snappy',
        compression_level => 6,
        host              => 'localhost',
        port              => 5672,
        user              => 'guest',
        pass              => 'guest',
        vhost             => '/',
        timeout           => 20,
        debug             => 0,
    );

    my $first_call = !defined $self->{_is_configured};
    if ($first_call)
    {
        $self->{_is_configured} = 1;

        # list of notifiers this object has added to the event loop
        $self->{_notifiers} = [];
    }

    for my $name (@option_names)
    {
        my $fallback = ($first_call && exists $builtin_default{$name})
            ? $builtin_default{$name}
            : $self->{$name};

        $self->{$name} = $args{$name} // $fallback;
    }

    return $self->{debug};
}
# Detaches every notifier currently registered on the IO::Async loop,
# logging the notifier count before and after. Failures while detaching an
# individual notifier are ignored.
sub remove_notifiers
{
    my $whoami = (caller(0))[3];
    my $loop   = IO::Async::Loop->new;

    my $count_before = scalar($loop->notifiers);
    dumb_log("$whoami: number of loop notifiers (before removes): $count_before");

    # the list is taken once up front, so removals do not disturb iteration
    for my $registered ($loop->notifiers)
    {
        eval { $registered->remove_from_parent; };
    }

    my $count_after = scalar($loop->notifiers);
    dumb_log("$whoami: number of loop notifiers (after removes): $count_after");
}
# consume(...):
#
# Description:
# 1. establish connection to rabbitmq broker
# 2. establish channel
# 3. set QOS and set prefetch_count=1
# 4. declare queue and options as specified by configure()
# 5. consume from queue with no_ack=0 (i.e. require message acknowledgement)
# * on receipt of first message
# - acknowledge if delivery_tag==1 (i.e. first message received)
# - close connection (any unacknowledged messages will return to queue for redelivery)
#
# (timeout if (1)-(5) doesn't complete in $self->{timeout} seconds)
#
# returns message payload if succeeds, otherwise undef.
#
# Usage:
# $queue_ctx = NN::RabbitMQ->new(queue_name => 'test');
# my $msg = eval { $queue_ctx->consume; };
# if (defined $msg)
# {
# print "[X] consumed '$msg'";
# }
# else
# {
# print STDERR "consume from queue='$queue_name' failed, exception: $@";
# }
# Consumes a single message from $self->{queue_name} and returns its payload
# (decompressed first when $self->{compression} is true). Connection settings
# are read from the object: host, port, user, pass, vhost, durable, timeout.
#
# NOTE(review): %args is accepted but never read in this sub.
sub consume
{
my ($self, %args) = @_;
# perl builtin magic to get the packagename::functionname as a string
my $log_prefix = (caller(0))[3];
# return value (populated once we receive a message from queue)
my $payload = undef;
# setup context
my $loop = IO::Async::Loop->new;
dumb_log("$log_prefix: number of loop notifiers (before add(amqp)): ".scalar($loop->notifiers));
# the following is an interim hack to get things working
# we shouldn't really add/remove notifiers but instead
# set up the loop once, and only do a publish/consume
#
# Detach every notifier recorded by a previous publish()/consume() call on
# this object before a fresh one is added below.
while (scalar(@{$self->{_notifiers}}) > 0)
{
my $notifier = pop @{$self->{_notifiers}};
$notifier->remove_from_parent;
}
dumb_log("$log_prefix: after removes ".scalar($loop->notifiers));
# A new AMQP client is created for every call; it is remembered in
# $self->{_notifiers} so that the next call can detach it again.
my $amqp = Net::Async::AMQP->new;
$loop->add($amqp);
push @{$self->{_notifiers}}, $amqp;
dumb_log("$log_prefix: number of loop notifiers (after add(amqp)): ".scalar($loop->notifiers));
# Each setup step below waits on its future with ->get before the next one
# starts. $conn is not used afterwards.
my $conn = $amqp->connect(
# { host => $self->{host}, ..., etc. }
(map {$_ => $self->{$_}} qw(host port user pass vhost)),
)->get;
my $ch = $amqp->open_channel->get;
# ask for at most one message at a time
$ch->qos(
prefetch_count => 1,
)->get;
my $q = $ch->queue_declare(
queue => $self->{queue_name},
durable => $self->{durable},
)->get;
# marked done by the message handler once the first message has been handled
my $received = $loop->new_future;
$q->listen(
channel => $ch,
ack => 1, # ack=1 requires messages be ack'd otherwise they will be redelivered
# by upstream queue when the consumer session closes
)->on_ready(
sub
{
# the message handler is registered only once listen() has completed
$ch->bus->subscribe_to_event(
message => sub
{
my ($ev, $type, $message_payload, $consumer_tag, $delivery_tag, $routing_key) = @_;
dumb_log("$log_prefix: consumed message [queue=$self->{queue_name} delivery_tag=$delivery_tag]");
# only ack and cancel on first message, timeout will take care of the rest
if ($delivery_tag == 1)
{
$ch->ack(
delivery_tag => $delivery_tag,
multiple => 0,
);
# the following code to cancel consuming is no longer needed;
# shutting down the entire connection will do an orderly
# shutdown of the consumer and the channel.
# $q->cancel(
# consumer_tag => $consumer_tag,
# )->on_ready(
# sub
# {
# my ($q, $ctag) = @_;
# print "Queue $q ctag $ctag cancelled\n";
# }
# )->get;
# NOTE(review): this is a blocking ->get issued from inside an event
# callback - confirm this is safe with the event loop in use.
$amqp->close->get;
# the payload is stored (and decompressed if configured) only after the
# connection has been closed
if ($self->{compression})
{
$payload = decompress_string($message_payload, $self->{compression_type});
}
else
{
$payload = $message_payload;
}
$received->done;
}
}
);
}
)->get;
# The timeout is created only here, after connect/open_channel/qos/
# queue_declare/listen have each already been waited on above.
# NOTE(review): the header comment says steps (1)-(5) are covered by the
# timeout; as written it covers only the wait for the first message.
# Presumably an expired timeout future makes the ->get below die - confirm
# against the IO::Async::Loop documentation.
my $timeout_future = $loop->timeout_future(after => $self->{timeout});
Future->wait_any($received, $timeout_future)->get;
return $payload;
}
# publish(...):
#
# Description:
# 1. establish connection to rabbitmq broker
# 2. establish channel
# 3. declare queue and options as specified by configure()
# 4. publish to queue with options specified by configure()
# 5. close connection
#
# (timeout if (1)-(5) doesn't complete in $self->{timeout} seconds)
#
# returns 1 if succeeds, otherwise undef.
#
# Required arguments:
# payload => "message..."
#
# Optional arguments:
# confirm_mode => 1 # require queue to acknowledge message is published
#
# expiration => 0 # message expiry in queue (units are ms, e.g. 300s = 300000)
# # [currently unsupported since Net::Async::AMQP doesn't support it yet]
#
# Usage:
# my $queue_ctx = NN::RabbitMQ->new(queue_name => 'test');
# my $payload = "hello world";
# my $sent = eval { $queue_ctx->publish(payload => $payload); };
#
# if (!defined $sent)
# {
# print STDERR "publish failed, exception: $@";
# }
# else
# {
# print "[x] sent message '$payload'\n";
# }
#
# Publishes $args{payload} to $self->{queue_name} (compressed first when
# $self->{compression} is true), then closes the connection.
#
# Arguments (named):
#   payload      - required; the message body
#   confirm_mode - optional per-call override of the configured value; any
#                  defined value (including 0) takes precedence
#   expiration   - accepted but currently ignored (see note at the publish
#                  call below)
#
# Returns 1 when the publish completed successfully, undef otherwise
# (missing payload, or the publish future failed/was cancelled).
sub publish
{
    # perl builtin magic to get the packagename::functionname as a string
    my $log_prefix = (caller(0))[3];

    # argument validation / sanity check
    my ($self, %args) = @_;
    if (!defined $args{payload})
    {
        dumb_log("$log_prefix: payload parameter not defined.");
        return;
    }

    # return parameter (set to 1 if successful)
    my $payload_sent = undef;

    # Defined-or, not ||: an explicit confirm_mode => 0 must be able to
    # switch confirmations off for this call even when the configured
    # default is on.
    my $confirm_mode = $args{confirm_mode} // $self->{confirm_mode};

    # setup context
    my $loop = IO::Async::Loop->new;
    dumb_log("$log_prefix: number of loop notifiers (before add(amqp)): ".scalar($loop->notifiers));

    # the following is an interim hack to get things working
    # we shouldn't really add/remove notifiers but instead
    # set up the loop once, and only do a publish/consume
    while (scalar(@{$self->{_notifiers}}) > 0)
    {
        my $notifier = pop @{$self->{_notifiers}};
        $notifier->remove_from_parent;
    }
    dumb_log("$log_prefix: after removes ".scalar($loop->notifiers));

    # a new AMQP client per call, remembered so the next call can detach it
    my $amqp = Net::Async::AMQP->new;
    $loop->add($amqp);
    push @{$self->{_notifiers}}, $amqp;

    # each setup step is waited on with ->get before the next one starts
    $amqp->connect(
        # { host => $self->{host}, ..., etc. }
        (map {$_ => $self->{$_}} qw(host port user pass vhost)),
        exchange => '',
    )->get;

    my $ch = $amqp->open_channel->get;

    $ch->queue_declare(
        queue   => $self->{queue_name},
        durable => $self->{durable},
    )->get;

    if ($confirm_mode)
    {
        $ch->confirm_mode->get;
    }

    my $payload = undef;
    if ($self->{compression})
    {
        $payload = compress_string($args{payload}, $self->{compression_type}, $self->{compression_level});
    }
    else
    {
        $payload = $args{payload};
    }

    my $sent = $loop->new_future;
    $ch->publish(
        exchange    => $self->{exchange},
        routing_key => $self->{queue_name},
        payload     => $payload,
        # expiration param not yet allowed by Net::Async::AMQP; once it is,
        # pass: expiration => $args{expiration} // $self->{expiration}
    )->on_ready(
        sub
        {
            my ($publish_future) = @_;

            # on_ready also fires for failed and cancelled futures, so only
            # report success when the publish actually completed.
            $payload_sent = 1 if $publish_future->is_done;

            $amqp->close->get;
            $sent->done;
        }
    );

    my $timeout_future = $loop->timeout_future(after => $self->{timeout});
    Future->wait_any($sent, $timeout_future)->get;

    return $payload_sent;
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment