Advertisement
Guest User

DetoX

a guest
Aug 16th, 2009
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 4.60 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Messaging;
  4. using System.Diagnostics;
  5. using System.Threading;
  6.  
  7. namespace HAKGERSoft {
  8.  
  9.     /// <summary>
  10.     /// Sends and receives .NET objects between processes by MSMQ
  11.     /// </summary>
  12.     public class MSMQDispatcher {
  13.         readonly string QueuePath;
  14.         MessageQueue Queue;
  15.         Cursor Location;
  16.         List<Type> RegisteredTypes;
  17.         SynchronizationContext Context;
  18.  
  19.         /// <summary>
  20.         /// Occurs when new message arrive - use RegisterType() before
  21.         /// </summary>
  22.         public event EventHandler<DispatcherEventArgs> OnReceived;
  23.  
  24.         /// <summary>
  25.         /// Creates new MSMQDispatcher instance
  26.         /// </summary>
  27.         /// <param name="queueName">Common name of the MSMQ queue that is used between processes</param>
  28.         public MSMQDispatcher(string queueName) {
  29.             QueuePath=GetQueuePath(queueName);
  30.             if(!MessageQueue.Exists(QueuePath)) {
  31.                 Debug.WriteLine("Provided queue does not exist, creating new one");
  32.                 MessageQueue.Create(QueuePath);
  33.             }
  34.             Queue=new MessageQueue(QueuePath);
  35.             RegisteredTypes=new List<Type>();
  36.         }
  37.  
  38.         /// <summary>
  39.         /// Sends an object
  40.         /// </summary>
  41.         /// <typeparam name="T">Type of object to be sent</typeparam>
  42.         /// <param name="label">Additional label of the message</param>
  43.         /// <param name="obj">Object to sent</param>
  44.         public void Send<T>(string label,T obj) {
  45.             Queue.Send(obj,label);
  46.         }
  47.  
  48.         /// <summary>
  49.         /// Clears all previous messages from the queue
  50.         /// </summary>
  51.         public void Purge() {
  52.             Queue.Purge();
  53.         }
  54.  
  55.         /// <summary>
  56.         /// Start listen MSMQ for new messages - use this function if you want to receive data
  57.         /// </summary>
  58.         public void StartListener() {
  59.             if(Location!=null)
  60.                 throw new InvalidOperationException("Listener already started");
  61.             Context=SynchronizationContext.Current;
  62.             if(Context==null)
  63.                 Context=new SynchronizationContext();
  64.             Location=GetCurrentLocarion();
  65.             BeginPeek(PeekAction.Current);
  66.         }
  67.  
  68.         /// <summary>
  69.         /// Register type that should be received
  70.         /// </summary>
  71.         /// <typeparam name="T"></typeparam>
  72.         public void RegisterType<T>() {
  73.             RegisteredTypes.Add(typeof(T));
  74.         }
  75.  
  76.         string GetQueuePath(string queueName) {
  77.             return string.Format(@".\private$\{0}",queueName);
  78.         }
  79.  
  80.         void BeginPeek(PeekAction peekAction) {
  81.             Queue.BeginPeek(MessageQueue.InfiniteTimeout,Location,peekAction,null,QueueAsyncCallBack);
  82.         }
  83.  
  84.         void QueueAsyncCallBack(IAsyncResult asyncResult) {
  85.             Message message= Queue.EndPeek(asyncResult);
  86.             message.Formatter=new XmlMessageFormatter(RegisteredTypes.ToArray());
  87.             object body=null;
  88.             try {
  89.                 body=message.Body;
  90.             } catch(InvalidOperationException ex) {
  91.                 Debug.WriteLine(ex.ToString());
  92.             }
  93.             if(body!=null)
  94.                 RaiseReceived(message.Label,body);
  95.             BeginPeek(PeekAction.Next);
  96.         }
  97.  
  98.         void RaiseReceived(string label,object message) {
  99.             PostCallback<DispatcherEventArgs>(OnReceived,new DispatcherEventArgs(label,message));
  100.         }
  101.  
  102.         void PostCallback<T>(EventHandler<T> handler,T args) where T:EventArgs {
  103.             if(handler == null)
  104.                 return;
  105.             Context.Post(new SendOrPostCallback((state) => {
  106.                 handler(this,args);
  107.             }),null);
  108.         }
  109.  
  110.         Cursor GetCurrentLocarion() {
  111.             Cursor cursor=Queue.CreateCursor();
  112.             try {
  113.                 Queue.Peek(TimeSpan.Zero,cursor,PeekAction.Current);
  114.                 while(true) {
  115.                     Queue.Peek(TimeSpan.Zero,cursor,PeekAction.Next);
  116.                 }
  117.             } catch(MessageQueueException ex) {
  118.                 if(ex.MessageQueueErrorCode!=MessageQueueErrorCode.IOTimeout) {
  119.                     throw;
  120.                 }
  121.             }
  122.             return cursor;
  123.         }
  124.     }
  125.  
  126.     public class DispatcherEventArgs:EventArgs {
  127.         public readonly string MessageLabel;
  128.         public readonly object MessageObject;
  129.  
  130.         public DispatcherEventArgs(string label,object message) {
  131.             MessageLabel=label;
  132.             MessageObject=message;
  133.         }
  134.     }
  135. }
  136.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement