Created
November 3, 2010 15:36
-
-
Save wchristian/661230 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl
# $Id: parallel.pl,v 1.7 2008/05/06 20:41:33 dk Exp $
#
# This example fetches two pages in parallel, one with http/1.0 and another
# with http/1.1. The idea is to demonstrate three different ways of doing so,
# by using the object API, and explicit and implicit loop unrolling.
#
use strict;
use warnings;

BEGIN {
#$ENV{IO_LAMBDA_DEBUG} = "http"; # uncomment this to see that it indeed goes parallel
}

use IO::Lambda qw(:lambda);
use IO::Lambda::HTTP qw(http_request);
use LWP::ConnCache;
use List::Util qw( max min );
use HTTP::Request::Common 'POST';

# Build one POST request per type id; each request carries a single
# type_ids form field.
my @ids = ( 34..100 );
my @chain = map POST( "http://eve-metrics.com/api/item.json", [ type_ids => $_ ] ), @ids;

# Push the whole batch through the worker pool, asking for at most 10
# concurrent workers. The results are collected but not used any further;
# progress is reported by the workers as they go.
my @results = parallel_download( \@chain, 10 );

exit;
# parallel_download( \@requests, $asked_worker_max )
#
# Runs all requests in @requests through a pool of concurrent workers and
# blocks until the manager lambda has finished.
#
#   \@requests         - array ref of request objects, handed to the manager
#   $asked_worker_max  - desired number of concurrent workers; normalised by
#                        sanitize_worker_max() before use (0 = no limit)
#
# Returns the list produced by waiting on the manager lambda. Dies if
# sanitize_worker_max() rejects the requested worker count.
sub parallel_download {
    my ( $queue, $asked_worker_max ) = @_;

    my $lambda = lambda \&manager;

    my $real_worker_max = sanitize_worker_max( $queue, $asked_worker_max );

    # wait() is deliberately called in list context, independent of how the
    # caller invoked us, so the full result list is always collected.
    my @responses = $lambda->wait( $queue, $real_worker_max );

    return @responses;
}
# sanitize_worker_max( \@queue, $asked_max )
#
# Turns the caller's requested worker count into the number of workers that
# should actually be started.
#
#   \@queue     - array ref of pending requests; only its size is used
#   $asked_max  - requested worker count; 0 (or omitted/undef) means
#                 "as many workers as there are requests"
#
# Returns the worker count, never larger than the number of queued requests.
# Dies with "max should be 0 or more" when $asked_max is negative.
sub sanitize_worker_max {
    my ( $queue, $asked_max ) = @_;

    # An omitted limit behaves like 0. Normalising it up front avoids an
    # "uninitialized value" warning in the numeric comparison below.
    $asked_max = 0 if !defined $asked_max;

    die "max should be 0 or more" if $asked_max < 0;

    my $queue_size = @{ $queue };

    return $queue_size if !$asked_max;                 # 0 = as many parallel as possible
    return $queue_size if $asked_max > $queue_size;    # not more than the request count
    return $asked_max;
}
# manager( \@requests, $worker_max )
#
# Callback of the manager lambda created in parallel_download(). Creates
# $worker_max worker lambdas that all drain one shared queue, then waits
# for them.
#
#   \@requests   - array ref of request objects to process
#   $worker_max  - number of worker lambdas to create
#
# Returns the value of `tails undef` over all workers.
# NOTE(review): presumably this passes the workers' collected results through
# as the manager's own result - confirm against the IO::Lambda documentation.
sub manager {
    my ( $queue, $worker_max ) = @_;

    # Private copy: the workers shift requests off this array, so the
    # caller's list is left untouched.
    my @q = @{ $queue };
    my $queue_size = @q;

    # Bookkeeping shared by all workers, used for their progress output.
    my %counts = (
        queue_size   => $queue_size,
        worker_count => $worker_max,
    );

    my @workers = map lambda( \&worker ), 1 .. $worker_max;

    # Every worker gets the SAME queue and counter references.
    $_->call( \@q, \%counts ) for @workers;

    # Order matters: the context must be set before tails is invoked.
    context @workers;
    return tails undef;
}
# worker( \@queue, \%counts )
#
# Callback of a single worker lambda. Takes requests off the shared queue one
# at a time, and re-arms itself after each one until the queue is empty.
#
#   \@queue   - shared array ref of pending requests (modified in place)
#   \%counts  - shared bookkeeping hash; reads {queue_size}, decrements
#               {worker_count} when this worker finishes
#
# Returns nothing when the queue is already empty on start. Otherwise returns
# the tail predicate, whose callback finally returns every result this worker
# has collected.
sub worker {
    my ( $q, $counts ) = @_;

    return if !@$q;

    queue_new_req( $q );

    # Accumulates results across all re-runs of the callback below.
    my @ret;

    return tail {
        my @res = @_;
        push @ret, @res;

        for my $res ( @res ) {
            # A finished request is not necessarily a response object: a
            # failed one is expected to arrive as a plain error string, on
            # which ->request would die and take the whole run down.
            my $done_with = ref $res     ? $res->request->uri
                          : defined $res ? "failed request: $res"
                          :                "failed request: unknown error";
            print scalar(@$q)."/$counts->{queue_size} on queue, done with ".$done_with."\n";
        }

        # More work pending: start the next request and re-arm this callback.
        if( @$q ) {
            queue_new_req( $q );
            again;
            return;
        }

        $counts->{worker_count}--;
        print "worker done, $counts->{worker_count} left running\n";

        return @ret;
    };
}
# queue_new_req( \@requests )
#
# Removes the first request from @requests, wraps it in an IO::Lambda::HTTP
# lambda and makes that lambda the current context, so that the caller's
# following tail/again acts on it.
#
#   \@requests - array ref of pending requests (modified in place); callers
#                check that it is non-empty before calling
#
# Returns nothing.
sub queue_new_req {
    my ( $reqs ) = @_;

    my $req = shift @$reqs;
    my $http_lambda = IO::Lambda::HTTP->new( $req );

    context $http_lambda;

    return;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment