Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// Receiver of the replication events that <see cref="ReplicationProtocol"/>
/// forwards from the underlying replicator callbacks.
/// </summary>
internal interface ISmartCacheReplicator
{
    /// <summary>
    /// Hands over a replication message that was received and deserialized.
    /// </summary>
    /// <param name="cacheMessage">The deserialized replication message.</param>
    /// <param name="messageIdentifier">Identifier the message was delivered with.</param>
    void PushMessage(CacheReplicationMessage cacheMessage, MessageIdentifier messageIdentifier);

    /// <summary>
    /// Signals that the IU state reached its end for the given datasets.
    /// </summary>
    /// <param name="datasets">Datasets reported together with the end state.</param>
    void IuEnd(List<int> datasets);

    /// <summary>
    /// Signals that a dataset was reported as stale.
    /// </summary>
    /// <param name="dataset">The dataset reported as stale.</param>
    void StaleData(int dataset);

    /// <summary>
    /// Signals that an IU was requested for a dataset on a dataflow.
    /// </summary>
    /// <param name="dataset">The dataset the request refers to.</param>
    /// <param name="dataflow">The dataflow the request refers to.</param>
    void RequestIu(int dataset, byte dataflow);
}
/// <summary>
/// Adapts an <see cref="IReplicate"/> transport to an <see cref="ISmartCacheReplicator"/>:
/// outgoing cache messages are serialized, compressed and sent as replicas, and the
/// replicator's callbacks are decoded and forwarded to the smart cache replicator.
/// The instance registers itself with the replicator on construction and unregisters
/// on <see cref="Dispose"/>.
/// </summary>
internal class ReplicationProtocol : IDisposable
{
    // Version tag written after the source field of every outgoing message and
    // checked on every incoming one.
    private const int PROTOCOL_VERSION = 2;

    private static readonly INonBlockingPool<BinaryMessage> messagePool = SingletonCommunicationObjects.MessagePool;

    private readonly byte clientId;
    private readonly IReplicate replicator;
    private readonly ISmartCacheReplicator smartCacheReplicator;

    // Set once the client has been unregistered so that repeated Dispose calls are no-ops.
    private bool disposed;

    /// <summary>
    /// Creates the protocol and registers its callbacks with <paramref name="replicator"/>.
    /// </summary>
    /// <param name="cacheName">Name passed to the replicator on registration.</param>
    /// <param name="clientId">Client id used for registration, sending and unregistration.</param>
    /// <param name="replicator">Transport used to send replicas and acknowledge messages; must not be null.</param>
    /// <param name="smartCacheReplicator">Receiver of the decoded replication events; must not be null.</param>
    public ReplicationProtocol(string cacheName, byte clientId, IReplicate replicator, ISmartCacheReplicator smartCacheReplicator)
    {
        Diagnostics.ThrowIfNull(replicator, nameof(replicator));
        Diagnostics.ThrowIfNull(smartCacheReplicator, nameof(smartCacheReplicator));

        // All fields are assigned before registering, because the callbacks read them.
        this.smartCacheReplicator = smartCacheReplicator;
        this.replicator = replicator;
        this.clientId = clientId;

        replicator.Register(cacheName, clientId, null, ReceiveCallback, StateCallback, StaleCallback, IuCallback, GetDestinationInfoCallback);
    }

    /// <summary>Always returns an empty array; no destination info is supplied.</summary>
    private byte[] GetDestinationInfoCallback(byte[] dataflows)
    {
        return Array.Empty<byte>();
    }

    /// <summary>Forwards an IU request to the smart cache replicator; the destination info is ignored.</summary>
    private void IuCallback(int dataset, byte dataflow, byte[] destinationInfo)
    {
        smartCacheReplicator.RequestIu(dataset, dataflow);
    }

    /// <summary>Forwards a stale notification for the dataset; the dataflow is ignored.</summary>
    private void StaleCallback(int dataset, byte dataflow)
    {
        smartCacheReplicator.StaleData(dataset);
    }

    /// <summary>Forwards only the <see cref="IUState.End"/> state; every other state is ignored.</summary>
    private void StateCallback(IUState iuState, List<int> datasets)
    {
        if (iuState != IUState.End)
        {
            return;
        }

        smartCacheReplicator.IuEnd(datasets);
    }

    /// <summary>
    /// Decodes a received message (source, version, payload - the order written by
    /// <see cref="SendMessage"/>) and pushes the payload to the smart cache replicator.
    /// Messages whose version differs from <see cref="PROTOCOL_VERSION"/> are dropped silently.
    /// </summary>
    private void ReceiveCallback(BinaryMessage message, int dataset, byte dataflow, MessageIdentifier messageIdentifier)
    {
        message.Decompress();
        var reader = message.StartReading();

        // The source field precedes the version on the wire. It is read (and type-checked
        // by the cast) to advance the reader, but is not otherwise used here.
        var source = (CacheReplicationSource)Deserializer.Deserialize(reader);

        // SendMessage writes PROTOCOL_VERSION as an int. Unboxing a boxed int with a
        // (byte) cast throws InvalidCastException, so convert instead; this accepts the
        // version whether it comes back as an int or as a byte.
        // NOTE(review): assumes Deserialize returns the value boxed with the type it was
        // serialized as - confirm against the Deserializer implementation.
        var version = Convert.ToInt32(Deserializer.Deserialize(reader));
        if (version != PROTOCOL_VERSION)
        {
            // Unknown protocol version: the payload layout is not known, so skip it.
            return;
        }

        var cacheMessage = (CacheReplicationMessage)Deserializer.Deserialize(reader);
        smartCacheReplicator.PushMessage(cacheMessage, messageIdentifier);
    }

    /// <summary>
    /// Unregisters the client from the replicator. Calls after a successful
    /// unregistration do nothing.
    /// </summary>
    public void Dispose()
    {
        if (disposed)
        {
            return;
        }

        replicator.Unregister(clientId);

        // Flag is set only after Unregister succeeded so a failed attempt can be retried.
        disposed = true;
    }

    /// <summary>
    /// Serializes <paramref name="source"/>, the protocol version and
    /// <paramref name="message"/> into a pooled buffer, compresses it and sends it as a replica.
    /// </summary>
    /// <param name="source">Source marker written first into the message.</param>
    /// <param name="message">Message to replicate; must not be null.</param>
    /// <param name="dataset">Dataset passed to the replicator.</param>
    /// <param name="dataflow">Dataflow passed to the replicator.</param>
    /// <returns>The byte size of the serialized message, measured before compression.</returns>
    public int SendMessage(CacheReplicationSource source, CacheReplicationMessage message, int dataset, byte dataflow)
    {
        Diagnostics.ThrowIfNull(message, nameof(message));

        var serializedMessage = messagePool.Get();
        int size = 0;
        try
        {
            var writer = serializedMessage.StartWriting();
            Serializer.Serialize(source, writer);
            Serializer.Serialize(PROTOCOL_VERSION, writer);
            Serializer.Serialize(message, writer);
            serializedMessage.StopWriting();

            // Capture the size before Compress() so the uncompressed size is reported.
            size = serializedMessage.ByteSize;

            serializedMessage.Compress();
            replicator.SendReplica(clientId, serializedMessage, dataset, dataflow);
        }
        finally
        {
            // The buffer goes back to the pool even when serialization or sending throws.
            messagePool.Put(serializedMessage);
        }

        return size;
    }

    /// <summary>Tells the replicator that the message with the given identifier has been stored.</summary>
    public void Acknowledge(MessageIdentifier msgIdent)
    {
        replicator.AcknowledgeMessageStored(msgIdent);
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement