Advertisement
josephxsxn

partition_mover.py

May 10th, 2018
177
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 4.25 KB | None | 0 0
  1. import sys
  2. import shutil
  3. import argparse
  4.  
  5. #this script lets you move kafka partitions on the same broker between log-dirs. The broker must  be shutdown for this to work!
  6. #sudo python3 partition_mover.py --src /tmp/kafka-logs/ --dest /tmp/kafka-logs2/ --topic reddit_dev_comments_error_fh --partition 0
  7. parser = argparse.ArgumentParser(description='Intra-Broker Kafka Partition Mover')
  8. parser.add_argument('--src', help='The Kafka Topic Log-Dir you want to move a partition from', required=True)
  9. parser.add_argument('--dest', help='The Kafka Topic Log-Dir you want to move a partition to', required=True)
  10. parser.add_argument('--topic', help='The Kafka Topic Name', required=True)
  11. parser.add_argument('--partition', help='The Kafka Topic Partition you want to move', required=True)
  12. parser.add_argument('--backup_dir', help='The directory you want to backup files to, default is /tmp/', required=False, default="/tmp/")
  13.  
  14. args = parser.parse_args()
  15.  
  16. #check that the paths are valid
  17. if args.dest.endswith("/"):
  18.    pass
  19. else:
  20.    args.dest = args.dest+"/"
  21.  
  22. if args.src.endswith("/"):
  23.    pass
  24. else:
  25.    args.src = args.src+"/"
  26.  
  27. if args.backup_dir.endswith("/"):
  28.    pass
  29. else:
  30.    args.backup_dir = args.backup_dir+"/"
  31.  
  32. print("Copying recovery and replication files from source log-dir")
  33. shutil.copy(args.src+"recovery-point-offset-checkpoint", args.backup_dir)
  34. shutil.copy(args.src+"replication-offset-checkpoint", args.backup_dir)
  35. print("Done copying files")
  36.  
  37. print("Moving Topic Partition from Source to Destition")
  38. shutil.move(args.src+args.topic+"-"+args.partition, args.dest)
  39. print("Done moving topic Partition")
  40.  
  41. print("Collecting offset metadata and generating new offset files")
  42. replication_meta = ""
  43. recovery_meta = ""
  44.  
  45. count = 0
  46. with open(args.src+"recovery-point-offset-checkpoint", "r") as input:
  47.    with open(args.src+"recovery-point-offset-checkpoint.new", "w") as output:
  48.       for line in input:
  49.          count+= 1
  50.          if count == 2:
  51.             num = int(line) - 1
  52.             output.write(str(num)+"\n")
  53.          elif args.topic+" "+args.partition in line:
  54.             recovery_meta = line
  55.          else:
  56.             output.write(line)
  57. input.close()
  58. output.close()
  59.  
  60. count = 0
  61. with open(args.src+"replication-offset-checkpoint", "r") as input:
  62.    with open(args.src+"replication-offset-checkpoint.new", "w") as output:
  63.       for line in input:
  64.          count+= 1
  65.          if count == 2:
  66.             num = int(line) - 1
  67.             output.write(str(num)+"\n")
  68.          elif args.topic+" "+args.partition in line:
  69.             replication_meta = line
  70.          else:
  71.             output.write(line)
  72. input.close()
  73. output.close()
  74. print("replication meta " + replication_meta)
  75. print("recovery meta" +recovery_meta)
  76. print("Done collecting meta and generating new offset files for source directory")
  77.  
  78. print("Generating new offset files for dest directory")
  79.  
  80. count = 0
  81. with open(args.dest+"recovery-point-offset-checkpoint", "r") as input:
  82.    with open(args.dest+"recovery-point-offset-checkpoint.new", "w") as output:
  83.       for line in input:
  84.          count+= 1
  85.          if count == 2:
  86.             num = int(line) + 1
  87.             output.write(str(num)+"\n")
  88.          else:
  89.             output.write(line)
  90.       output.write(recovery_meta)
  91. input.close()
  92. output.close()
  93.  
  94. count = 0
  95. with open(args.dest+"replication-offset-checkpoint", "r") as input:
  96.    with open(args.dest+"replication-offset-checkpoint.new", "w") as output:
  97.       for line in input:
  98.          count+= 1
  99.          if count == 2:
  100.             num = int(line) + 1
  101.             output.write(str(num)+"\n")
  102.          else:
  103.             output.write(line)
  104.       output.write(replication_meta)
  105. input.close()
  106. output.close()
  107.  
  108. print("Done generating new offset files for dest directory")
  109. print("Replacing old offset files in dest and source directory")
  110. shutil.move(args.src+"replication-offset-checkpoint.new", args.src+"replication-offset-checkpoint")
  111. shutil.move(args.src+"recovery-point-offset-checkpoint.new", args.src+"recovery-point-offset-checkpoint")
  112. shutil.move(args.dest+"replication-offset-checkpoint.new", args.dest+"replication-offset-checkpoint")
  113. shutil.move(args.dest+"recovery-point-offset-checkpoint.new", args.dest+"recovery-point-offset-checkpoint")
  114.  
  115.  
  116. print("###DONE###")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement