Last active
September 17, 2019 12:51
-
-
Save FGasper/1583e59368c5d1a98e8c858fbf9c4cf6 to your computer and use it in GitHub Desktop.
Demonstration of promises-based wrapper around Net::Curl
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
# Demonstration script. The epoll-based Net::Curl::Promiser subclass it uses (My::Curl::Epoll) is defined further down in this file.
package main; | |
use strict; | |
use warnings; | |
use Net::Curl::Easy qw(:constants); | |
use Linux::Perl::epoll (); | |
# URLs to fetch concurrently.
my @urls = qw(
    http://perl.com
    http://metacpan.org
);

# The epoll instance that both the promiser and the event loop share.
my $epoll = Linux::Perl::epoll->new();

#----------------------------------------------------------------------
# Register one easy handle per URL with the promiser and attach
# success/failure callbacks to the promise returned for each of them.
my $promiser = My::Curl::Epoll->new($epoll);

for my $url (@urls) {
    my $easy = Net::Curl::Easy->new();

    my @options = (
        [ CURLOPT_URL(),            $url ],
        [ CURLOPT_FOLLOWLOCATION(), 1 ],
    );
    $easy->setopt( @{$_} ) for @options;

    # Resolution handler: announce that the transfer finished.
    my $on_done = sub { print "$url completed.$/" };

    # Rejection handler: the first argument is the failure reason.
    my $on_fail = sub { warn "$url: " . shift };

    $promiser->add_handle($easy)->then( $on_done, $on_fail );
}
#---------------------------------------------------------------------- | |
# Event loop: keep going for as long as the promiser reports handles.
while ( $promiser->handles() ) {
    my $timeout_ms = $promiser->get_timeout();

    # A false (zero) timeout means "do not wait"; go straight to the
    # timeout handling below. The value is divided by 1000 before being
    # handed to epoll's wait().
    my @fd_event_pairs;
    if ($timeout_ms) {
        @fd_event_pairs = $epoll->wait(
            maxevents => 10,
            timeout   => $timeout_ms / 1000,
        );
    }

    # Nothing became ready: let the promiser run its timeout action.
    if ( !@fd_event_pairs ) {
        $promiser->time_out();
        next;
    }

    # wait() hands back a flat list of (fd, event bitmask) pairs; sort
    # each fd into the readable and/or writable bucket.
    my $event_number_hr = $epoll->EVENT_NUMBER();
    my ( @readable, @writable );

    for ( my $idx = 0 ; $idx < @fd_event_pairs ; $idx += 2 ) {
        my ( $fd, $mask ) = @fd_event_pairs[ $idx, $idx + 1 ];

        push @readable, $fd if $mask & $event_number_hr->{'IN'};
        push @writable, $fd if $mask & $event_number_hr->{'OUT'};
    }

    $promiser->process( \@writable, \@readable );
}
#---------------------------------------------------------------------- | |
package My::Curl::Epoll; | |
use parent 'Net::Curl::Promiser'; | |
# Constructor.
#   $class - class name to instantiate
#   $epoll - epoll object; stored under the '_epoll' key and used by the
#            polling hooks of this class
# Returns the new object, built by the parent class's constructor.
sub new {
    my $class = shift;
    my $epoll = shift;

    my $self = $class->SUPER::new();
    $self->{'_epoll'} = $epoll;

    return $self;
}
# Registers $fd with the epoll object for the given events, or updates
# the registration if this object has already registered that fd.
#   $fd     - file descriptor
#   @events - event names, passed to epoll as "events => [ ... ]"
# Descriptors that have been added are remembered as keys of
# $self->{'_fds'} (the values are always undef).
# Returns nothing.
sub _set_epoll {
    my ( $self, $fd, @events ) = @_;

    my $epoll = $self->{'_epoll'};

    # First sighting of this descriptor: add it, then remember it.
    if ( !exists $self->{'_fds'}{$fd} ) {
        $epoll->add( $fd, events => \@events );
        $self->{'_fds'}{$fd} = undef;
        return;
    }

    # Already registered: just change the watched events.
    $epoll->modify( $fd, events => \@events );

    return;
}
# Polling hook: watch $fd for the 'IN' event only.
# Returns whatever _set_epoll() returns.
sub _SET_POLL_IN {
    my $self = shift;
    my ($fd) = @_;

    return $self->_set_epoll( $fd, qw(IN) );
}
# Polling hook: watch $fd for the 'OUT' event only.
# Returns whatever _set_epoll() returns.
sub _SET_POLL_OUT {
    my $self = shift;
    my ($fd) = @_;

    return $self->_set_epoll( $fd, qw(OUT) );
}
# Polling hook: watch $fd for both the 'IN' and the 'OUT' event.
# Returns whatever _set_epoll() returns.
sub _SET_POLL_INOUT {
    my $self = shift;
    my ($fd) = @_;

    return $self->_set_epoll( $fd, qw(IN OUT) );
}
# Polling hook: stop watching $fd.
#   $fd - file descriptor to remove from the epoll object
# Does nothing for a descriptor this object is not tracking.
# Returns nothing.
sub _STOP_POLL {
    my ( $self, $fd ) = @_;

    # _set_epoll() records tracked descriptors as hash keys whose value
    # is undef, so the decision has to rest on the key's existence.
    # Testing the value returned by delete() would always be false and
    # the descriptor would never be removed from epoll.
    if ( exists $self->{'_fds'}{$fd} ) {
        delete $self->{'_fds'}{$fd};
        $self->{'_epoll'}->delete($fd);
    }

    return;
}
1; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The base class. A subclass extends it to plug in a concrete event mechanism (e.g., epoll) by providing the _SET_POLL_IN, _SET_POLL_OUT, _SET_POLL_INOUT and _STOP_POLL methods.
package Net::Curl::Promiser; | |
use strict; | |
use warnings; | |
use Promise::ES6 (); | |
use Net::Curl::Multi (); | |
use constant _DEFAULT_TIMEOUT => 1000; | |
# Constructor.
# Creates a Net::Curl::Multi object, stores it under the 'curl' key and
# installs _socket_fn() as its socket callback with the new object as
# the callback data. If the class provides an _ON_TIMEOUT_CHANGE method,
# that method is installed as the timer callback in the same way.
# Returns the new object.
sub new {
    my ($class) = @_;

    my $multi = Net::Curl::Multi->new();

    my $self = bless {
        callbacks => {},
        to_fail   => {},
        curl      => $multi,
    }, $class;

    # (option, value) pairs, applied in order.
    my @settings = (
        [ Net::Curl::Multi::CURLMOPT_SOCKETDATA(),     $self ],
        [ Net::Curl::Multi::CURLMOPT_SOCKETFUNCTION(), \&_socket_fn ],
    );

    # The timer callback is optional: only subclasses that define
    # _ON_TIMEOUT_CHANGE get one.
    my $timer_fn = $class->can('_ON_TIMEOUT_CHANGE');
    if ($timer_fn) {
        push @settings,
            [ Net::Curl::Multi::CURLMOPT_TIMERDATA(),     $self ],
            [ Net::Curl::Multi::CURLMOPT_TIMERFUNCTION(), $timer_fn ];
    }

    $multi->setopt( @{$_} ) for @settings;

    return $self;
}
# Tells the multi object that a timeout occurred (socket_action() with
# CURL_SOCKET_TIMEOUT), then settles whatever handles are now pending.
# Returns the value socket_action() returned.
sub time_out {
    my $self = shift;

    my $socket_action_rv = $self->{'curl'}->socket_action(
        Net::Curl::Multi::CURL_SOCKET_TIMEOUT(),
    );

    $self->_process_pending();

    return $socket_action_rv;
}
# Reports socket readiness to the multi object, then settles whatever
# handles are now pending.
#   $send_fds_ar    - array ref of descriptors that are ready for writing
#   $receive_fds_ar - array ref of descriptors that are ready for reading
# A descriptor that appears in both lists is reported ONCE, with the
# CURL_CSELECT_OUT and CURL_CSELECT_IN flags OR-ed together. Reporting
# it twice is unsafe: the first socket_action() call may make libcurl
# finish with (and close) the socket before the second call is made.
# Descriptors are reported in first-seen order (send list first).
# Returns $self.
sub process {
    my ( $self, $send_fds_ar, $receive_fds_ar ) = @_;

    my %mask_of;     # fd => combined CURL_CSELECT_* bitmask
    my @fd_order;    # fds in the order they were first seen

    my @sources = (
        [ $send_fds_ar,    Net::Curl::Multi::CURL_CSELECT_OUT() ],
        [ $receive_fds_ar, Net::Curl::Multi::CURL_CSELECT_IN() ],
    );

    for my $source (@sources) {
        my ( $fds_ar, $flag ) = @{$source};

        # A missing list is treated as empty.
        for my $fd ( @{ $fds_ar || [] } ) {
            push @fd_order, $fd if !exists $mask_of{$fd};
            $mask_of{$fd} = ( $mask_of{$fd} || 0 ) | $flag;
        }
    }

    for my $fd (@fd_order) {
        $self->{'curl'}->socket_action( $fd, $mask_of{$fd} );
    }

    $self->_process_pending();

    return $self;
}
# Returns whatever the multi object's handles() method returns, in the
# caller's context.
sub handles {
    my ($self) = @_;

    my $multi = $self->{'curl'};

    return $multi->handles();
}
# Returns the multi object's timeout() value, or _DEFAULT_TIMEOUT when
# that value is negative.
# NOTE(review): the caller in this file divides the result by 1000
# before passing it to epoll, so the unit is presumably milliseconds.
sub get_timeout {
    my $self = shift;

    my $curl_timeout = $self->{'curl'}->timeout();

    return _DEFAULT_TIMEOUT() if $curl_timeout < 0;

    return $curl_timeout;
}
# Adds an easy handle to the multi object and returns a promise for it.
#   $easy - the easy handle to add
# The arguments the promise constructor passes to the executor are saved
# as an array under $self->{'callbacks'}{$easy}; _finish_handle() later
# calls one of them by index.
# Returns the Promise::ES6 object.
sub add_handle {
    my ( $self, $easy ) = @_;

    $self->{'curl'}->add_handle($easy);

    # Executor: only stash its arguments for later; nothing is settled
    # at this point.
    my $executor = sub {
        $self->{'callbacks'}{$easy} = \@_;
    };

    return Promise::ES6->new($executor);
}
# Marks an easy handle as failed.
#   $easy   - the easy handle
#   $reason - the failure reason
# Only records [ $easy, $reason ] under $self->{'to_fail'}{$easy}; the
# entry is acted upon later by _clear_failed() / _process_pending().
# Returns nothing.
sub fail_handle {
    my $self = shift;
    my ( $easy, $reason ) = @_;

    my @failure = ( $easy, $reason );
    $self->{'to_fail'}{$easy} = \@failure;

    return;
}
#---------------------------------------------------------------------- | |
# Socket callback installed on the multi object by new().
#   $multi, $easy - multi and easy objects (unused here)
#   $fd           - the socket's file descriptor
#   $action       - one of the CURL_POLL_* values
#   $assign       - unused here
#   $self         - the promiser object (the registered socket data)
# Forwards the request to the matching polling hook on $self; an action
# that matches none of the four known values is ignored.
# Always returns 0.
sub _socket_fn {
    my ( $multi, $easy, $fd, $action, $assign, $self ) = @_;

    # CURL_POLL_* value => name of the hook method to call.
    my @dispatch = (
        [ Net::Curl::Multi::CURL_POLL_IN(),     '_SET_POLL_IN' ],
        [ Net::Curl::Multi::CURL_POLL_OUT(),    '_SET_POLL_OUT' ],
        [ Net::Curl::Multi::CURL_POLL_INOUT(),  '_SET_POLL_INOUT' ],
        [ Net::Curl::Multi::CURL_POLL_REMOVE(), '_STOP_POLL' ],
    );

    for my $entry (@dispatch) {
        my ( $poll_value, $hook ) = @{$entry};
        next if $action != $poll_value;

        $self->$hook($fd);
        last;
    }

    return 0;
}
# Reports one socket event to the multi object, then settles whatever
# handles are now pending.
#   $fd        - file descriptor
#   $direction - value passed through as socket_action()'s second argument
# Returns the value socket_action() returned.
sub _socket_action {
    my $self = shift;
    my ( $fd, $direction ) = @_;

    my $socket_action_rv = $self->{'curl'}->socket_action( $fd, $direction );

    $self->_process_pending();

    return $socket_action_rv;
}
# Settles one easy handle.
#   $easy    - the easy handle
#   $cb_idx  - index into the stored callback array; the callers in this
#              file pass 0 on success and 1 on failure
#   $payload - the single argument given to the chosen callback
# Drops any pending failure entry for the handle, removes the handle
# from the multi object and, if callbacks are still stored for it,
# removes them and invokes the selected one.
# Returns nothing.
sub _finish_handle {
    my ( $self, $easy, $cb_idx, $payload ) = @_;

    delete $self->{'to_fail'}{$easy};

    $self->{'curl'}->remove_handle($easy);

    # No stored callbacks means there is nothing left to notify.
    my $callbacks_ar = delete $self->{'callbacks'}{$easy}
        or return;

    $callbacks_ar->[$cb_idx]->($payload);

    return;
}
# Rejects every handle currently recorded in $self->{'to_fail'}.
# Each entry is removed and its handle is finished through
# _finish_handle() with callback index 1 and the stored reason.
# Returns nothing.
sub _clear_failed {
    my ($self) = @_;

    # Work from a snapshot of the keys taken before any entry is handled.
    my @failed_keys = keys %{ $self->{'to_fail'} };

    while (@failed_keys) {
        my $key = shift @failed_keys;

        my $failure_ar = delete $self->{'to_fail'}{$key};
        my ( $easy, $reason ) = @{$failure_ar};

        $self->_finish_handle( $easy, 1, $reason );
    }

    return;
}
# Settles every handle that is ready to be settled.
# First rejects the handles already marked as failed, then drains the
# multi object's info_read() queue. For each message:
#   - anything other than CURLMSG_DONE is fatal;
#   - a handle that has a pending failure entry is rejected with the
#     stored reason;
#   - otherwise the handle is resolved with itself when the result is 0
#     and rejected with the result when it is not.
# Returns nothing. Dies on an unrecognized message.
sub _process_pending {
    my ($self) = @_;

    $self->_clear_failed();

    my $multi = $self->{'curl'};

    MESSAGE:
    while ( my @info = $multi->info_read() ) {
        my ( $msg, $easy, $result ) = @info;

        die "Unrecognized info_read() message: [$msg]"
            if $msg != Net::Curl::Multi::CURLMSG_DONE();

        # A failure recorded for this handle takes precedence over the
        # transfer's own result.
        my $failure_ar = delete $self->{'to_fail'}{$easy};
        if ($failure_ar) {
            my ( $failed_easy, $reason ) = @{$failure_ar};
            $self->_finish_handle( $failed_easy, 1, $reason );
            next MESSAGE;
        }

        if ( $result == 0 ) {
            $self->_finish_handle( $easy, 0, $easy );
        }
        else {
            $self->_finish_handle( $easy, 1, $result );
        }
    }

    return;
}
1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment