Created
January 28, 2017 01:51
-
-
Save Logioniz/444a9f84a577c8a45f7314beaf5f261d to your computer and use it in GitHub Desktop.
Example of ioloop (based on Mojo::IOLoop)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/perl | |
package Reactor::Select; | |
use Mojo::Base -base; | |
use IO::Select; | |
sub new { | |
my $self = shift->SUPER::new; | |
$self->{select}{read} = IO::Select->new; | |
$self->{select}{write} = IO::Select->new; | |
return $self; | |
} | |
sub add { | |
my ($self, $fd, $read, $write, $cb) = @_; | |
return unless $read || $write; | |
my $params = {cb => $cb}; | |
if ($read) { | |
$self->{select}{read}->add($fd); | |
$params->{read} = 1; | |
} | |
if ($write) { | |
$self->{select}{write}->add($fd); | |
$params->{write} = 1; | |
} | |
$self->{select}{fd}{$fd} = $params; | |
} | |
sub remove { | |
my ($self, $fd) = @_; | |
my $params = $self->{select}{fd}{$fd}; | |
$self->{select}{read}->remove($fd) if $params->{read}; | |
$self->{select}{write}->remove($fd) if $params->{write}; | |
delete $self->{select}{fd}{$fd}; | |
} | |
sub watch { | |
my ($self, $fd, $read, $write) = @_; | |
my $params = $self->{select}{fd}{$fd}; | |
if ($read && !$params->{read}) { | |
$self->{select}{read}->add($fd); | |
$params->{read} = 1; | |
} | |
if (!$read && $params->{read}) { | |
$self->{select}{read}->remove($fd); | |
delete $params->{read}; | |
} | |
if ($write && !$params->{write}) { | |
$self->{select}{write}->add($fd); | |
$params->{write} = 1; | |
} | |
if (!$write && $params->{write}) { | |
$self->{select}{write}->remove($fd); | |
delete $params->{write}; | |
} | |
return $self; | |
} | |
sub wait { | |
my ($self, $timeout) = @_; | |
my ($r, $w, $e) = IO::Select::select($self->{select}{read}, $self->{select}{write}, undef, $timeout); | |
for my $fh (@$r) { | |
my $cb = $self->{select}{fd}{$fh}{cb}; | |
$self->$cb($fh, 'read'); | |
} | |
for my $fh (@$w) { | |
my $cb = $self->{select}{fd}{$fh}{cb}; | |
$self->$cb($fh, 'write'); | |
} | |
} | |
package IOLoop; | |
use Mojo::Base -base; | |
use Time::HiRes 'time'; | |
use List::Util 'min'; | |
has 'reactor'; | |
has 'timers'; | |
sub new { | |
my $self = shift->SUPER::new; | |
$self->reactor(Reactor::Select->new); | |
$self->timers({}); | |
$self->{delay} = Delay->new(steps => [])->ioloop($self); | |
return $self; | |
} | |
sub add { | |
my $cb = pop; | |
my $self = shift; | |
$self->reactor->add(@_, sub { | |
my $reactor = shift; | |
$self->$cb(@_); | |
}); | |
return $self; | |
} | |
sub remove { | |
my $self = shift; | |
$self->reactor->remove(@_); | |
return $self; | |
} | |
sub watch { | |
my $self = shift; | |
$self->reactor->watch(@_); | |
return $self; | |
} | |
sub wait { | |
my $self = shift; | |
++$self->{running}; | |
$self->one_tick while $self->{running}; | |
} | |
sub one_tick { | |
my $self = shift; | |
my $min_time = min map { $_->{time} } values %{$self->timers}; | |
my $timeout = defined $min_time ? $min_time - time : 0.5; | |
$timeout = $timeout <= 0 ? 0 : $timeout; | |
$self->reactor->wait($timeout); | |
my $now = time; | |
for my $timer_id (keys %{$self->timers}) { | |
my ($time, $cb) = @{$self->timers->{$timer_id}}{qw/time cb/}; | |
next if $time >= $now; | |
delete $self->timers->{$timer_id}; | |
$self->$cb(); | |
} | |
} | |
sub start { | |
my $self = shift; | |
$self->wait unless $self->{running}; | |
} | |
sub stop { | |
delete shift->{running}; | |
} | |
sub timer { | |
my ($self, $seconds, $cb) = @_; | |
my $now = time; | |
my $timer_id = $$ . rand . $now; | |
$self->timers->{$timer_id} = { | |
time => $seconds + $now, | |
cb => $cb | |
}; | |
} | |
sub next_tick { | |
shift->timer(0, @_); | |
} | |
sub is_running { | |
shift->{running}; | |
} | |
sub singleton { | |
state $ioloop = shift->new(@_); | |
} | |
package EventEmitter; | |
use Mojo::Base -base; | |
sub on { | |
shift->subscribe(@_); | |
} | |
sub subscribe { | |
my ($self, $event, $cb) = @_; | |
push @{$self->{event}{$event}}, $cb; | |
return $self; | |
} | |
sub unsubscribe { | |
my ($self, $event, $cb) = @_; | |
return delete $self->{event}{$event} unless $cb; | |
my @subscribers = grep { $_ ne $cb } @{$self->{event}{$event}}; | |
$self->{event}{$event} = \@subscribers; | |
return $self; | |
} | |
sub emit { | |
my ($self, $event) = (shift, shift); | |
for my $cb (@{$self->{event}{$event} // []}) { | |
$self->$cb(@_); | |
} | |
return $self; | |
} | |
package Delay; | |
use Mojo::Base 'EventEmitter'; | |
has 'ioloop'; | |
sub new { | |
my $self = shift->SUPER::new(@_); | |
$self->{steps_state} = [map { {count => 0, action => 'all', args => []} } @{$self->{steps} // []}]; | |
@$self{qw/current_step data/} = (0, {}); | |
return $self; | |
} | |
sub begin { | |
my $self = shift; | |
my $step_id = $self->{current_step}; | |
my $operation_id = $self->{steps_state}[$step_id]{count}++; | |
return sub { | |
$self->_next($step_id, $operation_id, \@_); | |
} | |
} | |
sub wait { | |
my $self = shift; | |
return $self if $self->ioloop->is_running; | |
$self->on(finish => sub { $self->ioloop->stop }); | |
$self->ioloop->start; | |
return $self; | |
} | |
sub perform { | |
my $self = shift; | |
$self->ioloop->next_tick(sub { $self->_run_first }); | |
return $self; | |
} | |
sub run { | |
my $self = shift->new(steps => \@_); | |
$self->ioloop(IOLoop->singleton); | |
$self->perform->wait; | |
} | |
sub add_step { | |
my ($self, $cb) = @_; | |
push @{$self->{steps}}, $cb; | |
push @{$self->{steps_state}}, {count => 0, action => 'all', args => []}; | |
return $self; | |
} | |
sub data { | |
my $self = shift; | |
return $self->{data}{shift()} unless @_ > 1; | |
$self->{data}{$_[0]} = $_[1]; | |
return $self; | |
} | |
sub race { | |
my $self = shift; | |
@{$self->{steps_state}[$self->{current_step}]}{qw/action timeout/} = ('race', $_[1]); | |
return $self; | |
} | |
sub _run_first { | |
my $self = shift; | |
my $cb = shift @{$self->{steps}}; | |
$self->$cb(); | |
} | |
sub _next { | |
my ($self, $step_id, $operation_id, $params) = @_; | |
return unless $step_id == $self->{current_step}; | |
my $state = $self->{steps_state}[$step_id]; | |
--$state->{count}; | |
if ($state->{action} eq 'all') { | |
$state->{args}[$operation_id] = $params; | |
return if $state->{count} > 0; | |
} else { | |
$state->{args}[0] = [$operation_id, @$params]; | |
} | |
$self->{current_step}++; | |
my $cb = shift @{$self->{steps}}; | |
#my $cb = $self->{steps}->[$self->{current_step}++]; | |
$self->$cb(map { @$_ } @{delete $state->{args}}); | |
$self->emit('finish') unless @{$self->{steps}}; | |
} | |
package Stream; | |
use Mojo::Base 'EventEmitter'; | |
use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK); | |
has 'ioloop'; | |
has 'handle'; | |
sub new { | |
my $self = shift->SUPER::new(@_); | |
$self->{ioloop}->add($self->{handle}, 1, 1, sub { pop() eq 'write' ? $self->_write : $self->_read }); | |
$self->{buffer} = ''; | |
return $self; | |
} | |
sub close { | |
my $self = shift; | |
my $handle = delete $self->{handle}; | |
$handle->close; | |
$self->ioloop->remove($handle); | |
$self->emit('close'); | |
} | |
sub write { | |
my ($self, $chunk, $cb) = @_; | |
$self->{buffer} .= $chunk; | |
if ($cb) { | |
$self->on(drain => $cb); | |
} elsif (!length $self->{buffer}) { | |
return $self; | |
} | |
$self->ioloop->watch($self->{handle}, 1, 1) if $self->{handle}; | |
return $self; | |
} | |
sub _write { | |
my $self = shift; | |
my $handle = $self->handle; | |
if (length $self->{buffer}) { | |
return unless defined(my $written = $handle->syswrite($self->{buffer})); | |
$self->emit(write => substr($self->{buffer}, 0, $written, '')); | |
} | |
$self->emit('drain')->unsubscribe('drain') unless length $self->{buffer}; | |
return if length $self->{buffer}; | |
$self->ioloop->watch($handle, 1, 0) if $self->{handle}; | |
} | |
sub _read { | |
my $self = shift; | |
my $handle = $self->handle; | |
my $read = $self->{handle}->sysread(my $buffer, 100_000, 0); | |
return $read == 0 ? $self->close : $self->emit(read => $buffer) if defined $read; | |
# Retry | |
return if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK; | |
# Closed (maybe real error) | |
$! == ECONNRESET ? $self->close : $self->emit(error => $!)->close; | |
} | |
package Server; | |
use Mojo::Base 'EventEmitter'; | |
use IO::Socket::IP; | |
has 'ioloop'; | |
has 'handle'; | |
sub listen { | |
my ($self, %options) = @_; | |
$self->handle(my $handle = IO::Socket::IP->new(%options)); | |
$handle->blocking(0); | |
return $self; | |
} | |
sub start { | |
my $self = shift; | |
$self->ioloop->add($self->handle, 1, 0, sub { $self->_accept(@_) }); | |
return $self; | |
} | |
sub stop { | |
my $self = shift; | |
$self->ioloop->remove($self->handle); | |
return $self; | |
} | |
sub _accept { | |
my ($self, $ioloop, $server_fd, $method) = @_; | |
return unless my $client_fd = $server_fd->accept; | |
$client_fd->blocking(0); | |
$self->emit(accept => $client_fd); | |
} | |
package Client; | |
use Mojo::Base 'EventEmitter'; | |
use Errno 'EINPROGRESS'; | |
use IO::Socket::IP; | |
has 'ioloop'; | |
has 'handle'; | |
sub connect { | |
my ($self, %options) = @_; | |
$self->handle(my $handle = IO::Socket::IP->new(%options, Blocking => 0)); | |
$handle->blocking(0); | |
$self->ioloop->add($handle, 0, 1, sub { $self->_connect(@_) }); | |
return $self; | |
} | |
sub _connect { | |
my ($self, $ioloop, $server_fd, $method) = @_; | |
# need to re-add $server_handle | |
if ($server_fd->isa('IO::Socket::IP') && !$server_fd->connect) { | |
return $self->emit(error => $!) unless $! == EINPROGRESS; | |
$self->ioloop->remove($server_fd); | |
$self->ioloop->add($server_fd, 1, 0, sub { $self->_connect(@_) }); | |
return; | |
} | |
return $self->emit(error => $! || 'Not connected') unless $server_fd->connected; | |
$self->ioloop->remove($server_fd); | |
$self->emit(connect => $server_fd); | |
} | |
package main; | |
use Mojo::Base -strict; | |
use IO::Socket::IP; | |
use DDP; | |
my $ioloop = IOLoop->singleton; | |
my $clients_data; | |
# my $sigint_count = 0; | |
# local $SIG{INT} = sub { warn "sigint $$"; die "sigint" if ++$sigint_count == 3; }; | |
sub server { | |
Server->new(ioloop => $ioloop) | |
->listen(Listen => 1, LocalPort => 8080, ReuseAddr => 1) | |
->on(accept => sub { | |
my ($server, $client_fd) = @_; | |
warn "ACCEPT\n\n"; | |
my $stream = Stream | |
->new(ioloop => $server->ioloop, handle => $client_fd) | |
->on(read => sub { | |
my ($stream, $data) = @_; | |
warn "Server READ: $client_fd\n\n"; | |
if ($data !~ m/\r?\n\r?\n/m) { | |
$clients_data->{$client_fd} = ($clients_data->{$client_fd} // '') . $data; | |
return; | |
} | |
my $client_data_all = ($clients_data->{$client_fd} // '') . $data; | |
delete $clients_data->{$client_fd}; | |
warn "$client_data_all\n\n"; | |
my $time; | |
Delay->run( | |
sub { | |
$time = time; | |
my $d = shift; | |
$ioloop->timer(1 => $d->begin); | |
$ioloop->timer(2 => $d->begin); | |
f_4_sec($d->begin); | |
}, sub { | |
warn 'Time in seconds from 1 -> 2: ', time - $time, "\n\n"; | |
p @_; | |
my $d = shift; | |
$stream->write("HTTP/1.1 200 OK\nContent-Length: 13\n\nHello, world\n" => sub {}); | |
$time = time; | |
$ioloop->timer(3 => $d->begin); | |
f_1_sec($d->begin); | |
$d->race; | |
}, sub { | |
warn 'Time in seconds from 2 -> 3: ', time - $time, "\n\n"; | |
} | |
) | |
->on(finish => sub { | |
warn 'Delay FINISH'; | |
}); | |
}) | |
->on(write => sub { | |
warn "Server WRITE\n\n"; | |
}) | |
->on(close => sub { | |
warn "Client close socket: $client_fd\n\n"; | |
}) | |
}) | |
->start | |
->ioloop->start; | |
# Server->new(ioloop => $ioloop) | |
# ->listen(Listen => 1, LocalPort => 8080, ReuseAddr => 1) | |
# ->on(accept => sub { | |
# my ($server, $client_fd) = @_; | |
# warn "ACCEPT\n\n"; | |
# $server->ioloop->add($client_fd, 1, 0, sub { | |
# my ($ioloop, $client_fd, $method) = @_; | |
# $client_fd->recv(my $data, 1000); | |
# if (length $data == 0) { | |
# warn "Client close socket: $client_fd\n\n"; | |
# $ioloop->remove($client_fd); | |
# delete $clients_data->{$client_fd}; | |
# return; | |
# } | |
# warn "Server READ: $client_fd\n\n"; | |
# if ($data !~ m/\r?\n\r?\n/m) { | |
# $clients_data->{$client_fd} = ($clients_data->{$client_fd} // '') . $data; | |
# return; | |
# } | |
# my $client_data_all = ($clients_data->{$client_fd} // '') . $data; | |
# delete $clients_data->{$client_fd}; | |
# warn "$client_data_all\n\n"; | |
# my $time; | |
# Delay->run( | |
# sub { | |
# $time = time; | |
# my $d = shift; | |
# $ioloop->timer(1 => $d->begin); | |
# $ioloop->timer(2 => $d->begin); | |
# f_4_sec($d->begin); | |
# }, sub { | |
# warn 'Time in seconds from 1 -> 2: ', time - $time, "\n\n"; | |
# p @_; | |
# my $d = shift; | |
# $client_fd->send("HTTP/1.1 200 OK\nContent-Length: 13\n\nHello, world\n"); | |
# $time = time; | |
# $ioloop->timer(3 => $d->begin); | |
# f_1_sec($d->begin); | |
# $d->race; | |
# }, sub { | |
# warn 'Time in seconds from 2 -> 3: ', time - $time, "\n\n"; | |
# } | |
# ); | |
# # $client_fd->close; | |
# # $ioloop->remove($client_fd); | |
# }); | |
# }) | |
# ->start | |
# ->ioloop->start; | |
} | |
sub client { | |
# Client->new(ioloop => $ioloop) | |
# ->on(connect => sub { | |
# my ($client, $server_fd) = @_; | |
# warn "CONNECT\n\n"; | |
# $server_fd->send("GET / HTTP/1.0\r\nHost: www.google.ru\r\n\r\n"); | |
# $client->ioloop->add($server_fd, 1, 0, sub { | |
# my ($ioloop, $server_fd, $method) = @_; | |
# $server_fd->recv(my $data, 1000); | |
# if (length $data == 0) { | |
# warn "Server close socket: $server_fd\n\n"; | |
# $ioloop->remove($server_fd); | |
# return; | |
# } | |
# warn "Client READ: $server_fd\n\n"; | |
# warn "$data\n\n"; | |
# }); | |
# }) | |
# ->on(error => sub { | |
# p @_; | |
# }) | |
# ->connect(PeerHost => 'www.google.ru', PeerPort => '80') | |
# ->ioloop->start; | |
Client->new(ioloop => $ioloop) | |
->on(connect => sub { | |
my ($client, $server_fd) = @_; | |
warn "CONNECT\n\n"; | |
my $stream = Stream | |
->new(ioloop => $client->ioloop, handle => $server_fd) | |
->on(read => sub { | |
warn "CLIENT READ\n\n"; | |
warn $_[1] . "\n\n"; | |
}) | |
->on(write => sub { | |
warn "CLIENT WRITE\n\n"; | |
}) | |
->write( | |
"POST / HTTP/1.0\r\nHost: www.google.ru\r\nContent-Length: 100000\r\n\r\n" . ('1' x 10_000) | |
=> sub { | |
warn "CLIENT DRAIN\n\n"; | |
}); | |
#$ioloop->{stream} = $stream; | |
}) | |
->on(error => sub { | |
p @_; | |
}) | |
->connect(PeerHost => '127.0.0.1', PeerPort => '12345') | |
#->connect(PeerHost => 'www.google.ru', PeerPort => '80') | |
->ioloop->start; | |
} | |
sub f_4_sec { | |
my $cb = shift; | |
Delay->run( | |
sub { | |
my $d = shift; | |
IOLoop->singleton->timer(3 => $d->begin); | |
$d->data(param => 'my params'); | |
}, sub { | |
my $d = shift; | |
IOLoop->singleton->timer(1 => $d->begin); | |
IOLoop->singleton->timer(3 => $d->begin); | |
$d->race; | |
}, sub { | |
my $d = shift; | |
$cb->($d->data('param'), '123'); | |
} | |
); | |
} | |
sub f_1_sec { | |
my $cb = shift; | |
Delay->run( | |
sub { | |
IOLoop->singleton->timer(1 => shift->begin); | |
}, sub { | |
$cb->('my params', 343); | |
} | |
); | |
} | |
# my $handle = IO::Socket::IP->new(PeerHost => 'www.google.ru', PeerPort => '80'); | |
# warn 111; | |
# $handle->connect(); | |
# warn 222; | |
# $handle->send("GET / HTTP/1.0\r\nHost: www.google.ru\r\n\r\n"); | |
# warn 333; | |
# $handle->recv(my $data, 1000); | |
# warn $data; | |
#client(); | |
server(); | |
__END__ | |
History | |
1) Create reactor (new, add, wait), Create ioloop (new, add, wait, start, stop) | |
2) Add reactor remove, Add ioloop remove | |
3) Add ioloop timer | |
4) Add ioloop next_tick, is_running, singleton, Add Delay | |
5) Add server | |
6) Add client | |
7) Add Delay | |
8) Add EventEmitter | |
9) Add stream | |
10) Add one_tick for IOLoop | |
TODO | |
1) Add for method race in delay timeout | |
2) SSL support | |
3) Check on memory leak (i have NO weaken in my code :) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment