Advertisement
Guest User

file read queue

a guest
Apr 23rd, 2015
325
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.31 KB | None | 0 0
  1. import os
  2. import gevent
  3. import time
  4. import sys
  5.  
# Directory listed by watch_dir() and read from by read_file(); the path is
# relative to the current working directory.
path_to_read = './input'
  7.  
  8. def is_ignored(basename):
  9.     'if basename in queue >= 10 sec + len(queue) then ignore'
  10.     if 'ignored_dict' not in globals():
  11.         global ignored_dict
  12.         ignored_dict = {}
  13.     if basename in ignored_dict:
  14.         delta = ignored_dict[basename]['delta']
  15.         if delta.next() >= 10+len(watchdir_list):
  16.             return True
  17.         else:
  18.             return False
  19.     else:
  20.         ignored_dict.update({basename: {'delta': iter_delta()}})
  21.         return False
  22.  
  23.  
  24. def watchdog():
  25.     'eventloop delete basename who in queue more than 5min'
  26.     while True:
  27.         if 'ignored_dict' in globals():
  28.             if ignored_dict:
  29.                 for basename, iterator in ignored_dict.items():
  30.                     #delete all ignored (300sec)5min old
  31.                     if iterator['delta'].next() > 300:
  32.                         del ignored_dict[basename]
  33.         gevent.sleep(5)
  34.     raise SystemExit('watchdog')
  35.  
  36.  
  37. def iter_delta():
  38.     'return time delta as generator'
  39.     timestamp = int((time.time() + 0.5))
  40.     while True:
  41.         current_timestamp = int((time.time() + 0.5))
  42.         yield (current_timestamp - timestamp)
  43.  
  44.  
  45. def watch_dir():
  46.     'eventloop append basename in queue if not ignored'
  47.     while True:
  48.         if os.path.exists(path_to_read):
  49.             if 'watchdir_list' not in globals():
  50.                 global watchdir_list
  51.                 watchdir_list = []
  52.             for basename in os.listdir(path_to_read):
  53.                 if basename not in watchdir_list:
  54.                     if not is_ignored(basename):
  55.                         watchdir_list.append(basename)
  56.         else:
  57.             raise SystemExit('watch_dir')
  58.         gevent.sleep(0.1)
  59.  
  60.  
  61. def read_file():
  62.     'eventloop read first basename from queue'
  63.     while True:
  64.         if 'watchdir_list' not in globals():
  65.             break
  66.         if watchdir_list:
  67.             basename = watchdir_list[0]
  68.             path = os.path.join(path_to_read, basename)
  69.             if os.path.isfile(path):
  70.                 try:
  71.                     with open(path, 'r') as fd:
  72.                         data = fd.read()
  73.                     fd.close()
  74.                     print basename,'read ok'
  75.                     watchdir_list.remove(basename)
  76.                 except IOError as err:
  77.                     print basename,'read bad'
  78.                     watchdir_list.remove(basename)
  79.             else:
  80.                 pass
  81.         gevent.sleep(0.1)
  82.     raise SystemExit('read_file')
  83.  
  84.  
  85. def init():
  86.     try:
  87.         g_watch_dir = gevent.spawn(watch_dir)
  88.         g_read_file = gevent.spawn(read_file)
  89.         g_watchdog = gevent.spawn(watchdog)
  90.         #
  91.         g_watch_dir.join()
  92.         g_read_file.join()
  93.         g_watchdog.join()
  94.     except KeyboardInterrupt as err:
  95.         sys.exit(1)
  96.  
  97.  
# Runs whenever the module is loaded, on import as well as on direct
# execution: there is no __main__ guard.
init()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement