Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- logging.basicConfig()
- #logging.getLogger().setLevel(logging.DEBUG)
- import stomp
- import time
- import threading
- cnt=0
- prevcnt=0
- start=0
- class RadiusListener(object):
- def on_error(self, headers, message):
- print 'received an error %s' % message
- def on_connected(self, headers, message):
- global start
- start=time.time()
- def on_message(self, headers, message):
- global cnt
- # return these attributes from the headers in ACK frame
- ack_keys = ['message-id', 'subscription']
- with lock:
- cnt+=1
- conn.ack(dict([(i, headers[i]) for i in ack_keys if i in headers]))
- def on_disconnected(self):
- print "disconnected"
- if __name__ == '__main__':
- lock=threading.Lock()
- conn = stomp.Connection(host_and_ports=[('localhost',5000)],
- use_ssl=True,
- ssl_key_file = '/etc/stunnel/radius-consumer.key',
- ssl_cert_file = '/etc/stunnel/radius-consumer.pem',
- )
- conn.set_listener('', RadiusListener())
- conn.start()
- conn.connect(headers={
- 'client-id':'radiusmq2tango'
- }
- )
- conn.subscribe(headers={
- 'destination':'/queue/Consumer.radiusmq2tango.VirtualTopic.radius',
- 'ack':'client',
- 'id':'radiusmq2tango',
- 'activemq.prefetchSize':1000,
- 'activemq.exclusive':'true',
- 'persistent':'true',
- 'selector':"Xsystem = 'wired' AND ("
- "Xstatustype = 'STOP' OR "
- "Xstatustype = 'INTERIM_UPDATE')",
- }
- )
- while True:
- time.sleep(1)
- with lock:
- print time.time()-start,cnt-prevcnt,cnt
- prevcnt=cnt
Advertisement
Add Comment
Please, Sign In to add comment