# Originally published by preaction as GitHub Gist
# 0993e4454f996070fc4537ff99c0dc91 (created October 6, 2016).
package Rewriter;
# ABSTRACT: Track and retry batches of writes in database transactions

=head1 SYNOPSIS

    my $rewriter = Rewriter->new(
        dsn   => 'dbi:SQLite:test.db',
        user  => 'user',
        pass  => 'password',
        query => {
            insert => 'INSERT INTO foo ( bar ) VALUES ( ? )',
            update => 'UPDATE foo SET bar = ? WHERE id = ?',
        },
    );

    $rewriter->write( insert => 1 );
    $rewriter->write( update => 2, 1 );
    $rewriter->commit;

=head1 DESCRIPTION

This module keeps track of data so that it can be rewritten if the
database connection fails. When a connection failure is detected, the
connection is restarted, the statements are re-initialized, and the
current transaction is rewritten.

This module will automatically commit when the transaction reaches the
given size, allowing for periodic checkpoints.

=head1 SEE ALSO

=over

=item L<DBI>

=back

=cut

use strict;
use warnings;
use Moo;
use DBI;
# Set to a true value (e.g. $Rewriter::DEBUG = 1) to trace connections,
# statement preparation, executions and errors through warn().
our $DEBUG = 0;

# Emit one diagnostic message through warn() when $DEBUG is true.
# Deliberately has no prototype: prototypes change how call sites parse
# rather than validating arguments, and every caller passes one string.
sub debug {
    my ( $message ) = @_;
    warn $message if $DEBUG;
    return;
}
=attr dsn

The DBI data source name (DSN) to connect with. It is required because
the connection has to be re-established from scratch whenever it is lost.

=cut

has dsn => ( is => 'ro', required => 1 );

=attr user

The user name to connect as, if the database needs one.

=cut

has user => ( is => 'ro' );

=attr pass

The password to connect with, if the database needs one.

=cut

has pass => ( is => 'ro' );
=attr dbi_attrs

A hash reference of DBI attributes passed as the last argument to
C<< DBI->connect >>. Defaults to C<< RaiseError => 0 >>,
C<< PrintError => 1 >>, C<< AutoCommit => 0 >>.

A hash you supply replaces this default entirely, so include
C<< AutoCommit => 0 >> yourself. Writes are collected into a transaction
that is replayed after a reconnect and finished by an explicit
L<commit|/commit>. DBI turns AutoCommit on when the attribute is absent,
which defeats that.

=cut

has dbi_attrs => (
    is      => 'ro',
    # "+{" makes this unambiguously an anonymous hash; a bare "{" at the
    # start of a sub body leaves perl to guess between a hash and a block.
    default => sub {
        return +{
            RaiseError => 0,
            PrintError => 1,
            AutoCommit => 0,
        };
    },
);
=attr on_connect

A subroutine reference called each time a new database connection is
made, including every reconnect. Use it to run any per-connection setup
statements. Defaults to a subroutine that does nothing.

The subroutine receives a single argument: the newly connected database
handle. It runs while the L<dbh|/dbh> attribute is still being built, so
use the handle it is given. Do not call C<< $rewriter->dbh >> from inside
it, because that would re-enter the builder.

=cut

has on_connect => (
    is      => 'ro',
    lazy    => 1,
    default => sub { sub { } },
);
=attr dbh

The connected database handle. Pass one in if you already have a
connection; otherwise it is created on first use. After a connection
failure it is discarded and rebuilt from L<dsn|/dsn>, L<user|/user>,
L<pass|/pass> and L<dbi_attrs|/dbi_attrs>, even if it was originally
passed in.

=cut

has dbh => (
    is      => 'rw',
    lazy    => 1,
    builder => '_build_dbh',
    clearer => '_clear_dbh',
);
=attr query

A hash reference mapping query names to SQL statements. The statement
handle for each name is prepared on first use and then cached. Replacing
this attribute does not affect handles already cached until the next
L<restart|/restart>.

=cut

has query => ( is => 'rw' );
#=attr _sth
# Cache of prepared statement handles, keyed by query name. restart()
# clears it because the handles belong to the connection being replaced.
has _sth => (
    is      => 'ro',
    lazy    => 1,
    default => sub { return {} },
    clearer => '_clear_sth',
);
#=attr _txn
# The writes made since the last successful commit, each stored as an
# arrayref of [ $query_name, @query_params ]. restart() replays them in
# order on a fresh connection.
has _txn => (
    is      => 'ro',
    lazy    => 1,
    default => sub { return [] },
    clearer => '_clear_txn',
);
=attr size

How many writes to accumulate before committing automatically.
Defaults to C<100>.

=cut

has size => (
    is      => 'ro',
    default => 100,
);
=method write

    $rewriter->write( query_name => @placeholders );

Execute the named query with the given placeholder values. Each write is
remembered until the next commit so that it can be replayed. If the
connection fails during the write, the connection is re-established
(waiting an increasing delay between attempts), the current transaction
is replayed, and the write is retried. Any other failure throws an
exception.

Once the number of writes in the current transaction reaches
L<the size attribute|/size>, a L<commit|/commit> is performed automatically.

=cut

sub write {
    my $self = shift;
    my ( $query, @params ) = @_;
    debug "Writing to $query";
    $self->_execute( $query, @params );
    # Remember the write only after it succeeded, so a restart inside
    # _execute does not replay it before retrying it.
    my $pending = $self->_txn;
    push @{ $pending }, [ $query, @params ];
    $self->commit if @{ $pending } >= $self->size;
}
=method commit

    $rewriter->commit;

Commit the current transaction. If the commit fails because the
connection was lost, the connection is restarted, the transaction is
replayed, and the commit is tried once more. If that second commit also
fails, an exception is thrown. Any other commit failure is thrown
immediately. On success the record of pending writes is cleared.

=cut

sub commit {
    my ( $self ) = @_;
    eval {
        $self->dbh->commit;
    };
    # With RaiseError => 0 a failed commit returns false instead of dying,
    # so DBI's error string is consulted as well as $@.
    my $error = $@ || $DBI::errstr;
    if ( _is_connection_error( $error ) ) {
        $self->restart;
        # Check the retry: with RaiseError off a failure here only returns
        # false, and clearing _txn below would silently drop the writes.
        $self->dbh->commit
            or die $DBI::errstr || 'commit failed after reconnecting';
    }
    elsif ( $error ) {
        die $error;
    }
    $self->_clear_txn;
}
=method DESTROY

Commit any pending writes before the object, its statement handles and
its database handle are destroyed.

=cut

sub DESTROY {
    my ( $self ) = @_;
    # commit() runs eval blocks and talks to the database. Either can
    # overwrite $@, $! and $?, which the code whose scope exit triggered
    # this destructor may still be inspecting, so localize them.
    local ( $@, $!, $? );
    if ( @{ $self->_txn } ) {
        $self->commit;
    }
}
=method restart

Reconnect to the database and replay the current transaction. This
creates a new L<database handle|/dbh> and new statement handles, then
re-executes every write made since the last commit.

If connecting or replaying fails with a connection error, the whole
reconnect-and-replay cycle is retried. Between attempts it waits an
increasing (Fibonacci) number of seconds, and it keeps going until it
succeeds. Any other error is thrown.

=cut

sub restart {
    my ( $self ) = @_;
    debug "Restarting";
    # Back off in fibonacci style: sleeps of 1, 2, 3, 5, 8, ... seconds.
    my ( $delay, $last_delay ) = ( 1, 1 );
    while ( 1 ) {
        # Handles tied to the failed connection are useless; rebuild both.
        $self->_clear_dbh;
        $self->_clear_sth;
        my $error = $self->_reconnect_and_replay;
        return unless $error;
        die $error unless _is_connection_error( $error );
        sleep $delay;
        ( $delay, $last_delay ) = ( $delay + $last_delay, $delay );
    }
}

# Connect (through the lazy dbh builder) and re-execute every pending write
# on the new handle. Unlike _execute, this never calls restart() itself, so
# a connection loss mid-replay cannot cause writes to be executed twice;
# restart() simply starts the cycle over. Returns the error message on
# failure, or nothing on success.
sub _reconnect_and_replay {
    my ( $self ) = @_;
    my $ok = eval {
        $self->dbh;
        for my $write ( @{ $self->_txn } ) {
            my ( $name, @params ) = @{ $write };
            debug "Replaying $name";
            $self->_get_sth( $name )->execute( @params )
                or die $DBI::errstr || "Failed to replay '$name'";
        }
        1;
    };
    return if $ok;
    return $@;
}
# Lazy builder for the dbh attribute. Opens a new DBI connection from dsn,
# user, pass and dbi_attrs and dies with DBI's error string if that fails.
# On success it passes the fresh handle to the on_connect callback.
sub _build_dbh {
    my $self = shift;
    debug "Connecting to database";
    my $handle = DBI->connect(
        $self->dsn, $self->user, $self->pass, $self->dbi_attrs,
    );
    die $DBI::errstr unless $handle;
    $self->on_connect->( $handle );
    return $handle;
}
# Return the statement handle for the named query, preparing and caching
# it on first use. Dies if the name is not a key of the query attribute or
# if prepare() fails.
# NOTE: the cache is keyed by name only. If the query attribute is
# replaced, handles prepared from the old SQL keep being returned until
# restart() clears the cache.
sub _get_sth {
    my ( $self, $name ) = @_;
    if ( my $sth = $self->_sth->{ $name } ) {
        debug "Getting cached sth $name";
        return $sth;
    }
    my $queries = $self->query || {};
    my $sql     = $queries->{ $name };
    # Fail with a clear message instead of handing undef to prepare().
    die "Unknown query '$name'" unless defined $sql;
    debug "Creating new sth $name for query $sql";
    my $sth = $self->dbh->prepare( $sql )
        || die $DBI::errstr;
    $self->_sth->{ $name } = $sth;
    return $sth;
}
# Execute the named query with the given placeholder values. On a
# connection error, restart() reconnects and replays the pending
# transaction, and then this query is executed once more. A failure of
# that retry, or any non-connection error, is thrown.
sub _execute {
    my ( $self, $name, @params ) = @_;
    # Show NULL placeholders explicitly instead of warning about undef.
    debug "Executing $name with " . join ", ", map { defined $_ ? $_ : 'NULL' } @params;
    eval {
        $self->_get_sth( $name )->execute( @params );
    };
    # With RaiseError => 0, execute() reports failure through DBI's error
    # string instead of dying, so both sources are checked.
    my $error = $@ || $DBI::errstr;
    if ( _is_connection_error( $error ) ) {
        $self->restart;
        # Check the retry too; with RaiseError off it would otherwise fail
        # silently and the write would be recorded as done.
        $self->_get_sth( $name )->execute( @params )
            or die $DBI::errstr || "Failed to execute '$name' after reconnecting";
    }
    elsif ( $error ) {
        die $error;
    }
    return;
}
# Pattern matched against error messages to recognise a lost connection.
# The default matches "attempt to execute on inactive database handle"
# plus two French-language messages, presumably from the author's database
# server (verify against your driver). Assign a different qr// to
# $Rewriter::CONNECTION_ERROR_RE to adapt it to another database.
our $CONNECTION_ERROR_RE = qr/attempt to execute on inactive database handle|vous n'est pas connecte|votre session a ete fermee/;

# Returns true if the passed-in error message indicates a connection
# problem, i.e. it matches $CONNECTION_ERROR_RE. Empty or undef messages
# are never connection errors.
sub _is_connection_error {
    my ( $err ) = @_;
    return unless $err;
    debug "Got error: $err";
    return $err =~ $CONNECTION_ERROR_RE;
}

1;
# End of Rewriter.