Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import gevent
- import time
- import sys
- path_to_read = './input'
- def is_ignored(basename):
- 'if basename in queue >= 10 sec + len(queue) then ignore'
- if 'ignored_dict' not in globals():
- global ignored_dict
- ignored_dict = {}
- if basename in ignored_dict:
- delta = ignored_dict[basename]['delta']
- if delta.next() >= 10+len(watchdir_list):
- return True
- else:
- return False
- else:
- ignored_dict.update({basename: {'delta': iter_delta()}})
- return False
- def watchdog():
- 'eventloop delete basename who in queue more than 5min'
- while True:
- if 'ignored_dict' in globals():
- if ignored_dict:
- for basename, iterator in ignored_dict.items():
- #delete all ignored (300sec)5min old
- if iterator['delta'].next() > 300:
- del ignored_dict[basename]
- gevent.sleep(5)
- raise SystemExit('watchdog')
- def iter_delta():
- 'return time delta as generator'
- timestamp = int((time.time() + 0.5))
- while True:
- current_timestamp = int((time.time() + 0.5))
- yield (current_timestamp - timestamp)
- def watch_dir():
- 'eventloop append basename in queue if not ignored'
- while True:
- if os.path.exists(path_to_read):
- if 'watchdir_list' not in globals():
- global watchdir_list
- watchdir_list = []
- for basename in os.listdir(path_to_read):
- if basename not in watchdir_list:
- if not is_ignored(basename):
- watchdir_list.append(basename)
- else:
- raise SystemExit('watch_dir')
- gevent.sleep(0.1)
- def read_file():
- 'eventloop read first basename from queue'
- while True:
- if 'watchdir_list' not in globals():
- break
- if watchdir_list:
- basename = watchdir_list[0]
- path = os.path.join(path_to_read, basename)
- if os.path.isfile(path):
- try:
- with open(path, 'r') as fd:
- data = fd.read()
- fd.close()
- print basename,'read ok'
- watchdir_list.remove(basename)
- except IOError as err:
- print basename,'read bad'
- watchdir_list.remove(basename)
- else:
- pass
- gevent.sleep(0.1)
- raise SystemExit('read_file')
- def init():
- try:
- g_watch_dir = gevent.spawn(watch_dir)
- g_read_file = gevent.spawn(read_file)
- g_watchdog = gevent.spawn(watchdog)
- #
- g_watch_dir.join()
- g_read_file.join()
- g_watchdog.join()
- except KeyboardInterrupt as err:
- sys.exit(1)
- init()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement