Skip to content

Instantly share code, notes, and snippets.

@kazeburo
Created May 7, 2010 14:28
Show Gist options
  • Save kazeburo/393484 to your computer and use it in GitHub Desktop.
Save kazeburo/393484 to your computer and use it in GitHub Desktop.
#!/usr/bin/perl
package Worker;
use strict;
use warnings;
use AnyEvent;
use AnyEvent::Curl;
use Coro;
use Coro::Channel;
use Coro::AnyEvent;
use Time::HiRes qw//;
# Suffixes appended to the stringified pool object (see in_work / not_work)
# to form the description given to each worker coroutine.
my $UA_WORKER_CORO_DESC_IN_WORK = "ua_worker_coro_desc_in_work";
my $UA_WORKER_CORO_DESC_NOT_WORK = "ua_worker_coro_desc_not_work";
# Default pool size; copied into the max_ua_worker field by new().
my $MAX_UA_WORKER = 100;
# Default per-worker request budget; copied into max_req_per_worker by new().
my $MAX_REQ_PER_WORKER = 200;
# Package-wide AnyEvent::Curl handle; created on first use in run_worker().
our $CURL;
# Constructor: create a pool object with the default limits and a fresh
# request channel.  No workers are started here.
#
# Fields:
#   dead_worker        - initialised to 0
#   max_ua_worker      - pool size, from $MAX_UA_WORKER
#   max_req_per_worker - per-worker request budget, from $MAX_REQ_PER_WORKER
#   channel            - Coro::Channel that put() writes to and workers read
#
# Returns: the new blessed hash reference.
sub new {
    my ($class) = @_;

    my %state = (
        dead_worker        => 0,
        max_ua_worker      => $MAX_UA_WORKER,
        max_req_per_worker => $MAX_REQ_PER_WORKER,
        channel            => Coro::Channel->new,
    );

    return bless \%state, $class;
}
# Description string carried by a worker coroutine of this pool while it is
# handling a request: the stringified object, a slash, and the in-work suffix.
sub in_work {
    my ($self) = @_;
    return join '/', "$self", $UA_WORKER_CORO_DESC_IN_WORK;
}
# Description string carried by a worker coroutine of this pool while it is
# waiting for a request: the stringified object, a slash, and the not-work
# suffix.
sub not_work {
    my ($self) = @_;
    return join '/', "$self", $UA_WORKER_CORO_DESC_NOT_WORK;
}
# Count the coroutines currently tagged as busy workers of this pool.
#
# Every state returned by Coro::State::list is inspected and its description
# is compared with this pool's in_work tag.
#
# Returns: the number of matching coroutines.
sub busy_worker {
    my $self = shift;

    # The tag does not change during the scan, so compute it once.
    my $tag = $self->in_work;

    my $count = 0;
    for my $coro ( Coro::State::list() ) {
        my $desc = $coro->desc;

        # A coroutine without a description yields undef; skip it explicitly
        # rather than disabling every warning in this sub.
        next unless defined $desc;

        $count++ if $desc eq $tag;
    }
    return $count;
}
# Count the coroutines currently tagged as idle workers of this pool.
#
# Every state returned by Coro::State::list is inspected and its description
# is compared with this pool's not_work tag.
#
# Returns: the number of matching coroutines.
sub idle_worker {
    my $self = shift;

    # The tag does not change during the scan, so compute it once.
    my $tag = $self->not_work;

    my $count = 0;
    for my $coro ( Coro::State::list() ) {
        my $desc = $coro->desc;

        # A coroutine without a description yields undef; skip it explicitly
        # rather than disabling every warning in this sub.
        next unless defined $desc;

        $count++ if $desc eq $tag;
    }
    return $count;
}
# Number of worker slots of this pool that currently have no coroutine.
#
# Computed as max_ua_worker minus the number of coroutines (as reported by
# Coro::State::list) whose description is this pool's in_work or not_work
# tag.
#
# Returns: the difference; it is not clamped.
sub dead_worker {
    my $self = shift;

    # Both tags are fixed for the duration of the scan; build the lookup once
    # instead of calling in_work / not_work for every coroutine.
    my %is_pool_tag = map { $_ => 1 } $self->in_work, $self->not_work;

    my $alive = 0;
    for my $coro ( Coro::State::list() ) {
        my $desc = $coro->desc;

        # Coroutines without a description cannot belong to the pool.
        next unless defined $desc;

        $alive++ if $is_pool_tag{$desc};
    }
    return $self->{max_ua_worker} - $alive;
}
# worker threads
# Start one worker coroutine for this pool.
#
# The coroutine repeatedly takes a request ([ $id, $condvar ]) from the pool
# channel, submits http://127.0.0.1:5000/ to the shared AnyEvent::Curl handle
# with add(), waits for the returned condvar, and then signals the request's
# own condvar.  Its description is switched between not_work and in_work so
# that busy_worker / idle_worker / dead_worker can classify it.
#
# The request's condvar receives 1 when the fetch completed and 0 when add()
# or recv() threw; it is signalled in both cases so the requester is never
# left waiting.
#
# Returns: whatever async returns for the newly created coroutine.
sub run_worker {
    my $self = shift;

    # One handle is shared by every worker through the package global $CURL.
    my $ua = $CURL ||= AnyEvent::Curl->new();
    $ua->start;

    return async {
        $Coro::current->desc( $self->not_work );

        # Request budget: max_req_per_worker minus 10% of it (rounded down).
        # When the budget is used up the coroutine ends.
        my $max_req = $self->{max_req_per_worker}
            - int( $self->{max_req_per_worker} * 0.1 );

        # "> 0" instead of plain truthiness so that a negative or fractional
        # budget still terminates.
        while ( $max_req > 0 ) {
            my $req = $self->{channel}->get();
            $Coro::current->desc( $self->in_work );

            my $start = Time::HiRes::time();

            # Guard the fetch: if it dies, this coroutine would otherwise end
            # without signalling the requester.
            my $ok = eval {
                my $cv = $ua->add('http://127.0.0.1:5000/');
                $cv->recv;
                1;
            };
            my $error = $ok ? undef : ( $@ || 'unknown error' );

            my $elapsed = Time::HiRes::time() - $start;
            # warn sprintf "[%s] end req: %s ", $req->[0], $elapsed if $elapsed > 0.02;

            warn sprintf "[%s] request failed: %s", $req->[0], $error
                unless $ok;

            $req->[1]->send( $ok ? 1 : 0 );
            $Coro::current->desc( $self->not_work );
            --$max_req;
        }
    };
}
# Class method: create a pool, start its workers and install its timers.
#
# Steps:
#   1. construct the pool with new() and log "[pid] build";
#   2. call run_worker() max_ua_worker times;
#   3. every 0.5s, call run_worker() once per slot reported by dead_worker();
#   4. every 1s, log the max / busy / idle / dead counters.
#
# Both timer guards are stored on the pool (worker_timer, monitor_timer).
#
# Returns: the pool object.
sub build {
    my ($class) = @_;
    my $pool = $class->new;

    warn "[$$] build";

    # Initial population of the pool.
    for ( 1 .. $pool->{max_ua_worker} ) {
        $pool->run_worker();
    }

    # Periodically replace workers that have gone away.
    $pool->{worker_timer} = AE::timer(
        0.5, 0.5,
        sub {
            my $missing = $pool->dead_worker;
            for ( 1 .. $missing ) {
                # warn "[$$] respawn";
                $pool->run_worker();
            }
        }
    );

    # Periodic status line.
    $pool->{monitor_timer} = AE::timer(
        1, 1,
        sub {
            my @counters = (
                $pool->{max_ua_worker},
                $pool->busy_worker,
                $pool->idle_worker,
                $pool->dead_worker,
            );
            warn sprintf "max:%d busy:%d idle:%d dead:%d", @counters;
        }
    );

    return $pool;
}
# Queue work for the pool: every argument after the invocant is handed to the
# pool channel's put().
#
# Returns: whatever the channel's put() returns.
sub put {
    my ( $self, @items ) = @_;
    my $channel = $self->{channel};
    return $channel->put(@items);
}
package main;
use strict;
use warnings;
use AnyEvent;
# Benchmark driver: 500 rounds of 2000 requests each.  For every round, each
# request gets its own condvar, is queued on the pool, and is counted through
# a begin/end group on the round's condvar; the round's elapsed time is
# printed once the whole group has completed.
my $worker = Worker->build();

foreach my $round ( 1 .. 500 ) {
    my $round_done = AE::cv();
    my $started_at = Time::HiRes::time();

    foreach my $seq ( 1 .. 2000 ) {
        $round_done->begin;

        my $req_done = AE::cv();
        $worker->put( [ $seq, $req_done ] );

        # Collect the worker's result, then mark this request as finished.
        $req_done->cb(
            sub {
                my ($finished) = @_;
                my $result = $finished->recv;
                $round_done->end;
            }
        );
    }

    #printf "queue: %s\n", Time::HiRes::time - $start;
    $round_done->recv;
    printf "end: %s\n", Time::HiRes::time() - $started_at;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment