Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- logging.basicConfig()
- #logging.getLogger().setLevel(logging.DEBUG)
- import stomp
- cnt=0
- class RadiusListener(object):
- def on_error(self, headers, message):
- print 'received an error %s' % message
- def on_message(self, headers, message):
- global cnt
- # return these attributes from the headers in ACK frame
- ack_keys = ['message-id', 'subscription']
- if int(headers['content-length'])!=len(message):
- print "content error",headers['content-length'],len(message)
- import sys
- sys.exit(1)
- cnt+=1
- print cnt
- conn.ack(dict([(i, headers[i]) for i in ack_keys if i in headers]))
- if __name__ == '__main__':
- conn = stomp.Connection(host_and_ports=[('mq.private',61612)],
- version='1.1',
- heartbeats=(1000, 3000),
- user='radius_consumer',
- passcode='pass',
- use_ssl=True,
- ssl_key_file = '/etc/stunnel/radius-consumer.key',
- ssl_cert_file = '/etc/stunnel/radius-consumer.pem',
- )
- conn.set_listener('', RadiusListener())
- conn.start()
- conn.connect(headers={'client-id':'radiusmq'})
- conn.subscribe(headers={
- 'destination':'jms.topic.radius',
- #'destination':'/topic/radius',
- 'ack':'client',
- 'id':1,
- 'durable-subscriber-name':'radiusmq',
- 'activemq.subscriptionName':'radiusmq',
- 'selector':"Xsystem = 'wired' AND ("
- "Xstatustype = 'STOP' OR "
- "Xstatustype = 'INTERIM_UPDATE')",
- }
- )
- while True:
- import time
- print time.time()
- time.sleep(1)
Advertisement
Add Comment
Please, Sign In to add comment