Advertisement
Guest User

Untitled

a guest
Apr 20th, 2017
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.38 KB | None | 0 0
  1. import json
  2. import os
  3.  
  4. import requests
  5.  
  6. from pyrabbit import Client
  7. from requests.auth import HTTPBasicAuth
  8.  
  9. from django.core.management.base import BaseCommand, CommandError
  10. from django.conf import settings
  11.  
  12.  
  13. def connection(host, port, user, password):
  14.     """
  15.    returns client object
  16.    """
  17.     try:
  18.         client = Client('{}:{}'.format(host, port),
  19.         user=user, passwd=password, timeout=5000)
  20.         return client if client.is_alive() else None
  21.     except Exception as e:
  22.         print e
  23.  
  24.  
  25. def get_definitions(legacy_host='node1.rabbitmq.stage.aws.pbinfra.com',
  26.     legacy_port='15672',
  27.     legacy_user='guest', legacy_password='guest'):
  28.     """
  29.    get configuration from legacy
  30.    instance
  31.    """
  32.     response = requests.get('http://{}:{}/api/definitions'.format(legacy_host, legacy_port),
  33.     auth=HTTPBasicAuth(legacy_user, legacy_password))
  34.  
  35.     assert response.status_code == 200
  36.     return response.json()
  37.  
  38.  
  39. def post_definitions(legacy_host,
  40.     legacy_port,
  41.     legacy_user, legacy_password):
  42.     """
  43.    post legacy config
  44.    to the new instance
  45.    """
  46.     data = get_definitions(legacy_host=legacy_host, legacy_port=legacy_port,
  47.     legacy_user=legacy_user, legacy_password=legacy_password)
  48.  
  49.     response = requests.post('http://rabbitmanagement:15672/api/definitions',
  50.     auth=HTTPBasicAuth('guest', 'guest'),
  51.     data=json.dumps(data), headers={'content-type': 'application/json'})
  52.  
  53.     assert response.status_code == 201
  54.  
  55.  
  56. def get_legacy_queues(client, vhost='/'):
  57.     """
  58.    returns a list of queues
  59.    in a legacy cluster vhost
  60.    """
  61.     return client.get_queues(vhost)
  62.  
  63.  
  64. def get_and_publish_messages(client, queues, vhost='/'):
  65.     """
  66.    get messages from the
  67.    queue then publish it
  68.    """
  69.     processed_queues = []
  70.     new_client = connection(host='rabbitmanagement', port=15672,
  71.     user='guest', password='guest')
  72.  
  73.     for element in queues:
  74.         try:
  75.             qname = element.get('name')
  76.  
  77.             # record in case of failures
  78.             processed_queues.append(qname)
  79.  
  80.             message_count = element.get('messages')
  81.             print '+++++++ You are pulling messages from %s +++++++ \n' % qname
  82.  
  83.             qmessage = client.get_messages(vhost=vhost, qname=qname, count=message_count)
  84.             if qmessage is None:
  85.                 continue
  86.             print '+++++++ Finished pulling from %s +++++++ \n' % qname
  87.  
  88.             print '+++++++ Start pushing to the new instance +++++++'
  89.  
  90.             for message in qmessage:
  91.                 new_client.publish(payload=message['payload'],
  92.                 rt_key=message['routing_key'],
  93.                 payload_enc=message['payload_encoding'],
  94.             properties=message['properties'],
  95.             xname=message['exchange'], vhost='/')
  96.  
  97.             print '+++++++ Finished publishing %s message' % len(qmessage)
  98.  
  99.             choice = raw_input('continue[y/n]: ')
  100.             if choice == 'y':
  101.                 continue
  102.  
  103.             elif choice == 'n':
  104.                 break
  105.         # continue in case of exception
  106.         except Exception as e:
  107.             print e
  108.             pass
  109.     print '+++++++ Migrated queues: %s ' % processed_queues
  110.  
  111.  
  112. class Command(BaseCommand):
  113.     help = """Migrate rabbitmq messages from
  114.              legacy instance to new
  115.              python manage.py import_from_rabbitmq --legacy_host <host> --legacy_port <port>
  116.              --legacy_user <user> --lagacy_password <password>"""
  117.  
  118.  
  119.  
  120.     def add_arguments(self, parser):
  121.         parser.add_argument('--legacy_host', nargs='+')
  122.         parser.add_argument('--legacy_port', nargs='+', type=int)
  123.         parser.add_argument('--legacy_user', nargs='+')
  124.         parser.add_argument('--legacy_password', nargs='+')
  125.  
  126.     def handle(self, *args, **options):
  127.         # post config to the new cluster
  128.         host = options.get('legacy_host')[0]
  129.         port = options.get('legacy_port')[0]
  130.         user = options.get('legacy_user')[0]
  131.         password = options.get('legacy_password')[0]
  132.  
  133.         post_definitions(legacy_host=host, legacy_port=port,
  134.         legacy_user=user, legacy_password=password)
  135.  
  136.         legacy_client = connection(host=host, port=port,
  137.         user=user, password=password)
  138.  
  139.         legacy_queues = get_legacy_queues(client=legacy_client)
  140.         get_and_publish_messages(client=legacy_client, queues=legacy_queues, vhost='/')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement