Skip to content

Instantly share code, notes, and snippets.

@yreynhout
Created August 29, 2012 17:59
Show Gist options
  • Save yreynhout/3516291 to your computer and use it in GitHub Desktop.
Save yreynhout/3516291 to your computer and use it in GitHub Desktop.
Sample 2 - ThresholdedFlusher
public class ThresholdedSqlStatementFlusher : IObserver<ISqlStatement> {
readonly int _threshold;
readonly ISqlConnectionTransactionFactory _connectionTransactionFactory;
readonly ISqlBatchWriterFactory _batchWriterFactory;
readonly List<ISqlStatement> _statements;
public ThresholdedSqlStatementFlusher(
int threshold,
ISqlConnectionTransactionFactory _connectionTransactionFactory,
ISqlBatchWriterFactory batchWriterFactory) {
_threshold = threshold;
_connectionTransactionFactory = connectionTransactionFactory;
_batchWriterFactory = batchWriterFactory;
_statements = new List<ISqlStatement>();
}
public void OnNext(ISqlStatement value) {
_statements.Add(value);
FlushIfThresholdReached();
}
public void OnError(Exception error) { }
public void OnCompleted() {
FlushIfAny();
}
void FlushIfThresholdReached() {
if(_statements.Count > _threshold) {
Flush();
}
}
void FlushIfAny() {
if(_statements.Count != 0) {
Flush();
}
}
void Flush() {
using(var transaction = _connectionTransactionFactory.BeginTransaction()) {
using (var command = new SqlCommand()) {
command.Connection = transaction.Connection;
command.Transaction = transaction;
command.CommandTimeout = 1234;
using (var batchWriter = _batchWriterFactory.Create()) {
using (IEnumerator<ISqlStatement> enumerator = _statements.GetEnumerator()) {
var moved = enumerator.MoveNext();
while (moved) {
while (moved && enumerator.Current != null && enumerator.Current.TryWriteTo(batchWriter)) {
moved = enumerator.MoveNext();
}
if (batchWriter.TryWriteTo(command)) {
command.ExecuteNonQuery();
batchWriter.Reset();
}
}
}
}
}
transaction.Commit();
}
_statements.Clear();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment