Created
May 19, 2011 12:29
-
-
Save hinrik/980630 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings FATAL => 'all'; | |
use POE; | |
use Test::More tests => 2; | |
{ | |
package WheelWrapper; | |
use strict; | |
use warnings FATAL => 'all'; | |
use Carp 'croak'; | |
use POE; | |
use POE::Filter::Stream; | |
use POE::Wheel::Run; | |
require Exporter; | |
use base 'Exporter'; | |
our @EXPORT = qw(quickie_run quickie quickie_merged quickie_tee quickie_tee_merged); | |
our @EXPORT_OK = @EXPORT; | |
our %EXPORT_TAGS = (ALL => [@EXPORT]); | |
our %OBJECTS; | |
sub new { | |
my ($package, %args) = @_; | |
my $parent_id = $poe_kernel->get_active_session->ID; | |
if (my $self = $OBJECTS{$parent_id}) { | |
return $self; | |
} | |
my $self = bless \%args, $package; | |
$self->{parent_id} = $parent_id; | |
$OBJECTS{$parent_id} = $self; | |
return $self; | |
} | |
sub _create_session { | |
my ($self) = @_; | |
POE::Session->create( | |
object_states => [ | |
$self => [qw( | |
_start | |
_stop | |
_exception | |
_create_wheel | |
_child_signal | |
_child_timeout | |
_child_stdin | |
_child_stdout | |
_child_stderr | |
_killall | |
)], | |
], | |
options => { | |
($self->{debug} ? (debug => 1) : ()), | |
($self->{default} ? (default => 1) : ()), | |
($self->{trace} ? (trace => 1) : ()), | |
}, | |
); | |
return; | |
} | |
sub _start { | |
my ($kernel, $session, $self) = @_[KERNEL, SESSION, OBJECT]; | |
my $session_id = $session->ID; | |
$self->{session_id} = $session_id; | |
$kernel->sig(DIE => '_exception'); | |
return; | |
} | |
sub _stop { | |
my $self = $_[OBJECT]; | |
delete $self->{session_id}; | |
return; | |
} | |
sub run { | |
my ($self, %args) = @_; | |
croak 'Program parameter not supplied' if !defined $args{Program}; | |
if ($args{AltFork} && ref $args{Program}) { | |
croak 'Program must be a string when AltFork is enabled'; | |
} | |
if ($args{AltFork} && $^O eq 'Win32') { | |
croak 'AltFork does not currently work on Win32'; | |
} | |
$self->_create_session() if !defined $self->{session_id}; | |
my ($exception, $wheel) | |
= $poe_kernel->call($self->{session_id}, '_create_wheel', \%args); | |
# propagate possible exception from POE::Wheel::Run->new() | |
croak $exception if $exception; | |
return $wheel->PID; | |
} | |
sub _create_wheel { | |
my ($kernel, $self, $args) = @_[KERNEL, OBJECT, ARG0]; | |
my %data; | |
for my $arg (qw(AltFork Timeout Input Program Context ProgramArgs | |
StdoutEvent StderrEvent ExitEvent ResultEvent Tee Merged)) { | |
next if !exists $args->{$arg}; | |
$data{$arg} = delete $args->{$arg}; | |
} | |
if ($data{AltFork}) { | |
my @inc = map { +'-I' => $_ } @INC; | |
$data{Program} = [$^X, @inc, '-e', $data{Program}]; | |
} | |
my $wheel; | |
eval { | |
$wheel = POE::Wheel::Run->new( | |
StdinFilter => POE::Filter::Stream->new(), | |
StdinEvent => '_child_stdin', | |
StdoutEvent => '_child_stdout', | |
StderrEvent => '_child_stderr', | |
Program => $data{Program}, | |
(defined $data{ProgramArgs} | |
? (ProgramArgs => $data{ProgramArgs}) | |
: () | |
), | |
($^O ne 'Win32' | |
? (CloseOnCall => 1) | |
: () | |
), | |
%$args, | |
); | |
}; | |
if ($@) { | |
chomp $@; | |
return $@; | |
} | |
$data{obj} = $wheel; | |
$data{extra_args} = $args; | |
$self->{wheels}{$wheel->ID} = \%data; | |
if (defined $data{Input}) { | |
$wheel->put($data{Input}); | |
} | |
else { | |
$wheel->shutdown_stdin(); | |
} | |
if ($data{Timeout}) { | |
$data{alrm} = | |
$kernel->delay_set('_child_timeout', $data{Timeout}, $wheel->ID); | |
} | |
$kernel->sig_child($wheel->PID, '_child_signal'); | |
return (undef, $wheel); | |
} | |
sub _exception { | |
my ($kernel, $self, $ex) = @_[KERNEL, OBJECT, ARG1]; | |
chomp $ex->{error_str}; | |
warn __PACKAGE__.": Event $ex->{event} in session " | |
.$ex->{dest_session}->ID." raised exception:\n $ex->{error_str}\n"; | |
$kernel->sig_handled(); | |
return; | |
} | |
sub _child_signal { | |
my ($kernel, $self, $pid, $status) = @_[KERNEL, OBJECT, ARG1, ARG2]; | |
my $id = $self->_pid_to_id($pid); | |
my $data = $self->{wheels}{$id}; | |
my $s = $status >> 8; | |
if ($s != 0 && !exists $data->{ExitEvent} | |
&& !exists $data->{ResultEvent}) { | |
warn "Child $pid exited with nonzero status $s\n"; | |
} | |
$kernel->alarm_remove($data->{alrm}) if $data->{Timeout}; | |
if ($data->{lazy}) { | |
$self->{lazy}{$id} = { | |
merged => $data->{merged}, | |
stdout => $data->{stdout}, | |
stderr => $data->{stderr}, | |
status => $status, | |
} | |
} | |
delete $self->{wheels}{$id}; | |
if (defined $data->{ExitEvent}) { | |
$kernel->call( | |
$self->{parent_id}, | |
$data->{ExitEvent}, | |
$status, | |
$pid, | |
(defined $data->{Context} | |
? $data->{Context} | |
: ()), | |
); | |
} | |
if (defined $data->{ResultEvent}) { | |
$kernel->call( | |
$self->{parent_id}, | |
$data->{ResultEvent}, | |
$data->{stdout}, | |
$data->{stderr}, | |
$data->{merged}, | |
$status, | |
$pid, | |
(defined $data->{Context} | |
? $data->{Context} | |
: ()), | |
); | |
} | |
return; | |
} | |
sub _child_timeout { | |
my ($self, $id) = @_[OBJECT, ARG0]; | |
$self->{wheels}{$id}{obj}->kill(); | |
return; | |
} | |
sub _child_stdin { | |
my ($self, $id) = @_[OBJECT, ARG0]; | |
$self->{wheels}{$id}{obj}->shutdown_stdin(); | |
return; | |
} | |
sub _child_stdout { | |
my ($kernel, $self, $output, $id) = @_[KERNEL, OBJECT, ARG0, ARG1]; | |
my $data = $self->{wheels}{$id}; | |
if ($data->{lazy} || defined $data->{ResultEvent}) { | |
push @{ $data->{merged} }, $output; | |
push @{ $data->{stdout} }, $output; | |
if ($data->{lazy}{Tee}) { | |
print $output, "\n"; | |
} | |
} | |
elsif (!exists $data->{StdoutEvent}) { | |
print "$output\n"; | |
} | |
elsif (defined (my $event = $data->{StdoutEvent})) { | |
my $context = $data->{Context}; | |
$kernel->call( | |
$self->{parent_id}, | |
$event, | |
$output, | |
$data->{obj}->PID, | |
(defined $context ? $context : ()), | |
); | |
} | |
return; | |
} | |
sub _child_stderr { | |
my ($kernel, $self, $error, $id) = @_[KERNEL, OBJECT, ARG0, ARG1]; | |
my $data = $self->{wheels}{$id}; | |
if ($data->{lazy} || defined $data->{ResultEvent}) { | |
push @{ $data->{merged} }, $error; | |
push @{ $data->{stderr} }, $error; | |
if ($data->{lazy}{Tee}) { | |
$data->{lazy}{Merged} | |
? print $error, "\n" | |
: warn $error, "\n"; | |
} | |
} | |
elsif (!exists $data->{StderrEvent}) { | |
warn "$error\n"; | |
} | |
elsif (defined (my $event = $data->{StderrEvent})) { | |
my $context = $data->{Context}; | |
$kernel->call( | |
$self->{parent_id}, | |
$event, | |
$error, | |
$data->{obj}->PID, | |
(defined $context ? $context : ()), | |
); | |
} | |
return; | |
} | |
sub _pid_to_id { | |
my ($self, $pid) = @_; | |
for my $id (keys %{ $self->{wheels} }) { | |
return $id if $self->{wheels}{$id}{obj}->PID == $pid; | |
} | |
return; | |
} | |
sub killall { | |
my $self = shift; | |
$self = POE::Quickie->new() if ref $self ne 'POE::Quickie'; | |
$poe_kernel->call($self->{session_id}, '_killall', @_); | |
return; | |
} | |
sub _killall { | |
my ($kernel, $self, $signal) = @_[KERNEL, OBJECT, ARG0]; | |
$kernel->alarm_remove_all(); | |
for my $id (keys %{ $self->{wheels}}) { | |
$self->{wheels}{$id}{obj}->kill($signal); | |
} | |
return; | |
} | |
sub processes { | |
my ($self) = @_; | |
$self = POE::Quickie->new() if ref $self ne 'POE::Quickie'; | |
my %wheels; | |
for my $id (keys %{ $self->{wheels} }) { | |
my $pid = $self->{wheels}{$id}{obj}->PID; | |
$wheels{$pid} = $self->{wheels}{$id}{Context}; | |
} | |
return \%wheels; | |
} | |
sub _lazy_run { | |
my ($self, %args) = @_; | |
my %run_args; | |
if (@{ $args{RunArgs} } == 1 && | |
(!ref $args{RunArgs}[0] || ref ($args{RunArgs}[0]) =~ /^(?:ARRAY|CODE)$/)) { | |
$run_args{Program} = $args{RunArgs}[0]; | |
} | |
else { | |
%run_args = @{ $args{RunArgs} }; | |
} | |
my $pid = $self->run( | |
%run_args, | |
ExitEvent => undef, | |
($args{Tee} ? () : (StderrEvent => undef)), | |
($args{Tee} ? () : (StdoutEvent => undef)), | |
); | |
my $id = $self->_pid_to_id($pid); | |
$self->{wheels}{$id}{lazy} = { | |
Tee => $args{Tee}, | |
Merged => $args{Merged}, | |
}; | |
my $parent_id = $poe_kernel->get_active_session->ID; | |
$poe_kernel->refcount_increment($parent_id, __PACKAGE__); | |
$poe_kernel->run_one_timeslice() while $self->{wheels}{$id}; | |
$poe_kernel->refcount_decrement($parent_id, __PACKAGE__); | |
my $data = delete $self->{lazy}{$id}; | |
return $data->{merged}, $data->{status} if $args{Merged}; | |
return $data->{stdout}, $data->{stderr}, $data->{status}; | |
} | |
sub quickie_run { | |
my %args = @_; | |
my $self = POE::Quickie->new(); | |
return $self->run(%args); | |
} | |
sub quickie { | |
my @args = @_; | |
my $self = POE::Quickie->new(); | |
return $self->_lazy_run( | |
RunArgs => \@args | |
); | |
} | |
sub quickie_tee { | |
my @args = @_; | |
my $self = POE::Quickie->new(); | |
return $self->_lazy_run( | |
RunArgs => \@args, | |
Tee => 1, | |
); | |
} | |
sub quickie_merged { | |
my @args = @_; | |
my $self = POE::Quickie->new(); | |
return $self->_lazy_run( | |
RunArgs => \@args, | |
Merged => 1, | |
); | |
} | |
sub quickie_tee_merged { | |
my @args = @_; | |
my $self = POE::Quickie->new(); | |
return $self->_lazy_run( | |
RunArgs => \@args, | |
Tee => 1, | |
Merged => 1, | |
); | |
} | |
} | |
POE::Session->create( | |
package_states => [ | |
(__PACKAGE__) => [qw( | |
_start | |
stdout | |
stderr | |
)], | |
], | |
); | |
POE::Kernel->run; | |
sub _start { | |
my $heap = $_[HEAP]; | |
$heap->{wrapper} = WheelWrapper->new(trace => 1); | |
$heap->{wrapper}->run( | |
Program => sub { print "foo\n" }, | |
StdoutEvent => 'stdout', | |
); | |
} | |
sub stdout { | |
my ($heap, $output) = @_[HEAP, ARG0]; | |
is($output, 'foo', 'Got stdout'); | |
$heap->{wrapper}->run( | |
Program => sub { warn "bar\n" }, | |
StderrEvent => 'stderr', | |
); | |
} | |
sub stderr { | |
my ($heap, $error) = @_[HEAP, ARG0]; | |
is($error, 'bar', 'Got stderr'); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment