Created
June 18, 2015 16:10
-
-
Save danbarua/710e77de8dd5117d843c to your computer and use it in GitHub Desktop.
spike inserting events to PGSQL using the new copy api in v3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if(expectedVersion == ExpectedVersion.NoStream) | |
{ | |
using(var tx = _connection.BeginTransaction(IsolationLevel.Serializable)) | |
{ | |
int streamIdInternal = -1; | |
using( | |
var command = | |
new NpgsqlCommand( | |
"INSERT INTO streams(id, id_original) VALUES (:stream_id, :stream_id_original) RETURNING id_internal;") | |
) | |
{ | |
command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId); | |
command.Parameters.AddWithValue("streamIdOriginal", streamIdInfo.StreamIdOriginal); | |
streamIdInternal = await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext(); | |
} | |
using (var writer = _connection.BeginBinaryImport("COPY events (stream_id_internal, stream_version, id, created, type, json_data, json_metadata) FROM STDIN BINARY")) | |
{ | |
int version = 0; | |
foreach (var @event in events) | |
{ | |
if(cancellationToken.IsCancellationRequested) | |
{ | |
writer.Cancel(); | |
} | |
writer.StartRow(); | |
writer.Write(streamIdInternal, NpgsqlDbType.Integer); | |
writer.Write(++version, NpgsqlDbType.Integer); | |
writer.Write(@event.EventId, NpgsqlDbType.Uuid); | |
writer.Write(SystemClock.GetUtcNow(), NpgsqlDbType.TimestampTZ); | |
writer.Write(@event.Type); | |
writer.Write(@event.JsonData, NpgsqlDbType.Json); | |
writer.Write(@event.JsonMetadata, NpgsqlDbType.Json); | |
} | |
} | |
tx.Commit(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment