Created
June 4, 2009 11:46
-
-
Save mala/123579 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Coro::Mysql::DBI; | |
use strict; | |
use warnings; | |
use DBI; | |
use Coro; | |
use Coro::Mysql; | |
use Hash::FieldHash qw(:all); | |
# our $ROOT_CLASS = 'DBIx::ContextualFetch'; | |
our $ROOT_CLASS = 'DBI'; | |
# MAX CONNECTION for same DB | |
our $MAX_CACHED_CONNECTIONS = 5; | |
# unblocking | |
our $unblock = 1; | |
# private | |
fieldhash my %coro_of; | |
my $conn_cache; | |
# connect & managed by Coro | |
sub connect { | |
my $class = shift; | |
my @info = @_; | |
my $current = $Coro::current; | |
my $dbh = $ROOT_CLASS->connect(@_); | |
Coro::Mysql::unblock $dbh if $unblock; | |
$coro_of{$dbh} = $current; | |
$current->on_destroy(sub { $coro_of{$dbh} = undef }); | |
return $dbh; | |
} | |
# connect_cached by coro, but other coro create new connection | |
sub connect_cached { | |
my $class = shift; | |
my @info = @_; | |
my $current = $Coro::current; | |
my $cache = ($conn_cache ||= Coro::Mysql::DBI::ConnCache->new($MAX_CACHED_CONNECTIONS)); | |
my $key = _cache_key(@info); | |
my $dbh = $cache->get($key); | |
unless ($dbh) { | |
# if connections of $key > $MAX_CACHED_CONNECTIONS, wait for other coro release $dbh | |
my $lock_success = $cache->get_lock($key); | |
if (!$lock_success) { | |
$cache->wait_for_release($key); | |
# retry find cached dbh | |
$dbh = $cache->get($key); | |
} | |
unless ($dbh) { | |
$dbh = $ROOT_CLASS->connect(@info); | |
Coro::Mysql::unblock $dbh if $unblock; | |
$cache->set($key, $dbh); | |
} | |
} | |
$coro_of{$dbh} = $current; | |
$current->on_destroy(sub { | |
$coro_of{$dbh} = undef; | |
$cache->release_lock($key); | |
}); | |
return $dbh; | |
} | |
sub coro_of { | |
my ($class, $dbh) = @_; | |
return unless $dbh; | |
$coro_of{$dbh}; | |
} | |
# copy from DBI | |
sub _cache_key { | |
my ($dsn, $user, $auth, $attr) = @_; | |
$attr ||= {}; | |
my $key = do { local $^W; | |
join "!\001", $dsn, $user, $auth, DBI::_concat_hash_sorted($attr, "=\001", ",\001", 0, 0) | |
}; | |
} | |
1; | |
package Coro::Mysql::DBI::ConnCache; | |
use strict; | |
use warnings; | |
use Coro; | |
use Coro::Semaphore; | |
our $DEBUG; | |
my %CACHE; | |
my %LOCK; | |
sub new { | |
my ($class, $max) = @_; | |
bless { | |
max => $max, | |
}, $class; | |
} | |
sub get { | |
my ($self, $key) = @_; | |
my ($dbh) = $self->_select_usable_dbh($key); | |
return $dbh; | |
} | |
sub wait_for_release { | |
my ($self, $key) = @_; | |
my $lock = ($LOCK{$key} ||= Coro::Semaphore->new($self->{max})); | |
# warn $lock->count; | |
$lock->down; | |
} | |
sub get_lock { | |
my ($self, $key) = @_; | |
my $lock = ($LOCK{$key} ||= Coro::Semaphore->new($self->{max})); | |
# warn $lock->count; | |
$lock->try; | |
} | |
sub release_lock { | |
my ($self, $key) = @_; | |
my $lock = ($LOCK{$key} ||= Coro::Semaphore->new($self->{max})); | |
# warn $lock->count; | |
$lock->up; | |
} | |
sub get_active_count { | |
my ($self, $key) = @_; | |
my $count = $self->select_all_dbh($key); | |
return $count; | |
} | |
sub set { | |
my ($self, $key, $dbh) = @_; | |
my $cache = ($CACHE{$key} ||= []); | |
push @{$cache}, $dbh; | |
return 1; | |
} | |
sub _select_usable_dbh { | |
my ($self, $key) = @_; | |
my $cache = ($CACHE{$key} ||= []); | |
my @usable; | |
my $current = $Coro::current; | |
# warn $current; | |
my @pairs = map { | |
[$_, Coro::Mysql::DBI->coro_of($_)] | |
} @{$cache}; | |
# cached by current coro | |
for my $pair (@pairs) { | |
my ($dbh, $coro) = @{$pair}; | |
if ($coro && $coro == $current) { | |
return $dbh; | |
} | |
} | |
for my $pair (@pairs) { | |
my ($dbh, $coro) = @{$pair}; | |
# used by other coro | |
next if $coro; | |
# reuse connection what is other coro used | |
# TODO: ping | |
# unless ($dbh && $dbh->FETCH('Active') && $dbh->ping) { | |
unless ($dbh && $dbh->FETCH('Active')) { | |
warn "dead $dbh" if $DEBUG; | |
$dbh = undef; | |
next; | |
} | |
push @usable, $dbh; | |
} | |
@{$cache} = grep { $_ } @{$cache}; | |
return @usable; | |
} | |
sub _select_all_dbh { | |
my ($self, $key) = @_; | |
my $cache = ($CACHE{$key} ||= []); | |
return grep {$_} @{$cache}; | |
} | |
1; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment