bra_fsn_hu

activemq virtualtopic STOMP receive

Nov 5th, 2013
222
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.03 KB | None | 0 0
  1. import logging
  2. logging.basicConfig()
  3. #logging.getLogger().setLevel(logging.DEBUG)
  4. import stomp
  5. import time
  6. import threading
  7.  
  8. cnt=0
  9. prevcnt=0
  10. start=0
  11. class RadiusListener(object):
  12.     def on_error(self, headers, message):
  13.         print 'received an error %s' % message
  14.    
  15.     def on_connected(self, headers, message):
  16.         global start
  17.         start=time.time()
  18.  
  19.     def on_message(self, headers, message):
  20.         global cnt
  21.         # return these attributes from the headers in ACK frame
  22.         ack_keys = ['message-id', 'subscription']
  23.        
  24.         with lock:
  25.             cnt+=1
  26.         conn.ack(dict([(i, headers[i]) for i in ack_keys if i in headers]))
  27.    
  28.     def on_disconnected(self):
  29.         print "disconnected"
  30.  
  31. if __name__ == '__main__':
  32.     lock=threading.Lock()
  33.     conn = stomp.Connection(host_and_ports=[('localhost',5000)],
  34.                             use_ssl=True,
  35.                             ssl_key_file = '/etc/stunnel/radius-consumer.key',
  36.                             ssl_cert_file = '/etc/stunnel/radius-consumer.pem',
  37.                             )
  38.     conn.set_listener('', RadiusListener())
  39.     conn.start()
  40.     conn.connect(headers={
  41.                           'client-id':'radiusmq2tango'
  42.                           }
  43.                  )
  44.     conn.subscribe(headers={
  45.                             'destination':'/queue/Consumer.radiusmq2tango.VirtualTopic.radius',
  46.                             'ack':'client',
  47.                             'id':'radiusmq2tango',
  48.                             'activemq.prefetchSize':1000,
  49.                             'activemq.exclusive':'true',
  50.                             'persistent':'true',
  51.                             'selector':"Xsystem = 'wired' AND ("
  52.                             "Xstatustype = 'STOP' OR "
  53.                             "Xstatustype = 'INTERIM_UPDATE')",
  54.                             }
  55.                    )
  56.  
  57.     while True:
  58.         time.sleep(1)
  59.         with lock:
  60.             print time.time()-start,cnt-prevcnt,cnt
  61.             prevcnt=cnt
Advertisement
Add Comment
Please, Sign In to add comment