Advertisement
zamotivator

Untitled

Feb 12th, 2014
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.63 KB | None | 0 0
#!/usr/bin/env python2.7
# -*- coding: utf-8 -*-

import multiprocessing
import multiprocessing.reduction
import os
import subprocess
import sys
import time

# NOTE(review): wildcard import; presumably supplies the hash helpers and the
# argparse_* functions used by parse_args() -- consider importing them by name.
from social_graph import *

# Install support for sending connections/sockets between processes.
multiprocessing.allow_connection_pickling()
  10.  
  11. PATH_PATTERN = 'shard.%s'
  12.  
  13. def writer((queue, path)):
  14. try:
  15. sys.stdout.write('Writer: path %s start\n' % (path))
  16. with open(path, 'w') as f:
  17. while True:
  18. bulk = queue.get()
  19. if bulk is None:
  20. queue.task_done()
  21. break
  22. for line in bulk:
  23. f.write(line)
  24. queue.task_done()
  25. sys.stdout.write('Writer: path %s done\n' % path)
  26. return True
  27. except KeyboardInterrupt:
  28. sys.stderr.write('Writer: path %s interrupted\n' % path)
  29. queue.close()
  30. return False
  31. except BaseException as error:
  32. sys.stdout.write('Writer: path %s problem %s\n' % (path, error))
  33. return False
  34.  
  35. def spliter((hash_fun, queues, path)):
  36. try:
  37. sys.stdout.write('Spliter: path %s start\n' % path)
  38. shard_count = len(queues)
  39. bulk = dict((index, []) for index in range(shard_count))
  40. with open(path, 'r') as file_in:
  41. for line in file_in:
  42. try:
  43. index = line.index('\t')
  44. user_id = long(line[:index])
  45. except ValueError:
  46. continue
  47. shard = hash_fun(user_id) % shard_count
  48. bulk[shard].append(line)
  49. if len(bulk[shard]) >= 1024:
  50. queues[shard].put(bulk[shard])
  51. bulk[shard] = []
  52. for shard in sorted(bulk):
  53. if bulk[shard]:
  54. queues[shard].put(bulk[shard])
  55. sys.stdout.write('Spliter: path %s done\n' % path)
  56. except KeyboardInterrupt:
  57. sys.stderr.write('Spliter: path %s interrupted\n' % path)
  58.  
  59. def inputs():
  60. return iter(line.replace('\n', '') for line in sys.stdin)
  61.  
  62. def shards(shard_count):
  63. return iter(PATH_PATTERN % index for index in xrange(shard_count))
  64.  
  65. def writer_tasks(queues):
  66. return iter((queues[index], path) for index, path in enumerate(shards(len(queues))))
  67.  
  68. def reader_tasks(settings, queues):
  69. return iter((settings.hash, queues, path) for path in inputs())
  70.  
  71. def split(settings):
  72. manager = multiprocessing.Manager()
  73. queues = [manager.JoinableQueue() for index in xrange(settings.shard)]
  74. pool_writer = multiprocessing.Pool(settings.shard)
  75. pool_reader = multiprocessing.Pool(settings.parallel)
  76.  
  77. try:
  78. pool_writer.map_async(writer, writer_tasks(queues))
  79. pool_reader.map_async(spliter, reader_tasks(settings, queues))
  80. pool_reader.close()
  81. pool_reader.join()
  82. for q in queues:
  83. q.put(None)
  84. for q in queues:
  85. q.join()
  86. pool_writer.close()
  87. pool_writer.join()
  88. return True
  89. except KeyboardInterrupt:
  90. sys.stderr.write('Split Manager: interrupted\n')
  91. for q in queues:
  92. q.close()
  93. pool_reader.terminate()
  94. pool_writer.terminate()
  95. return False
  96.  
  97.  
  98. def sorter(path):
  99. try:
  100. path_in = path
  101. path_out = '%s.sorted' % path_in
  102. sys.stdout.write('Sorter: path %s start\n' % path)
  103. with open(path_in, 'r') as file_in:
  104. with open(path_out, 'w') as file_out:
  105. subprocess.Popen(['sort', '-n'], stdin=file_in, stdout=file_out).wait()
  106. sys.stdout.write('Sorter: path %s done\n' % path)
  107. subprocess.Popen(['mv', path_out, path_in]).wait()
  108. except KeyboardInterrupt:
  109. sys.stdout.write('Sorter: path %s interrupted\n' % path)
  110.  
  111. def sort(settings):
  112. pool = multiprocessing.Pool(settings.parallel)
  113. try:
  114. pool.map_async(sorter, shards(settings.shard))
  115. pool.close()
  116. pool.join()
  117. except KeyboardInterrupt:
  118. sys.stderr.write('Sort Manager: interrupted\n')
  119. pool.terminate()
  120. return False
  121.  
  122. def parse_args():
  123. import argparse
  124. parser = argparse.ArgumentParser(description='social graph repart')
  125. parser = argparse_group_shard(parser)
  126. parser = argparse_options_parallel(parser)
  127. parser = argparse_options_hash(parser)
  128. settings = parser.parse_args()
  129. settings = argparse_analyze_hash(settings)
  130. return settings
  131.  
  132. def main():
  133. try:
  134. settings = parse_args()
  135. if not split(settings):
  136. sys.exit(1)
  137. if not sort(settings):
  138. sys.exit(1)
  139. except KeyboardInterrupt:
  140. pass
  141.  
  142. if __name__ == '__main__':
  143. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement