Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import shutil
- import argparse
- #this script lets you move kafka partitions on the same broker between log-dirs. The broker must be shutdown for this to work!
- #sudo python3 partition_mover.py --src /tmp/kafka-logs/ --dest /tmp/kafka-logs2/ --topic reddit_dev_comments_error_fh --partition 0
- parser = argparse.ArgumentParser(description='Intra-Broker Kafka Partition Mover')
- parser.add_argument('--src', help='The Kafka Topic Log-Dir you want to move a partition from', required=True)
- parser.add_argument('--dest', help='The Kafka Topic Log-Dir you want to move a partition to', required=True)
- parser.add_argument('--topic', help='The Kafka Topic Name', required=True)
- parser.add_argument('--partition', help='The Kafka Topic Partition you want to move', required=True)
- parser.add_argument('--backup_dir', help='The directory you want to backup files to, default is /tmp/', required=False, default="/tmp/")
- args = parser.parse_args()
- #check that the paths are valid
- if args.dest.endswith("/"):
- pass
- else:
- args.dest = args.dest+"/"
- if args.src.endswith("/"):
- pass
- else:
- args.src = args.src+"/"
- if args.backup_dir.endswith("/"):
- pass
- else:
- args.backup_dir = args.backup_dir+"/"
- print("Copying recovery and replication files from source log-dir")
- shutil.copy(args.src+"recovery-point-offset-checkpoint", args.backup_dir)
- shutil.copy(args.src+"replication-offset-checkpoint", args.backup_dir)
- print("Done copying files")
- print("Moving Topic Partition from Source to Destition")
- shutil.move(args.src+args.topic+"-"+args.partition, args.dest)
- print("Done moving topic Partition")
- print("Collecting offset metadata and generating new offset files")
- replication_meta = ""
- recovery_meta = ""
- count = 0
- with open(args.src+"recovery-point-offset-checkpoint", "r") as input:
- with open(args.src+"recovery-point-offset-checkpoint.new", "w") as output:
- for line in input:
- count+= 1
- if count == 2:
- num = int(line) - 1
- output.write(str(num)+"\n")
- elif args.topic+" "+args.partition in line:
- recovery_meta = line
- else:
- output.write(line)
- input.close()
- output.close()
- count = 0
- with open(args.src+"replication-offset-checkpoint", "r") as input:
- with open(args.src+"replication-offset-checkpoint.new", "w") as output:
- for line in input:
- count+= 1
- if count == 2:
- num = int(line) - 1
- output.write(str(num)+"\n")
- elif args.topic+" "+args.partition in line:
- replication_meta = line
- else:
- output.write(line)
- input.close()
- output.close()
- print("replication meta " + replication_meta)
- print("recovery meta" +recovery_meta)
- print("Done collecting meta and generating new offset files for source directory")
- print("Generating new offset files for dest directory")
- count = 0
- with open(args.dest+"recovery-point-offset-checkpoint", "r") as input:
- with open(args.dest+"recovery-point-offset-checkpoint.new", "w") as output:
- for line in input:
- count+= 1
- if count == 2:
- num = int(line) + 1
- output.write(str(num)+"\n")
- else:
- output.write(line)
- output.write(recovery_meta)
- input.close()
- output.close()
- count = 0
- with open(args.dest+"replication-offset-checkpoint", "r") as input:
- with open(args.dest+"replication-offset-checkpoint.new", "w") as output:
- for line in input:
- count+= 1
- if count == 2:
- num = int(line) + 1
- output.write(str(num)+"\n")
- else:
- output.write(line)
- output.write(replication_meta)
- input.close()
- output.close()
- print("Done generating new offset files for dest directory")
- print("Replacing old offset files in dest and source directory")
- shutil.move(args.src+"replication-offset-checkpoint.new", args.src+"replication-offset-checkpoint")
- shutil.move(args.src+"recovery-point-offset-checkpoint.new", args.src+"recovery-point-offset-checkpoint")
- shutil.move(args.dest+"replication-offset-checkpoint.new", args.dest+"replication-offset-checkpoint")
- shutil.move(args.dest+"recovery-point-offset-checkpoint.new", args.dest+"recovery-point-offset-checkpoint")
- print("###DONE###")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement