Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import os
- import requests
- from pyrabbit import Client
- from requests.auth import HTTPBasicAuth
- from django.core.management.base import BaseCommand, CommandError
- from django.conf import settings
def connection(host, port, user, password):
    """Build a pyrabbit management client and check that the broker answers.

    Args:
        host: Host name of the RabbitMQ management API.
        port: Port of the management API.
        user: Management user name.
        password: Password for ``user``.

    Returns:
        The ``Client`` instance when ``client.is_alive()`` is truthy, or
        ``None`` when the liveness check fails or any exception is raised
        while creating or probing the client.
    """
    try:
        # NOTE(review): pyrabbit appears to interpret ``timeout`` in seconds,
        # so 5000 may have been meant as milliseconds -- confirm before
        # changing the value.
        client = Client('{}:{}'.format(host, port),
                        user=user, passwd=password, timeout=5000)
        return client if client.is_alive() else None
    except Exception as e:
        # Best effort: report the problem and signal failure with an explicit
        # None so callers can test for it. The call form of print behaves the
        # same on Python 2 and Python 3 for a single argument.
        print(e)
        return None
def get_definitions(legacy_host='node1.rabbitmq.stage.aws.pbinfra.com',
                    legacy_port='15672',
                    legacy_user='guest', legacy_password='guest'):
    """Fetch the definitions document from the legacy management API.

    Issues ``GET http://<legacy_host>:<legacy_port>/api/definitions`` with
    HTTP basic authentication.

    Args:
        legacy_host: Host name of the legacy management API.
        legacy_port: Port of the legacy management API.
        legacy_user: User name for basic authentication.
        legacy_password: Password for basic authentication.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the response status is anything other than 200.
    """
    url = 'http://{}:{}/api/definitions'.format(legacy_host, legacy_port)
    response = requests.get(url,
                            auth=HTTPBasicAuth(legacy_user, legacy_password))
    # An explicit check instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would let a failed export go unnoticed.
    if response.status_code != 200:
        raise requests.HTTPError(
            'GET {} returned HTTP {} (expected 200)'.format(
                url, response.status_code),
            response=response)
    return response.json()
def post_definitions(legacy_host,
                     legacy_port,
                     legacy_user, legacy_password):
    """Copy the legacy definitions document to the new instance.

    Reads the definitions with ``get_definitions`` and POSTs them as JSON to
    ``http://rabbitmanagement:15672/api/definitions`` using the ``guest``
    account.

    Args:
        legacy_host: Host name of the legacy management API.
        legacy_port: Port of the legacy management API.
        legacy_user: User name for the legacy management API.
        legacy_password: Password for the legacy management API.

    Raises:
        requests.HTTPError: If the new instance does not answer with 201.
    """
    data = get_definitions(legacy_host=legacy_host, legacy_port=legacy_port,
                           legacy_user=legacy_user,
                           legacy_password=legacy_password)
    url = 'http://rabbitmanagement:15672/api/definitions'
    response = requests.post(url,
                             auth=HTTPBasicAuth('guest', 'guest'),
                             data=json.dumps(data),
                             headers={'content-type': 'application/json'})
    # An explicit check instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would hide a rejected import.
    # NOTE(review): some RabbitMQ releases may answer a successful import
    # with 204 rather than 201 -- confirm against the target broker version.
    if response.status_code != 201:
        raise requests.HTTPError(
            'POST {} returned HTTP {} (expected 201)'.format(
                url, response.status_code),
            response=response)
def get_legacy_queues(client, vhost='/'):
    """Ask ``client`` for the queues of one vhost.

    Args:
        client: Object exposing ``get_queues(vhost)``.
        vhost: Virtual host to list; defaults to ``'/'``.

    Returns:
        Whatever ``client.get_queues`` returns for ``vhost``.
    """
    queues = client.get_queues(vhost)
    return queues
def get_and_publish_messages(client, queues, vhost='/'):
    """Pull the messages of each legacy queue and publish them to the new instance.

    For every entry in ``queues`` the function pulls ``entry['messages']``
    messages from the legacy broker through ``client`` and republishes each
    one on the new instance (``rabbitmanagement:15672``, ``guest`` account)
    in the same ``vhost``. After each queue the operator is asked whether to
    go on; answering ``n`` stops the run, any other answer continues.

    Args:
        client: Client connected to the legacy instance; must provide
            ``get_messages(vhost=..., qname=..., count=...)``.
        queues: Iterable of dict-like queue descriptions with at least the
            keys ``name`` and ``messages``.
        vhost: Virtual host to read from and publish to; defaults to ``'/'``.

    Raises:
        RuntimeError: If no connection to the new instance can be
            established. Nothing is pulled from the legacy queues then.
    """
    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        ask = raw_input
    except NameError:
        ask = input

    new_client = connection(host='rabbitmanagement', port=15672,
                            user='guest', password='guest')
    if new_client is None:
        # Without a target every publish would fail after the messages were
        # already pulled from the legacy queue, so stop before touching them.
        # NOTE(review): get_messages is called without a requeue argument;
        # presumably pulled messages are removed from the legacy queue --
        # confirm against the pyrabbit version in use.
        raise RuntimeError(
            'could not connect to the new RabbitMQ instance '
            '(rabbitmanagement:15672); no messages were pulled')

    processed_queues = []
    for element in queues:
        try:
            qname = element.get('name')
            # Recorded before any work is done, so the final list also names
            # queues whose migration failed part-way.
            processed_queues.append(qname)
            message_count = element.get('messages')
            print('+++++++ You are pulling messages from %s +++++++ \n' % qname)
            qmessage = client.get_messages(vhost=vhost, qname=qname,
                                           count=message_count)
            if qmessage is None:
                continue
            print('+++++++ Finished pulling from %s +++++++ \n' % qname)
            print('+++++++ Start pushing to the new instance +++++++')
            for message in qmessage:
                # Publish into the vhost the messages were read from rather
                # than a hard-coded '/'.
                new_client.publish(payload=message['payload'],
                                   rt_key=message['routing_key'],
                                   payload_enc=message['payload_encoding'],
                                   properties=message['properties'],
                                   xname=message['exchange'], vhost=vhost)
            print('+++++++ Finished publishing %s message' % len(qmessage))
            choice = ask('continue[y/n]: ')
            if choice == 'n':
                break
        except Exception as e:
            # Best effort: report the failure and move on to the next queue.
            print(e)
    print('+++++++ Migrated queues: %s ' % processed_queues)
class Command(BaseCommand):
    """Django management command that migrates RabbitMQ data.

    It first copies the definitions of the legacy instance to the new one,
    then moves the messages of every queue in vhost ``/``.
    """

    help = """Migrate rabbitmq messages from
    legacy instance to new
    python manage.py import_from_rabbitmq --legacy_host <host> --legacy_port <port>
    --legacy_user <user> --legacy_password <password>"""

    def add_arguments(self, parser):
        """Register the connection options for the legacy instance."""
        # nargs='+' is kept so existing invocations parse exactly as before;
        # only the first value of each option is used by handle().
        parser.add_argument('--legacy_host', nargs='+')
        parser.add_argument('--legacy_port', nargs='+', type=int)
        parser.add_argument('--legacy_user', nargs='+')
        parser.add_argument('--legacy_password', nargs='+')

    @staticmethod
    def _required_option(options, name):
        """Return the first value given for option ``name``.

        Raises:
            CommandError: If the option was not supplied.
        """
        values = options.get(name)
        if not values:
            raise CommandError('--%s is required' % name)
        return values[0]

    def handle(self, *args, **options):
        """Copy definitions, then migrate all queues of vhost ``/``.

        Raises:
            CommandError: If a connection option is missing or the legacy
                instance cannot be reached.
        """
        host = self._required_option(options, 'legacy_host')
        port = self._required_option(options, 'legacy_port')
        user = self._required_option(options, 'legacy_user')
        password = self._required_option(options, 'legacy_password')

        # Post the legacy configuration to the new cluster first.
        post_definitions(legacy_host=host, legacy_port=port,
                         legacy_user=user, legacy_password=password)

        legacy_client = connection(host=host, port=port,
                                   user=user, password=password)
        if legacy_client is None:
            # connection() reports failure by returning None; stop here with
            # a clear message instead of failing on the first client call.
            raise CommandError(
                'could not connect to the legacy RabbitMQ instance at %s:%s'
                % (host, port))

        legacy_queues = get_legacy_queues(client=legacy_client)
        get_and_publish_messages(client=legacy_client, queues=legacy_queues,
                                 vhost='/')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement