Created
October 6, 2016 19:55
-
-
Save preaction/0993e4454f996070fc4537ff99c0dc91 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Rewriter; | |
# ABSTRACT: Track and retry batches of writes in database transactions | |
=head1 SYNOPSIS | |
my $rewriter = Rewriter->new( | |
dsn => 'dbi:SQLite:test.db', | |
user => 'user', | |
pass => 'password', | |
query => { | |
insert => 'INSERT INTO foo ( bar ) VALUES ( ? )', | |
update => 'UPDATE foo SET bar = ? WHERE id = ?', | |
}, | |
); | |
$rewriter->write( insert => 1 ); | |
$rewriter->write( update => 2, 1 ); | |
$rewriter->commit; | |
=head1 DESCRIPTION | |
This module keeps track of data so that it can be rewritten if the | |
database connection fails. When a connection failure is detected, the | |
connection is restarted, the statements are re-initialized, and the
current transaction is rewritten.
This module will automatically commit when the number of writes in the
current transaction reaches the given size, allowing for periodic checkpoints.
=head1 SEE ALSO | |
=over | |
=item L<DBI> | |
=back | |
=cut | |
use strict; | |
use warnings; | |
use Moo; | |
use DBI; | |
our $DEBUG = 0; | |
# Emit a diagnostic through warn(), but only while $DEBUG is true.
# The ($) prototype is kept on purpose: callers write
# `debug "text" . join ...` without parentheses and rely on it.
sub debug($) {
    my ( $message ) = @_;
    warn $message if $DEBUG;
}
=attr dsn | |
The DBI Data Source Name to use to connect to the database. This is necessary | |
to reconnect to the database as needed. | |
=cut | |
# Read-only and mandatory: _build_dbh hands this to DBI->connect.
has 'dsn' => ( is => 'ro', required => 1 );
=attr user | |
The user to use to connect to the database, if necessary. | |
=cut | |
# Read-only, optional; passed unchanged to DBI->connect by _build_dbh.
has 'user' => ( is => 'ro' );
=attr pass | |
The password to use to connect to the database, if necessary. | |
=cut | |
# Read-only, optional; passed unchanged to DBI->connect by _build_dbh.
has 'pass' => ( is => 'ro' );
=attr dbi_attrs | |
A hash reference of DBI attributes to pass in to C<< DBI->connect >>. This | |
should contain C<< AutoCommit => 0 >> if needed to disable automatic committing. | |
Defaults to: C<< RaiseError => 0 >>, C<< PrintError => 1 >>, C<< AutoCommit => 0 >>. | |
=cut | |
# Each object gets its own fresh attribute hash. The unary plus makes it
# explicit that the braces are an anonymous hash, not a code block.
has 'dbi_attrs' => (
    is      => 'ro',
    default => sub {
        return +{
            RaiseError => 0,
            PrintError => 1,
            AutoCommit => 0,
        };
    },
);
=attr on_connect | |
A subref to call whenever the database is reconnected. This allows you | |
to run any connection setup commands as needed. | |
The subroutine will get a single argument: the newly-connected database
handle. The callback runs while L<the dbh attribute|/dbh> is still being
built, so use the handle passed in rather than asking the Rewriter object for it.
=cut | |
# Connection hook; when none is supplied, a do-nothing subroutine is used
# so that _build_dbh can always call it.
has 'on_connect' => (
    is      => 'ro',
    lazy    => 1,
    default => sub {
        my $noop = sub { };
        return $noop;
    },
);
=attr dbh | |
A connected database handle. You can specify this if you already have one, or
else it will be created as needed.
=cut | |
# Built on first use by _build_dbh. restart() calls _clear_dbh so that the
# next access reconnects.
has 'dbh' => (
    is      => 'rw',
    lazy    => 1,
    builder => '_build_dbh',
    clearer => '_clear_dbh',
);
=attr query | |
A hash of SQL queries. Statement handlers will be created automatically | |
as needed. | |
=cut | |
# Maps a query name to its SQL text; _get_sth looks names up here.
has 'query' => ( is => 'rw' );
#=attr _sth
# Cache of prepared statement handles, keyed by query name. It starts out
# as an empty hash and is filled by _get_sth; _clear_sth discards the
# whole cache.
has '_sth' => (
    is      => 'ro',
    lazy    => 1,
    default => sub { return +{} },
    clearer => '_clear_sth',
);
#=attr _txn
# The writes of the current pending transaction, in order, each stored as
# an arrayref of [ $query_name, @query_params ]. restart() replays this
# list after reconnecting; _clear_txn empties it after a commit.
has '_txn' => (
    is      => 'ro',
    lazy    => 1,
    default => sub { return [] },
    clearer => '_clear_txn',
);
=attr size | |
The number of items to hold on to between commits. Defaults to C<100>. | |
=cut | |
# Number of pending writes at which write() commits automatically.
has 'size' => (
    is      => 'ro',
    default => sub { return 100 },
);
=method write | |
$writer->write( query_name => @placeholders ); | |
Write a row using the given query. Writes are kept until a commit so | |
they can be retried. If the connection fails during the write, we will | |
wait for an increasing delay, and restart the current transaction. If | |
the write fails for any other reason, an exception will be thrown. | |
If the number of writes in this transaction reaches L<the size attribute|/size>, | |
a L<commit|/commit> will be performed automatically. | |
=cut | |
sub write {
    my ( $self, $query, @params ) = @_;
    debug "Writing to $query";

    # Run the statement first; it is only recorded for replay once it
    # has gone through.
    $self->_execute( $query, @params );

    my $pending = $self->_txn;
    push @$pending, [ $query, @params ];

    # Checkpoint automatically once enough writes have piled up.
    $self->commit if @$pending >= $self->size;
}
=method commit | |
$writer->commit; | |
Commit the current transaction. If the commit fails once, the connection | |
will be restarted, and the transaction tried again. If the commit fails | |
twice, an exception will be thrown. | |
=cut | |
sub commit {
    my ( $self ) = @_;

    eval {
        $self->dbh->commit;
    };
    # Capture the failure once. With RaiseError on it arrives in $@,
    # otherwise DBI only records it in $DBI::errstr.
    my $error = $@ || $DBI::errstr;

    if ( _is_connection_error( $error ) ) {
        # Reconnect and replay the pending writes, then try once more.
        $self->restart;
        my $dbh = $self->dbh;
        $dbh->commit;
        # With RaiseError off a failed commit does not die by itself, so
        # the handle's error state has to be checked explicitly. Without
        # this, a second failure would be silently ignored and the
        # pending transaction thrown away below.
        if ( $dbh->err ) {
            die( $dbh->errstr || "Commit failed after reconnecting" );
        }
    }
    elsif ( $error ) {
        die $error;
    }

    # Everything is safely stored; nothing is left to replay.
    $self->_clear_txn;
}
=method DESTROY | |
Flush out any pending commits before destroying the statements and database | |
handle. | |
=cut | |
sub DESTROY {
    my ( $self ) = @_;

    # commit() runs eval blocks and database calls. Localize the global
    # status variables so that destroying this object cannot clobber the
    # error state of whatever code caused it to go out of scope (for
    # example an exception that is currently propagating).
    local ( $., $@, $!, $^E, $? );

    # Flush any writes that have not been committed yet.
    if ( @{ $self->_txn } ) {
        $self->commit;
    }
}
=method restart | |
Restart the current transaction after reconnecting to the database. This | |
creates a new L<database handle|/dbh>, all new statement handles, and replays | |
the current transaction. | |
The reconnect will be attempted until it succeeds, backing off on retrying | |
as needed. | |
=cut | |
sub restart {
    my ( $self ) = @_;
    debug "Restarting";

    # Back off in fibonacci style
    my ( $delay, $last_delay ) = ( 1, 1 );

    ATTEMPT: while ( 1 ) {
        # Throw away the dead handle and every statement prepared on it;
        # the next call to ->dbh reconnects.
        $self->_clear_dbh;
        $self->_clear_sth;

        eval { $self->dbh };
        my $error = $@ || $DBI::errstr;
        if ( _is_connection_error( $error ) ) {
            # Still no connection: wait, lengthen the delay, try again.
            sleep $delay;
            ( $delay, $last_delay ) = ( $delay + $last_delay, $delay );
            next ATTEMPT;
        }
        # Any other failure is not something a retry can fix.
        die $error if $error;

        # Replay the pending transaction on the new connection. The
        # statements are executed directly instead of through _execute:
        # _execute would call restart again on a connection error, and
        # that nested replay followed by the rest of this loop would
        # write rows twice. If the connection drops mid-replay, start
        # over from a fresh connection instead.
        for my $write ( @{ $self->_txn } ) {
            my ( $name, @params ) = @$write;
            eval { $self->_get_sth( $name )->execute( @params ) };
            $error = $@ || $DBI::errstr;
            next ATTEMPT if _is_connection_error( $error );
            die $error if $error;
        }

        last ATTEMPT;
    }
}
# Builder for the dbh attribute. Opens a new connection using the dsn,
# user, pass and dbi_attrs attributes, and dies with the DBI error string
# if the connection cannot be made. On success the on_connect callback is
# called with the new handle before the handle is returned.
sub _build_dbh {
    my ( $self ) = @_;
    debug "Connecting to database";

    my @connect_args = ( $self->dsn, $self->user, $self->pass, $self->dbi_attrs );
    my $dbh = DBI->connect( @connect_args );
    die $DBI::errstr unless $dbh;

    my $hook = $self->on_connect;
    $hook->( $dbh );

    return $dbh;
}
# Get the statement handle with the given name, creating it if
# necessary. Handles are cached in _sth, so each query is prepared at
# most once per connection. Dies if no name is given, if the name has no
# SQL in the query attribute, or if the statement cannot be prepared.
sub _get_sth {
    my ( $self, $name ) = @_;
    die "A query name is required" unless defined $name;

    if ( my $sth = $self->_sth->{ $name } ) {
        debug "Getting cached sth $name";
        return $sth;
    }

    # The query attribute is optional, and the name may simply be wrong.
    # Fail with a clear message instead of an uninitialized-value
    # warning or an undefined-hash-reference crash.
    my $sql = ( $self->query || {} )->{ $name };
    die "No SQL configured for query '$name'" unless defined $sql;

    debug "Creating new sth $name for query " . $sql;
    my $sth = $self->dbh->prepare( $sql )
        || die $DBI::errstr;
    $self->_sth->{ $name } = $sth;
    return $sth;
}
# Execute the given named query with the given query parameters,
# retrying once if there is a connection problem. Dies if the statement
# fails for any other reason, or if the retry fails as well.
sub _execute {
    my ( $self, $name, @params ) = @_;

    # Bind values may legitimately be undef (NULL); spell them out so
    # that building the debug message does not itself emit warnings.
    debug "Executing $name with "
        . join ", ", map { defined $_ ? $_ : 'undef' } @params;

    eval {
        $self->_get_sth( $name )->execute( @params );
    };
    # Capture the failure once. With RaiseError on it arrives in $@,
    # otherwise DBI only records it in $DBI::errstr.
    my $error = $@ || $DBI::errstr;

    if ( _is_connection_error( $error ) ) {
        # Reconnect, replay what was already written, then retry.
        $self->restart;
        my $sth = $self->_get_sth( $name );
        # execute returns undef on failure. With RaiseError off that
        # would otherwise pass unnoticed and the write would be lost.
        defined( $sth->execute( @params ) )
            or die( $sth->errstr || "Executing '$name' failed after reconnecting" );
    }
    elsif ( $error ) {
        die $error;
    }
    return;
}
# Decide whether an error message describes a lost database connection.
# An empty or missing message is never a connection error. Anything else
# is reported through debug() and matched against the known
# lost-connection messages.
sub _is_connection_error {
    my ( $message ) = @_;
    if ( !$message ) {
        return;
    }
    debug "Got error: $message";
    return $message =~ /attempt to execute on inactive database handle|vous n'est pas connecte|votre session a ete fermee/;
}
1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment