Advertisement
Guest User

Untitled

a guest
Mar 30th, 2017
53
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.41 KB | None | 0 0
  /// <summary>
  /// Envelope for a payload handed to the message layer, together with its
  /// addressing and delivery options.
  /// </summary>
  /// <typeparam name="T">Type of the payload carried by the message.</typeparam>
  public class Message<T>
  {
      /// <summary>Message data.</summary>
      public T Payload { get; set; }

      /// <summary>
      /// Identifying "network name" of the target/recipient. This might identify
      /// an application, a physical host address, etc. In RabbitMQ terms, it's a
      /// routing key. If null, assume broadcast/fanout.
      /// </summary>
      public string TargetName { get; set; }

      /// <summary>
      /// Unique identifier for the RPC (request-reply) pattern. Perhaps a GUID
      /// would be appropriate for this.
      /// </summary>
      public string CorrelationTerm { get; set; }

      /// <summary>
      /// Whether guaranteed delivery is requested for this message. Open design
      /// question: should the message layer include a provision for this at all?
      /// </summary>
      public bool IsGuaranteedDelivery { get; set; }

      // TODO - anything else? Stuff like durability and persistence adds complexity,
      // but it can be abstracted out here.
  }
  22.  
  /// <summary>
  /// Entry point to the messaging layer: publishing, subscribing and
  /// request-reply. Disposable, so callers can wrap it in a using block.
  /// </summary>
  public interface IMessageLayer : IDisposable
  {
      /// <summary>Fire-and-forget publishing.</summary>
      /// <typeparam name="T">Payload type of the message.</typeparam>
      /// <param name="message">Message to publish.</param>
      void Publish<T>(Message<T> message);

      /// <summary>General-purpose method for consumption of received messages.</summary>
      /// <typeparam name="T">Payload type of the messages to consume.</typeparam>
      /// <param name="parameters">Consumer arguments for the subscription.</param>
      /// <returns>A subscriber for messages of type <typeparamref name="T"/>.</returns>
      ISubscriber<T> Subscribe<T>(SubscriptionParameters<T> parameters);

      /// <summary>Request-reply pattern for RPC.</summary>
      /// <typeparam name="TReq">Payload type of the request.</typeparam>
      /// <typeparam name="TReply">Payload type of the reply.</typeparam>
      /// <param name="request">Request message to send.</param>
      /// <returns>A task producing the reply message.</returns>
      // TReply occurs only in the return type, so callers have to pass both
      // type arguments explicitly.
      Task<Message<TReply>> Request<TReq, TReply>(Message<TReq> request);
  }
  34.  
  /// <summary>
  /// Subscription that raises an event for each received message of type
  /// <typeparamref name="T"/>. Disposable, so callers can wrap it in a using block.
  /// </summary>
  /// <typeparam name="T">Payload type of the consumed messages.</typeparam>
  public interface ISubscriber<T> : IDisposable
  {
      /// <summary>Raised when a message is received.</summary>
      // An event must be declared with a delegate type; EventHandler<TEventArgs>
      // gives handlers the usual (sender, args) signature.
      event EventHandler<MessageReceivedEventArgs<T>> MessageReceived;

      /// <summary>Starts consuming messages.</summary>
      void StartConsuming();

      /// <summary>Stops consuming messages.</summary>
      void StopConsuming();
  }
  41.  
  // Usage: connect to the message layer. The endpoint is disposed when the
  // using block exits.
  var connectionParams = new MessageLayerConfig { /* Broker details */ };
  using (var myMessageEndpoint = MyMessageLayerFactory.Connect(connectionParams))
  {
      // TODO - Use messaging patterns here
  }
  47.  
  // Usage: subscribe, then wait until the first FooMessage arrives.
  var mySubscriptionParams = new SubscriptionParameters<FooMessage> { /* Consumer args */ };
  using (var mySubscription = myMessageEndpoint.Subscribe(mySubscriptionParams))
  {
      // Completed by the event handler below; awaited once consuming has started.
      var tcs = new TaskCompletionSource<FooMessage>();
      mySubscription.MessageReceived += (s, e) =>
      {
          // TODO: Handle message
          // TrySetResult (rather than SetResult) so that further events after
          // the first one do not throw.
          tcs.TrySetResult( /* receivedFooMessage */ );
      };
      mySubscription.StartConsuming();
      await tcs.Task;
  }
  60.  
  // Usage: wrap a FooMessage in an envelope addressed to one named target,
  // ask for guaranteed delivery, and hand it to the endpoint.
  var fooEnvelope = new Message<FooMessage>
  {
      IsGuaranteedDelivery = true,
      TargetName = "SomeHostName.SomeAppName",
      Payload = myLittleFooMessage
  };
  myMessageEndpoint.Publish<FooMessage>(fooEnvelope);
  68.  
  // Usage: send an RPC request and wait at most five seconds for the reply.
  var message = new Message<MyRequest>
  {
      Payload = myRequestData,
      TargetName = "SomeRpcServer.SomeRpcAppName",
      IsGuaranteedDelivery = false,
  };

  // The reply type appears only in Request's return type, so the compiler
  // cannot infer it from the argument; both type arguments are spelled out.
  // MyResponse is a placeholder for the reply payload type.
  var task = myMessageEndpoint.Request<MyRequest, MyResponse>(message);

  // Task.WhenAny returns whichever task finished first; if that is not the
  // request task, the delay elapsed before a reply arrived.
  if (await Task.WhenAny(task, Task.Delay(5000)) != task)
  {
      throw new TimeoutException("Request wait time exceeded 5sec");
  }

  // The request task has already completed here, so this await only unwraps
  // its result (or rethrows its exception).
  var response = await task;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement