Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class Message<T>
- {
- // Message data
- T Payload { get; set; }
- // Identifiying "Network Name" of the target/recipient
- // This might identify an application, a physical host address, etc.
- // In RabbitMQ terms, it's a Routing Key.
- // If null, assume broadcast/fanout.
- string TargetName { get; set; }
- // Unique identifier for RPC (Request-Reply) pattern
- // - Perhaps a GUID would be appropriate for this
- string CorrelationTerm { get; set; }
- // Should the message layer include a provision to guarantee delivery?
- bool IsGuaranteedDelivery { get; set; }
- // TODO - anything else? stuff like durability and persistence adds complexity
- // but it can be abstracted out here.
- }
- public interface IMessageLayer : IDisposable
- {
- // Fire-n-forget publishing
- void Publish<T>(Message<T> message);
- // General-purpose method for consumption of received messages
- ISubscriber<T> Subscribe<T>(SubscriptionParameters<T> parameters);
- // Request-Reply pattern for RPC
- Task<Message<TReply>> Request<TReq, TReply>(Message<T> request);
- }
- public interface ISubscriber<T> : IDisposable
- {
- event EventArgs<MessageReceivedEventArgs<T>> MessageReceived;
- void StartConsuming();
- void StopConsuming();
- }
- var connectionParams = new MessageLayerConfig { /* Broker details */ }
- using (var myMessageEndpoint = MyMessageLayerFactory.Connect(connectionParams))
- {
- // TODO - Use messaging patterns here
- }
- var mySubscriptionParams = new SubscriptionParams<T> { /* Consumer args */ }
- using (var mySubscription = myMessageEndpoint.CreateSubscriber(mySubscriptionParams))
- {
- var tcs = new TaskCompletionSource<FooMessage>();
- mySubscription.MessageReceived += (s, e) =>
- {
- // TODO: Handle message
- tcs.TrySetResult( /* receivedFooMessage */ );
- }
- mySubscription.StartConsuming();
- await tcs.Task;
- }
- var message = new Message<FooMessage>
- {
- Payload = myLittleFooMessage,
- TargetName = "SomeHostName.SomeAppName",
- IsGuaranteedDelivery = true,
- };
- myMessageEndpoint.Publish(message);
- var message = new Message<MyRequest>
- {
- Payload = myRequestData,
- TargetName = "SomeRpcServer.SomeRpcAppName",
- IsGuaranteedDelivery = false,
- };
- var task = myMessageEndpoint.Request(message);
- if (await Task.WhenAny(task, Task.Delay(5000)) != task)
- {
- throw new TimeoutException("Request wait time exceeded 5sec");
- }
- var response = await task;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement