bra_fsn_hu

hornetq stomp durable subscriber

Sep 11th, 2013
179
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.02 KB | None | 0 0
  1. import logging
  2. logging.basicConfig()
  3. #logging.getLogger().setLevel(logging.DEBUG)
  4. import stomp
  5.  
  6. cnt=0
  7. class RadiusListener(object):
  8.     def on_error(self, headers, message):
  9.         print 'received an error %s' % message
  10.  
  11.     def on_message(self, headers, message):
  12.         global cnt
  13.         # return these attributes from the headers in ACK frame
  14.         ack_keys = ['message-id', 'subscription']
  15.        
  16.         if int(headers['content-length'])!=len(message):
  17.             print "content error",headers['content-length'],len(message)
  18.             import sys
  19.             sys.exit(1)
  20.         cnt+=1
  21.         print cnt
  22.         conn.ack(dict([(i, headers[i]) for i in ack_keys if i in headers]))
  23.  
  24.  
  25. if __name__ == '__main__':
  26.     conn = stomp.Connection(host_and_ports=[('mq.private',61612)],
  27.                             version='1.1',
  28.                             heartbeats=(1000, 3000),
  29.                             user='radius_consumer',
  30.                             passcode='pass',
  31.                             use_ssl=True,
  32.                             ssl_key_file = '/etc/stunnel/radius-consumer.key',
  33.                             ssl_cert_file = '/etc/stunnel/radius-consumer.pem',
  34.                             )
  35.     conn.set_listener('', RadiusListener())
  36.     conn.start()
  37.     conn.connect(headers={'client-id':'radiusmq'})
  38.     conn.subscribe(headers={
  39.                             'destination':'jms.topic.radius',
  40.                             #'destination':'/topic/radius',
  41.                             'ack':'client',
  42.                             'id':1,
  43.                             'durable-subscriber-name':'radiusmq',
  44.                             'activemq.subscriptionName':'radiusmq',
  45.                             'selector':"Xsystem = 'wired' AND ("
  46.                             "Xstatustype = 'STOP' OR "
  47.                             "Xstatustype = 'INTERIM_UPDATE')",
  48.                             }
  49.                    )
  50.    
  51.     while True:
  52.         import time
  53.         print time.time()
  54.         time.sleep(1)
Advertisement
Add Comment
Please, Sign In to add comment