Created
January 18, 2014 18:58
-
-
Save xaprb/8494656 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl | |
use strict; | |
use warnings FATAL => 'all'; | |
use DBI; | |
use English qw(-no_match_vars); | |
use Getopt::Long; | |
use List::Util qw(max); | |
our $VERSION = '@VERSION@'; | |
our $DISTRIB = '@DISTRIB@'; | |
our $SVN_REV = sprintf("%d", q$Revision: 765 $ =~ m/(\d+)/g); | |
# ############################################################################ | |
# Get configuration information. | |
# ############################################################################ | |
# Define cmdline args; each is GetOpt::Long spec, whether required, | |
# human-readable description. Add more hash entries as needed. | |
my @opt_spec = ( | |
{ s => 'database|D=s', d => 'Database to use' }, | |
{ s => 'defaults-file|F=s', d => 'Only read default options from the given file' }, | |
{ s => 'host|h=s', d => 'Connect to host' }, | |
{ s => 'help', d => 'Show this help message' }, | |
{ s => 'password|p=s', d => 'Password to use when connecting' }, | |
{ s => 'port|P=i', d => 'Port number to use for connection' }, | |
{ s => 'socket|S=s', d => 'Socket file to use for connection' }, | |
{ s => 'user|u=s', d => 'User for login if not current user' }, | |
{ s => 'version', d => 'Output version information and exit' }, | |
{ s => 'mode=s', d => 'Mode: producer or consumer (p/c)' }, | |
); | |
# This is the container for the command-line options' values to be stored in | |
# after processing. Initial values are defaults. | |
my %opts; | |
# Post-process... | |
my %opt_seen; | |
foreach my $spec ( @opt_spec ) { | |
my ( $long, $short ) = $spec->{s} =~ m/^([\w-]+)(?:\|([^!+=]*))?/; | |
$spec->{k} = $short || $long; | |
$spec->{l} = $long; | |
$spec->{t} = $short; | |
$spec->{n} = $spec->{s} =~ m/!/; | |
$opts{$spec->{k}} = undef unless defined $opts{$spec->{k}}; | |
die "Duplicate option $spec->{k}" if $opt_seen{$spec->{k}}++; | |
} | |
Getopt::Long::Configure('no_ignore_case', 'bundling'); | |
GetOptions( map { $_->{s} => \$opts{$_->{k}} } @opt_spec) or $opts{help} = 1; | |
if ( $opts{version} ) { | |
print "$PROGRAM_NAME Ver $VERSION Distrib $DISTRIB Changeset $SVN_REV\n"; | |
exit(0); | |
} | |
$opts{help} = 1 unless $opts{mode}; | |
# If a filename or other argument(s) is required after the other arguments, | |
# add "|| !@ARGV" inside the parens on the next line. | |
if ( $opts{help} ) { | |
print "Usage: $PROGRAM_NAME <options> --mode <mode>\n\n"; | |
my $maxw = max(map { length($_->{l}) + ($_->{n} ? 4 : 0)} @opt_spec); | |
foreach my $spec ( sort { $a->{l} cmp $b->{l} } @opt_spec ) { | |
my $long = $spec->{n} ? "[no]$spec->{l}" : $spec->{l}; | |
my $short = $spec->{t} ? "-$spec->{t}" : ''; | |
printf(" --%-${maxw}s %-4s %s\n", $long, $short, $spec->{d}); | |
} | |
print <<USAGE; | |
$PROGRAM_NAME produces messages and puts them into test.messages table. | |
If possible, database options are read from your .my.cnf file. | |
For more details, please read the documentation: | |
perldoc $PROGRAM_NAME | |
USAGE | |
exit(1); | |
} | |
# ############################################################################ | |
# Get ready to do the main work. | |
# ############################################################################ | |
my %conn = ( | |
F => 'mysql_read_default_file', | |
h => 'host', | |
P => 'port', | |
S => 'mysql_socket' | |
); | |
# Connect to the database | |
my $dsn = 'DBI:mysql:' . ( $opts{D} || '' ) . ';' | |
. join(';', map { "$conn{$_}=$opts{$_}" } grep { defined $opts{$_} } qw(F h P S)) | |
. ';mysql_read_default_group=mysql'; | |
my $dbh = DBI->connect($dsn, @opts{qw(u p)}, { AutoCommit => 1, RaiseError => 1, PrintError => 0 } ); | |
my $ins = $dbh->prepare('INSERT INTO test.messages(message) values (?)'); | |
my $sel = $dbh->prepare('SELECT * FROM test.messages WHERE id > ?'); | |
my $get = 'SELECT GET_LOCK("test.messages", ?)'; | |
my $rel = 'SELECT RELEASE_LOCK("test.messages")'; | |
# ############################################################################ | |
# PRODUCER CODE | |
# ############################################################################ | |
my $lock_time = 1_000_000; | |
if ( $opts{mode} eq 'p' ) { | |
print "Starting up in producer mode\n"; | |
my $msg; | |
do { | |
my $lck = $dbh->selectall_arrayref($get, {}, $lock_time)->[0]->[0]; | |
print "Enter a one-line message:\n"; | |
chomp($msg = <STDIN>); | |
if ( $msg ) { | |
$ins->execute($msg); | |
} | |
$dbh->do($rel); | |
} while ($msg); | |
} | |
# ############################################################################ | |
# CONSUMER CODE | |
# ############################################################################ | |
else { | |
print "Starting up in consumer mode\n"; | |
my $last_row = 0; | |
my $got_lock = 1; | |
while ( 1 ) { | |
$got_lock = $dbh->selectall_arrayref($get, {}, $lock_time)->[0]->[0]; | |
if ( $got_lock ) { | |
# I got the lock, can go ahead and consume | |
$sel->execute($last_row); | |
my @rows = @{$sel->fetchall_arrayref({})}; | |
foreach my $row ( @rows ) { | |
print "Message: $row->{message}\n"; | |
$last_row = $row->{id}; | |
} | |
} | |
$dbh->do($rel); | |
# $got_lock could be undefined (I already had the lock, there's no | |
# producer) or 0 (I timed out). | |
if ( !defined $got_lock ) { | |
# Since there's no producer, I need to sleep a bit and allow the | |
# producer a chance to get a lock | |
sleep(10); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment