Advertisement
Guest User

Untitled

a guest
Jul 14th, 2015
180
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.03 KB | None | 0 0
  1. if(expectedVersion == ExpectedVersion.NoStream)
  2. {
  3. using(var tx = _connection.BeginTransaction(IsolationLevel.Serializable))
  4. {
  5. int streamIdInternal = -1;
  6. using(
  7. var command =
  8. new NpgsqlCommand(
  9. "INSERT INTO streams(id, id_original) VALUES (:stream_id, :stream_id_original) RETURNING id_internal;")
  10. )
  11. {
  12. command.Parameters.AddWithValue(":stream_id", streamIdInfo.StreamId);
  13. command.Parameters.AddWithValue("streamIdOriginal", streamIdInfo.StreamIdOriginal);
  14.  
  15. streamIdInternal = await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
  16. }
  17.  
  18. using (var writer = _connection.BeginBinaryImport("COPY events (stream_id_internal, stream_version, id, created, type, json_data, json_metadata) FROM STDIN BINARY"))
  19. {
  20. int version = 0;
  21.  
  22. foreach (var @event in events)
  23. {
  24. if(cancellationToken.IsCancellationRequested)
  25. {
  26. writer.Cancel();
  27. }
  28.  
  29. writer.StartRow();
  30. writer.Write(streamIdInternal, NpgsqlDbType.Integer);
  31. writer.Write(++version, NpgsqlDbType.Integer);
  32. writer.Write(@event.EventId, NpgsqlDbType.Uuid);
  33. writer.Write(SystemClock.GetUtcNow(), NpgsqlDbType.TimestampTZ);
  34. writer.Write(@event.Type);
  35. writer.Write(@event.JsonData, NpgsqlDbType.Json);
  36. writer.Write(@event.JsonMetadata, NpgsqlDbType.Json);
  37. }
  38. }
  39.  
  40. tx.Commit();
  41. }
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement