Advertisement
Guest User

Untitled

a guest
Jun 17th, 2019
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.10 KB | None | 0 0
  1. internal interface ISmartCacheReplicator
  2. {
  3. void PushMessage(CacheReplicationMessage cacheMessage, MessageIdentifier messageIdentifier);
  4. void IuEnd(List<int> datasets);
  5. void StaleData(int dataset);
  6. void RequestIu(int dataset, byte dataflow);
  7. }
  8.  
  9. internal class ReplicationProtocol : IDisposable
  10. {
  11. private const int PROTOCOL_VERSION = 2;
  12. private static readonly INonBlockingPool<BinaryMessage> messagePool = SingletonCommunicationObjects.MessagePool;
  13.  
  14. private readonly byte clientId;
  15. private readonly IReplicate replicator;
  16. private readonly ISmartCacheReplicator smartCacheReplicator;
  17.  
  18. public ReplicationProtocol(string cacheName, byte clientId, IReplicate replicator, ISmartCacheReplicator smartCacheReplicator)
  19. {
  20. Diagnostics.ThrowIfNull(replicator, nameof(replicator));
  21. Diagnostics.ThrowIfNull(smartCacheReplicator, nameof(smartCacheReplicator));
  22.  
  23. this.smartCacheReplicator = smartCacheReplicator;
  24. this.replicator = replicator;
  25. this.clientId = clientId;
  26.  
  27. replicator.Register(cacheName, clientId, null, ReceiveCallback, StateCallback, StaleCallback, IuCallback, GetDestinationInfoCallback);
  28. }
  29.  
  30. private byte[] GetDestinationInfoCallback(byte[] dataflows)
  31. {
  32. return Array.Empty<byte>();
  33. }
  34.  
  35. private void IuCallback(int dataset, byte dataflow, byte[] destinationInfo)
  36. {
  37. smartCacheReplicator.RequestIu(dataset, dataflow);
  38. }
  39.  
  40. private void StaleCallback(int dataset, byte dataflow)
  41. {
  42. smartCacheReplicator.StaleData(dataset);
  43. }
  44.  
  45. private void StateCallback(IUState iuState, List<int> datasets)
  46. {
  47. if (iuState != IUState.End)
  48. {
  49. return;
  50. }
  51.  
  52. smartCacheReplicator.IuEnd(datasets);
  53. }
  54.  
  55. private void ReceiveCallback(BinaryMessage message, int dataset, byte dataflow, MessageIdentifier messageIdentifier)
  56. {
  57. message.Decompress();
  58. var reader = message.StartReading();
  59. var source = (CacheReplicationSource)Deserializer.Deserialize(reader);
  60. var version = (byte)Deserializer.Deserialize(reader);
  61. if(version == 2)
  62. {
  63. var cacheMessage = (CacheReplicationMessage)Deserializer.Deserialize(reader);
  64. smartCacheReplicator.PushMessage(cacheMessage, messageIdentifier);
  65. }
  66. }
  67.  
  68. public void Dispose()
  69. {
  70. replicator.Unregister(clientId);
  71. }
  72.  
  73. public int SendMessage(CacheReplicationSource source, CacheReplicationMessage message, int dataset, byte dataflow)
  74. {
  75. Diagnostics.ThrowIfNull(message, nameof(message));
  76.  
  77. var serializedMessage = messagePool.Get();
  78. int size = 0;
  79. try
  80. {
  81. var writer = serializedMessage.StartWriting();
  82.  
  83. Serializer.Serialize(source, writer);
  84. Serializer.Serialize(PROTOCOL_VERSION, writer);
  85. Serializer.Serialize(message, writer);
  86. serializedMessage.StopWriting();
  87.  
  88. size = serializedMessage.ByteSize;
  89.  
  90. serializedMessage.Compress();
  91. replicator.SendReplica(clientId, serializedMessage, dataset, dataflow);
  92. }
  93. finally
  94. {
  95. messagePool.Put(serializedMessage);
  96. }
  97.  
  98. return size;
  99. }
  100.  
  101. public void Acknowledge(MessageIdentifier msgIdent)
  102. {
  103. replicator.AcknowledgeMessageStored(msgIdent);
  104. }
  105. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement