Advertisement
Guest User

Untitled

a guest
Jul 13th, 2016
257
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.87 KB | None | 0 0
  1. # script to dump all rabbitmq messages
  2. # python recover.py 192.168.1.94 %2f test test
  3. # !/usr/bin/python
  4. __author__ = 'gabriele'
  5.  
  6. import urllib2
  7. import base64
  8. import time
  9. import datetime
  10. import json
  11. import sys
  12. import pika
  13. import os
  14. import sqlite3
  15. import time
  16. import urllib
  17.  
  18.  
  19. def time_tostring():
  20.     ts = time.time();
  21.     return datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d--%H_%M_%S');
  22.  
  23.  
  24. def print_time(step):
  25.     print time_tostring() + " - " + step
  26.  
  27.  
  28. def write_message_to_file(conn, queue, method_frame,  header_frame, body):
  29.     conn.execute('insert into dump values (?,?,?,?,?,?,?,?)',
  30.         [method_frame.delivery_tag, str(header_frame.headers), header_frame.delivery_mode,body,
  31.         method_frame.routing_key, method_frame.exchange,queue['name'], queue['vhost']])
  32.     conn.commit()
  33.    
  34.  
  35. def drain_messages(consume_channel, q, conn):
  36.     method_frame, header_frame, body = consume_channel.basic_get(q['name'])
  37.     msg_count = 0
  38.     while method_frame:
  39.         write_message_to_file(conn, q, method_frame, header_frame, body)
  40.         method_frame, header_frame, body = consume_channel.basic_get(q['name'])
  41.     msg_count = msg_count + 1
  42.     if msg_count % 100 == 0:
  43.         print_time("Queue %s - Messages stored %d "% (q['name'], msg_count,))
  44.  
  45. def get_auth(user, password):
  46.     return base64.encodestring('%s:%s' % (user, password)).replace('\n', '')
  47.  
  48.  
  49. def call_api(rabbitmq_host, vhost, user, password, api):
  50.     print_time("Calling the API: " + api);
  51.     request = urllib2.Request("http://" + rabbitmq_host + ":15672/api/" + api);
  52.     request.add_header("Authorization", "Basic %s" % get_auth(user, password))
  53.     request.get_method = lambda: 'GET'
  54.     response = urllib2.urlopen(request)
  55.     items = json.load(response)
  56.     return items
  57.  
  58.  
  59. def create_sql_tables(conn):
  60.     print_time("Opened database successfully");
  61.     conn.execute('''CREATE TABLE dump
  62.       (DELIVERYTAG     INT PRIMARY KEY     NOT NULL,
  63.       HEADER           TEXT    NULL,
  64.       DELIVERY_MODE    INT NULL,
  65.       BODY             BLOB     NULL,
  66.       ROUTING_KEY      TEXT NULL,
  67.       EXCHANGE         TEXT NULL,
  68.       QUEUE            TEXT,
  69.       VHOST            TEXT NOT NULL);''')
  70.     print_time("Table created successfully");
  71.  
  72.  
  73. def dump_messages(host, vhost, user, password):
  74.     virtual_hosts = call_api(host, vhost, user, password, "vhosts")
  75.     for virtual_host in virtual_hosts:
  76.         print virtual_host['name']
  77.        
  78.     queues = call_api(host, vhost, user, password, "queues")
  79.     for queue in queues:
  80.         print_time("Queue %s Vhost %s " %  (queue['name'],queue['vhost'],))
  81.  
  82.     dump_dir = "dump_time_" #+ time_tostring()
  83.     if not os.path.exists(dump_dir):
  84.         os.makedirs(dump_dir)
  85.  
  86.     for queue in queues:
  87.         print_time(queue['name'] + " - " + queue['vhost'])
  88.         credentials = pika.PlainCredentials(user, password)
  89.         connection = pika.BlockingConnection(
  90.                 pika.ConnectionParameters(host, 5672, queue['vhost'], credentials))
  91.         channel = connection.channel()
  92.         print_time("Dumping queue:" + queue['vhost'] + " - " + queue['name'])
  93.         #file = open(dump_dir + "/" + queue['name'], 'w+')
  94.         conn = sqlite3.connect(dump_dir + '/dump_' + queue['name']+ '_vhost_' +urllib.quote(queue['vhost'],safe='') +'.db')
  95.         create_sql_tables(conn)
  96.         drain_messages(channel, queue, conn)
  97.         print_time("Done with queue:" + queue['name'])
  98.         conn.close()
  99.  
  100.     call_api(host, vhost, user, password, "connections")
  101.  
  102.     raw_input("key to stop")
  103.  
  104.     def kill():
  105.         channel.stop_consuming()
  106.  
  107.     connection.add_timeout(0, kill)
  108.     print "Goodbye!"
  109.  
  110.  
  111. def publish_row(row, channel):
  112.     delivery_mode = row[2]
  113.     body = row[3]
  114.     routing_key = row[4]
  115.     exchange =  row[5]
  116.     queue =  str(row[6])
  117.     vhost =  row[7]
  118.    
  119.     #credentials = pika.PlainCredentials(user, password)
  120.     #connection = pika.BlockingConnection(
  121.                #pika.ConnectionParameters(host, 5672, vhost, credentials))
  122.     #channel = connection.channel()
  123.     channel.basic_publish(exchange="",
  124.                       routing_key=queue,
  125.                       body=body,
  126.                       properties=pika.BasicProperties(
  127.                          delivery_mode = delivery_mode,
  128.  
  129.                       ))
  130.     #channel.close()
  131.     #connection.close()
  132.  
  133.  
  134. def restore_messages(host, vhost, user, password):
  135.     from os import listdir
  136.     from os.path import isfile, join
  137.     dump_dir = "dump_time_"
  138.     onlyfiles = [f for f in listdir(dump_dir) if isfile(join(dump_dir, f))]
  139.     for file in onlyfiles:
  140.         print_time("restoring file... " + file)
  141.         conn = sqlite3.connect(dump_dir + '/' + file)
  142.         cursor = conn.execute("SELECT *  from dump")
  143.     connection = None
  144.     channel = None
  145.         for row in cursor:
  146.         if connection == None:
  147.         vhost =  row[7]
  148.                 credentials = pika.PlainCredentials(user, password)
  149.         connection = pika.BlockingConnection(
  150.                 pika.ConnectionParameters(host, 5672, vhost, credentials))
  151.         channel = connection.channel()
  152.            
  153.             publish_row(row, channel)
  154.             #res = p.apply_async(publish_row, (row,))
  155.             #try:
  156.             #    res.get(timeout=20)
  157.             #except:
  158.             #    print "We lacked patience and got a multiprocessing.TimeoutError"
  159.  
  160.        
  161.         conn.close()
  162.     connection.close()
  163.         print_time("Restored file... " + file)
  164.  
  165.  
  166.  
  167.  
  168. if __name__ == '__main__':
  169.     print 'Argument list:', str(sys.argv)
  170.     host = sys.argv[1]
  171.     vhost = sys.argv[2]
  172.     user = sys.argv[3]
  173.     password = sys.argv[4]
  174.     operation =  sys.argv[5]
  175.     if operation == 'dump':
  176.         print_time("****** OPERATION ******** DUMP MESSAGES ********** ")
  177.         dump_messages(host, vhost, user, password)
  178.  
  179.     if operation == 'restore':
  180.         restore_messages(host, vhost, user, password)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement