Advertisement
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- import argparse
- import heapq
- from collections import defaultdict
- from concurrent.futures import ThreadPoolExecutor, wait
- from functools import partial
- from pathlib import Path
- from queue import Queue
def reader(queue, path):
    """Stream the lines of *path* into *queue*, then signal end-of-input.

    Every line is enqueued as a newline-terminated string. A final ``None``
    is enqueued as the end-of-stream sentinel for the consumer.

    Args:
        queue: Object with a blocking ``put(item)`` method.
        path: Object whose ``open("r")`` returns a text file usable as a
            context manager and iterable over lines.

    Raises:
        Any exception raised while opening or reading the file propagates to
        the caller. The ``None`` sentinel is still enqueued first.
    """
    try:
        with path.open("r") as f:
            for line in f:
                # A last line without a trailing newline would otherwise be
                # glued to whichever line the consumer emits after it.
                if not line.endswith("\n"):
                    line += "\n"
                queue.put(line)
    finally:
        # Always send the sentinel: without it a failed open/read leaves the
        # consumer blocked on queue.get() forever.
        queue.put(None)
def writer(queue, path):
    """Write every line received on *queue* to the file at *path*.

    The file is opened with mode ``"w"``. Lines are written in the order
    ``iter_queue`` yields them, and writing stops once ``iter_queue`` is
    exhausted.
    """
    with path.open("w") as out:
        out.writelines(iter_queue(queue))
def iter_queue(queue):
    """Yield items taken from *queue* until a ``None`` sentinel arrives.

    Each item is fetched with the queue's ``get()``. The sentinel is consumed
    but never yielded; anything enqueued after it is left on the queue.
    """
    item = queue.get()
    while item is not None:
        yield item
        item = queue.get()
def main(argv=None):
    """Merge pre-sorted text files line by line into one output file.

    One reader task per input file feeds a bounded queue. The calling thread
    merges those queues with ``heapq.merge`` and passes the merged lines to a
    single writer task through another bounded queue.

    Args:
        argv: Optional list of command-line arguments. When ``None`` (the
            default), ``argparse`` reads them from ``sys.argv``.

    Raises:
        SystemExit: On invalid arguments, including an output path that is
            also one of the input paths.
        Exception: The first error raised inside a reader or writer task is
            re-raised here after all tasks have finished.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("paths", nargs="+", type=Path)
    parser.add_argument("-o", "--output", type=Path, required=True)
    ns = parser.parse_args(argv)

    # Writing to a file that is being read at the same time would truncate
    # the input and mix its lines into the output stream.
    output_path = ns.output.resolve()
    if any(path.resolve() == output_path for path in ns.paths):
        parser.error("output file must not be one of the input files")

    # One queue per input *position* (not per path), so a path listed twice
    # still gets two independent streams, and the output queue can never be
    # shared with an input.
    input_queues = [Queue(maxsize=100) for _ in ns.paths]
    output_queue = Queue(maxsize=100)

    # Every reader AND the writer must be able to run concurrently. With only
    # len(paths) workers the writer waits for a free worker while the readers
    # block on full queues, and the merge deadlocks on large inputs.
    with ThreadPoolExecutor(max_workers=len(ns.paths) + 1) as pool:
        fs = [
            pool.submit(reader, in_queue, path)
            for in_queue, path in zip(input_queues, ns.paths)
        ]
        fs.append(pool.submit(writer, output_queue, ns.output))

        iterables = [iter_queue(in_queue) for in_queue in input_queues]
        for line in heapq.merge(*iterables):
            output_queue.put(line)
        output_queue.put(None)  # tell the writer that the merge is complete
        wait(fs)

    # wait() never raises worker errors; surface them instead of exiting
    # silently with a possibly incomplete output file.
    for future in fs:
        future.result()
# Run the merge only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement