Advertisement
ihsanz2

tes2

Dec 5th, 2018
131
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.08 KB | None | 0 0
  1.  
  2. import cPickle as pickle
  3. import binascii
  4. from datetime import datetime
  5. from twisted.internet.defer import inlineCallbacks
  6. from twisted.internet import reactor
  7. from twisted.internet.protocol import ClientCreator
  8. from twisted.python import log
  9. from txamqp.protocol import AMQClient
  10. from txamqp.client import TwistedDelegate
  11. import txamqp.spec
  12.  
  13. from jasmin.vendor.smpp.pdu.pdu_types import DataCoding
  14.  
  15. import MySQLdb as mdb
  16.  
  17. q = {}
  18.  
  19. @inlineCallbacks
  20. def gotConnection(conn, username, password):
  21.     print "Connected to broker, authenticating: %s" % username
  22.     yield conn.start({"LOGIN": username, "PASSWORD": password})
  23.  
  24.     print "Authenticated. Ready to receive messages"
  25.     chan = yield conn.channel(1)
  26.     yield chan.channel_open()
  27.  
  28.     yield chan.queue_declare(queue="sms_logger_queue")
  29.  
  30.     # Bind to submit.sm.* and submit.sm.resp.* routes to track sent messages
  31.     yield chan.queue_bind(queue="sms_logger_queue", exchange="messaging", routing_key='submit.sm.*')
  32.     yield chan.queue_bind(queue="sms_logger_queue", exchange="messaging", routing_key='submit.sm.resp.*')
  33.     # Bind to dlr_thrower.* to track DLRs
  34.     yield chan.queue_bind(queue="sms_logger_queue", exchange="messaging", routing_key='dlr_thrower.*')
  35.  
  36.     yield chan.basic_consume(queue='sms_logger_queue', no_ack=False, consumer_tag="sms_logger")
  37.     queue = yield conn.queue("sms_logger")
  38.  
  39.     #Connection parameters - Fill this info with your MySQL server connection parameters
  40.     db = mdb.connect(
  41.         user='jasmin',
  42.         passwd='jadmin',
  43.         host='127.0.0.1',
  44.         db='jasmin')
  45.  
  46.     print "Connected to MySQL"
  47.     cursor = db.cursor()
  48.  
  49.     # Wait for messages
  50.     # This can be done through a callback ...
  51.     while True:
  52.         msg = yield queue.get()
  53.         props = msg.content.properties
  54.  
  55.         if msg.routing_key[:10] == 'submit.sm.' and msg.routing_key[:15] != 'submit.sm.resp.':
  56.             pdu = pickle.loads(msg.content.body)
  57.             pdu_count = 1
  58.             short_message = pdu.params['short_message']
  59.             billing = props['headers']
  60.             billing_pickle = billing.get('submit_sm_resp_bill')
  61.             if not billing_pickle:
  62.                 billing_pickle = billing.get('submit_sm_bill')
  63.             submit_sm_bill = pickle.loads(billing_pickle)
  64.             source_connector = props['headers']['source_connector']
  65.             routed_cid = msg.routing_key[10:]
  66.  
  67.             # Is it a multipart message ?
  68.             while hasattr(pdu, 'nextPdu'):
  69.                 # Remove UDH from first part
  70.                 if pdu_count == 1:
  71.                     short_message = short_message[6:]
  72.  
  73.                 pdu = pdu.nextPdu
  74.  
  75.                 # Update values:
  76.                 pdu_count += 1
  77.                 short_message += pdu.params['short_message'][6:]
  78.  
  79.             # Save short_message bytes
  80.             binary_message = binascii.hexlify(short_message)
  81.  
  82.             # If it's a binary message, assume it's utf_16_be encoded
  83.             if pdu.params['data_coding'] is not None:
  84.                 dc = pdu.params['data_coding']
  85.                 if (isinstance(dc, int) and dc == 8) or (isinstance(dc, DataCoding) and str(dc.schemeData) == 'UCS2'):
  86.                     short_message = short_message.decode('utf_16_be', 'ignore').encode('utf_8')
  87.  
  88.             q[props['message-id']] = {
  89.                 'source_connector': source_connector,
  90.                 'routed_cid': routed_cid,
  91.                 'rate': submit_sm_bill.getTotalAmounts() * pdu_count,
  92.                 'uid': submit_sm_bill.user.uid,
  93.                 'destination_addr': pdu.params['destination_addr'],
  94.                 'source_addr': pdu.params['source_addr'],
  95.                 'pdu_count': pdu_count,
  96.                 'short_message': short_message,
  97.                 'binary_message': binary_message,
  98.             }
  99.         elif msg.routing_key[:15] == 'submit.sm.resp.':
  100.             # It's a submit_sm_resp
  101.  
  102.             pdu = pickle.loads(msg.content.body)
  103.             if props['message-id'] not in q:
  104.                 print 'Got resp of an unknown submit_sm: %s' % props['message-id']
  105.                 chan.basic_ack(delivery_tag=msg.delivery_tag)
  106.                 continue
  107.  
  108.             qmsg = q[props['message-id']]
  109.  
  110.             if qmsg['source_addr'] is None:
  111.                 qmsg['source_addr'] = ''
  112.  
  113.             cursor.execute("""INSERT INTO submit_log (msgid, source_addr, rate, pdu_count,
  114.                                                      destination_addr, short_message,
  115.                                                      status, uid, created_at, binary_message,
  116.                                                      routed_cid, source_connector, status_at)
  117.                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  118.                    ON DUPLICATE KEY UPDATE trials = trials + 1;""", (
  119.                 props['message-id'],
  120.                 qmsg['source_addr'],
  121.                 qmsg['rate'],
  122.                 qmsg['pdu_count'],
  123.                 qmsg['destination_addr'],
  124.                 qmsg['short_message'],
  125.                 pdu.status,
  126.                 qmsg['uid'],
  127.                 props['headers']['created_at'],
  128.                 qmsg['binary_message'],
  129.                 qmsg['routed_cid'],
  130.                 qmsg['source_connector'],
  131.                 props['headers']['created_at'],))
  132.             db.commit()
  133.         elif msg.routing_key[:12] == 'dlr_thrower.':
  134.             if props['headers']['message_status'][:5] == 'ESME_':
  135.                 # Ignore dlr from submit_sm_resp
  136.                 chan.basic_ack(delivery_tag=msg.delivery_tag)
  137.                 continue
  138.  
  139.             # It's a dlr
  140.             if props['message-id'] not in q:
  141.                 print 'Got dlr of an unknown submit_sm: %s' % props['message-id']
  142.                 chan.basic_ack(delivery_tag=msg.delivery_tag)
  143.                 continue
  144.  
  145.             # Update message status
  146.             qmsg = q[props['message-id']]
  147.             cursor.execute("UPDATE submit_log SET status = %s, status_at = %s WHERE msgid = %s;", (
  148.                 props['headers']['message_status'],
  149.                 datetime.now(),
  150.                 props['message-id'],))
  151.             db.commit()
  152.         else:
  153.             print 'unknown route: %s' % msg.routing_key
  154.  
  155.         chan.basic_ack(delivery_tag=msg.delivery_tag)
  156.  
  157.     # A clean way to tear down and stop
  158.     yield chan.basic_cancel("sms_logger")
  159.     yield chan.channel_close()
  160.     chan0 = yield conn.channel(0)
  161.     yield chan0.connection_close()
  162.  
  163.     reactor.stop()
  164.  
  165.  
  166. if __name__ == "__main__":
  167.     host = '127.0.0.1'
  168.     port = 5672
  169.     vhost = '/'
  170.     username = 'guest'
  171.     password = 'guest'
  172.     spec_file = '/etc/jasmin/resource/amqp0-9-1.xml'
  173.  
  174.     spec = txamqp.spec.load(spec_file)
  175.  
  176.     # Connect and authenticate
  177.     d = ClientCreator(reactor,
  178.         AMQClient,
  179.         delegate=TwistedDelegate(),
  180.         vhost=vhost,
  181.         spec=spec).connectTCP(host, port)
  182.     d.addCallback(gotConnection, username, password)
  183.  
  184.     def whoops(err):
  185.         if reactor.running:
  186.             log.err(err)
  187.             reactor.stop()
  188.  
  189.     d.addErrback(whoops)
  190.  
  191.     reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement