Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- if(expectedVersion == ExpectedVersion.NoStream)
- {
- using(var tx = _connection.BeginTransaction(IsolationLevel.Serializable))
- {
- int streamIdInternal = -1;
- using(
- var command =
- new NpgsqlCommand(
- "INSERT INTO streams(id, id_original) VALUES (:stream_id, :stream_id_original) RETURNING id_internal;")
- )
- {
- command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId);
- command.Parameters.AddWithValue("streamIdOriginal", streamIdInfo.StreamIdOriginal);
- streamIdInternal = await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
- }
- using (var writer = _connection.BeginBinaryImport("COPY events (stream_id_internal, stream_version, id, created, type, json_data, json_metadata) FROM STDIN BINARY"))
- {
- int version = 0;
- foreach (var @event in events)
- {
- if(cancellationToken.IsCancellationRequested)
- {
- writer.Cancel();
- }
- writer.StartRow();
- writer.Write(streamIdInternal, NpgsqlDbType.Integer);
- writer.Write(++version, NpgsqlDbType.Integer);
- writer.Write(@event.EventId, NpgsqlDbType.Uuid);
- writer.Write(SystemClock.GetUtcNow(), NpgsqlDbType.TimestampTZ);
- writer.Write(@event.Type);
- writer.Write(@event.JsonData, NpgsqlDbType.Json);
- writer.Write(@event.JsonMetadata, NpgsqlDbType.Json);
- }
- }
- tx.Commit();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement