Advertisement
Guest User

Untitled

a guest
Apr 1st, 2020
111
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.34 KB | None | 0 0
  1. import argparse
  2. import heapq
  3. from collections import defaultdict
  4. from concurrent.futures import ThreadPoolExecutor, wait
  5. from functools import partial
  6. from pathlib import Path
  7. from queue import Queue
  8.  
  9.  
  10. def reader(queue, path):
  11.     with path.open("r") as f:
  12.         for line in f:
  13.             queue.put(line)
  14.     queue.put(None)
  15.  
  16.  
  17. def writer(queue, path):
  18.     with path.open("w") as f:
  19.         for line in iter_queue(queue):
  20.             f.write(line)
  21.  
  22.  
  23. def iter_queue(queue):
  24.     while True:
  25.         value = queue.get()
  26.         if value is None:
  27.             break
  28.         yield value
  29.  
  30.  
  31. def main():
  32.     parser = argparse.ArgumentParser()
  33.     parser.add_argument("paths", nargs="+", type=Path)
  34.     parser.add_argument("-o", "--output", type=Path, required=True)
  35.     ns = parser.parse_args()
  36.     queues = defaultdict(partial(Queue, maxsize=100))
  37.     output_queue = queues[ns.output]
  38.     with ThreadPoolExecutor(max_workers=len(ns.paths)) as pool:
  39.         fs = [pool.submit(reader, queues[path], path) for path in ns.paths]
  40.         fs.append(pool.submit(writer, output_queue, ns.output))
  41.         iterables = [iter_queue(queues[path]) for path in ns.paths]
  42.         for line in heapq.merge(*iterables):
  43.             output_queue.put(line)
  44.         output_queue.put(None)
  45.         wait(fs)
  46.  
  47.  
  48. if __name__ == "__main__":
  49.     main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement