Skip to content

Instantly share code, notes, and snippets.

@wchristian
Created November 3, 2010 15:36
Show Gist options
  • Save wchristian/661230 to your computer and use it in GitHub Desktop.
Save wchristian/661230 to your computer and use it in GitHub Desktop.
#!/usr/bin/perl
# $Id: parallel.pl,v 1.7 2008/05/06 20:41:33 dk Exp $
#
# This example fetches two pages in parallel, one with http/1.0 another with
# http/1.1 . The idea is to demonstrate three different ways of doing so, by
# using object API, and explicit and implicit loop unrolling
#
use strict;
use warnings;
BEGIN {
#$ENV{IO_LAMBDA_DEBUG} = "http"; # uncomment this to see that it indeed goes parallel
}
use IO::Lambda qw(:lambda);
use IO::Lambda::HTTP qw(http_request);
use LWP::ConnCache;
use List::Util qw( max min );
use HTTP::Request::Common 'POST';
my @ids = ( 34..100 );
my @chain = map POST( "http://eve-metrics.com/api/item.json", [ type_ids => $_ ] ), @ids;
my @results = parallel_download( \@chain, 10 );
exit;
sub parallel_download {
my ( $queue, $asked_worker_max ) = @_;
my $lambda = lambda \&manager;
my $real_worker_max = sanitize_worker_max( $queue, $asked_worker_max );
my @responses = $lambda->wait( $queue, $real_worker_max );
return @responses;
}
sub sanitize_worker_max {
my ( $queue, $asked_max ) = @_;
die "max should be 0 or more" if $asked_max < 0;
my $queue_size = @{ $queue };
return $queue_size if !$asked_max; # 0 = as many parallel as possible
return $queue_size if $asked_max > $queue_size; # not more than the request count
return $asked_max;
}
sub manager {
my ( $queue, $worker_max ) = @_;
my @q = @{ $queue };
my $queue_size = @q;
my %counts = (
queue_size => $queue_size,
worker_count => $worker_max,
);
my @workers = map lambda( \&worker ), 1 .. $worker_max;
$_->call( \@q, \%counts ) for @workers;
context @workers;
return tails undef;
}
sub worker {
my ( $q, $counts ) = @_;
return if !@$q;
queue_new_req( \@$q );
my @ret;
return tail {
my @res = @_;
push @ret, @res;
print scalar(@$q)."/$counts->{queue_size} on queue, done with ".$_->request->uri."\n" for @res;
if( @$q ) {
queue_new_req( \@$q );
again;
return;
}
$counts->{worker_count}--;
print "worker done, $counts->{worker_count} left running\n";
return @ret;
};
}
sub queue_new_req {
my ( $reqs ) = @_;
my $req = shift @$reqs;
$req = IO::Lambda::HTTP->new( $req );
context $req;
return;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment