Skip to content

Instantly share code, notes, and snippets.

@snaga
Created November 11, 2014 09:45
Show Gist options
  • Select an option

  • Save snaga/1343510f158f1734ab4a to your computer and use it in GitHub Desktop.

Select an option

Save snaga/1343510f158f1734ab4a to your computer and use it in GitHub Desktop.
EzParallel.pm - A Simple Library for Parallel Processing
package EzParallel;
use strict;
use Fcntl qw(:flock);
use POSIX;
# Path of the file that holds one worker PID per line. It is a relative
# path, so it is resolved against the current working directory.
our $pidfile = ".ez.pids";
# Path of the file that lock_pid opens and flock()s; cleanup unlinks it.
our $lockfile = ".ez.lock";
# Handle of the currently opened lock file: set by lock_pid, closed by
# unlock_pid. Package-wide, so it is shared by every EzParallel object.
our $lock_fh;
# Print a progress message on the currently selected output handle,
# prefixed with the ID of the calling process in square brackets.
#
# Arguments:
#   $msg - text to print; a trailing newline is appended.
sub _log {
    my ($self, $msg) = @_;
    my $line = sprintf("[%s] %s\n", getpid(), $msg);
    print $line;
}
# Construct an EzParallel object.
#
# Arguments:
#   key/value pairs that are stored on the object. MaxProcs defaults to 8
#   and is overridden when the caller passes its own MaxProcs.
#
# Returns:
#   a hash reference blessed into the invoking class.
sub new {
    my ($class, @overrides) = @_;
    # Caller-supplied pairs come last so that they win over the default.
    my %settings = (MaxProcs => 8, @overrides);
    return bless \%settings, $class;
}
# Create the PID file, or truncate it to zero length if it already exists.
#
# Returns:
#   1 on success.
#
# Dies:
#   with the file name and the system error when the file cannot be
#   created or closed.
sub init {
    my ($self) = @_;
    $self->_log("init");

    # Three-argument open with a lexical handle: the file name is never
    # parsed for a mode, and no package-wide bareword handle is touched.
    open(my $fh, '>', $pidfile)
        or die("cannot create $pidfile: $!");
    close($fh)
        or die("cannot close $pidfile: $!");

    return 1;
}
# Open the lock file and take an exclusive flock() on it, blocking until
# the lock is granted. The handle is kept in $lock_fh so that unlock_pid
# can close it later.
#
# Returns:
#   1 once the lock is held.
#
# Dies:
#   when the lock file cannot be opened or the lock cannot be acquired.
sub lock_pid {
    my ($self) = @_;
    $self->_log("lock_pid");

    # Let go of a handle left over from an earlier lock_pid call first.
    # NOTE(review): flock() is expected to treat a second, separately opened
    # handle as a competing owner even inside one process, so keeping the
    # old handle open could block forever - confirm on the target platform.
    if (defined $lock_fh) {
        close($lock_fh);    # best effort; the handle may already be closed
        undef $lock_fh;
    }

    open(my $fh, '>', $lockfile)
        or die("cannot open $lockfile: $!");
    flock($fh, LOCK_EX)
        or die("cannot lock $lockfile: $!");

    $lock_fh = $fh;
    return 1;
}
# Count the entries currently recorded in the PID file.
#
# Returns:
#   the number of lines in the PID file (every line counts, including an
#   empty one).
#
# Dies:
#   when the PID file cannot be opened for reading.
sub _get_num_procs {
    my ($self) = @_;
    $self->_log("_get_num_procs");

    open(my $fh, '<', $pidfile)
        or die("cannot read $pidfile: $!");

    # Count lines directly instead of collecting them, and read into a
    # lexical so that the caller's $_ is left alone.
    my $count = 0;
    while (defined(my $line = <$fh>)) {
        $count++;
    }
    close($fh);

    return $count;
}
# Tell whether another worker may be started.
#
# Returns:
#   1 when the number of recorded processes is below MaxProcs, 0 otherwise.
sub is_slot_available {
    my ($self) = @_;
    my $running = $self->_get_num_procs;
    return $running < $self->{MaxProcs} ? 1 : 0;
}
# Append a process ID to the PID file.
#
# Arguments:
#   $mypid - the process ID to record; written as one line.
#
# Returns:
#   1 on success.
#
# Dies:
#   when the PID file cannot be opened, written or closed.
sub add_pid {
    my ($self, $mypid) = @_;
    $self->_log("add_pid $mypid");

    open(my $fh, '>>', $pidfile)
        or die("cannot append to $pidfile: $!");
    print {$fh} "$mypid\n"
        or die("cannot write to $pidfile: $!");
    # Buffered write errors only surface at close, so check it as well.
    close($fh)
        or die("cannot close $pidfile: $!");

    $self->_log("add_pid done.");
    return 1;
}
# Remove a process ID from the PID file by rewriting the file without it.
#
# Arguments:
#   $mypid - the process ID to drop. Entries are compared numerically, so
#            every line that is numerically equal to it is removed.
#
# Returns:
#   1 on success.
#
# Dies:
#   when the PID file cannot be read, rewritten or closed.
sub remove_pid {
    my ($self, $mypid) = @_;
    $self->_log("remove_pid $mypid");

    # Read all recorded PIDs through a lexical handle; the caller's $_ and
    # any package-wide bareword handle stay untouched.
    open(my $in, '<', $pidfile)
        or die("cannot read $pidfile: $!");
    chomp(my @recorded = <$in>);
    close($in);

    my @remaining = grep { $_ != $mypid } @recorded;

    # The file is truncated and rewritten in place (not replaced atomically).
    open(my $out, '>', $pidfile)
        or die("cannot rewrite $pidfile: $!");
    print {$out} map { "$_\n" } @remaining;
    # Buffered write errors only surface at close, so check it.
    close($out)
        or die("cannot close $pidfile: $!");

    $self->_log("remove_pid done.");
    return 1;
}
# Close the lock file handle stored in $lock_fh.
#
# Only the handle is closed; no explicit unlock request is issued.
# NOTE(review): the SYNOPSIS has the parent call unlock_pid right after
# fork while the child still updates the PID file, which suggests the lock
# is meant to stay held until every process that inherited the handle has
# closed it - confirm before adding an explicit unlock here.
#
# Returns:
#   the result of close(), or 0 when no lock handle is open.
sub unlock_pid {
    my ($self) = @_;
    $self->_log("unlock_pid");

    # Nothing to release: treat unlock without a prior lock as a no-op
    # instead of handing an undefined value to close().
    return 0 unless defined $lock_fh;

    my $closed = close($lock_fh);
    # Forget the handle so a stale, already closed one is never reused.
    undef $lock_fh;
    return $closed;
}
# Block until the PID file no longer lists any process.
#
# The count is read once per iteration between lock_pid and unlock_pid and
# logged together with MaxProcs; while it is non-zero the loop sleeps for
# one second and checks again.
sub wait_all {
    my ($self) = @_;
    $self->_log("wait_all");

    for (;;) {
        $self->lock_pid;
        my $remaining = $self->_get_num_procs;
        $self->unlock_pid;

        $self->_log("num_procs = $remaining, max_procs = $self->{MaxProcs}");
        last if $remaining == 0;

        sleep(1);
    }

    $self->_log("wait_all done.");
}
# Delete the lock file. The PID file is not removed here.
#
# Returns:
#   the result of unlink(), i.e. the number of files deleted.
sub cleanup {
    my ($self) = @_;
    $self->_log("cleanup");
    my $removed = unlink $lockfile;
    return $removed;
}
1;
=pod
=head1 NAME
EzParallel - A Simple Library for Parallel Processing
=head1 SYNOPSIS
use EzParallel;
$pa = EzParallel->new(MaxProcs => 8);
$pa->init;
# wait until a slot is available.
while(1)
{
$pa->lock_pid;
if ($pa->is_slot_available)
{
last;
}
$pa->unlock_pid;
sleep(1);
}
# ok. run a process for the slot.
$pid = fork();
if ($pid > 0)
{
# parent
$pa->unlock_pid;
}
else
{
# child
$pa->add_pid(getpid());
$pa->unlock_pid;
# do something here.
$pa->lock_pid;
$pa->remove_pid(getpid());
$pa->unlock_pid;
}
$pa->wait_all;
$pa->cleanup;
=head1 COPYRIGHT
Copyright(C) 2014 Satoshi Nagayasu
Copyright(C) 2014 Uptime Technologies, LLC.
=cut
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment