Advertisement
Guest User

Untitled

a guest
Apr 23rd, 2017
57
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.40 KB | None | 0 0
  1. #!/usr/local/bin/python
  2. # -*- coding: utf-8 -*-
  3.  
  4. import logging
  5. import logging.config
  6. import queue
  7. import signal
  8. import socket
  9. import sys
  10. import threading
  11. import time
  12.  
  13.  
  14. MONITOR_HOST_A = '89.75.112.80'
  15. MONITOR_PORT_A = 6005
  16.  
  17. MONITOR_HOST_B = '89.75.112.80'
  18. MONITOR_PORT_B = 6006
  19.  
  20. NVR_HOST_A = '192.0.0.123'
  21. NVR_PORT_A = 10001
  22.  
  23. NVR_HOST_B = '192.0.0.123'
  24. NVR_PORT_B = 10002
  25.  
  26.  
  27. TIMEOUT = 30
  28.  
  29.  
  30. LOG_CONF = {
  31.     'version': 1,
  32.  
  33.     'formatters': {
  34.         'void': {
  35.             'format': ''
  36.         },
  37.         'standard': {
  38.             'format': ':: %(asctime)s [%(levelname)s] %(message)s'
  39.         },
  40.     },
  41.     'handlers': {
  42.         'default': {
  43.             'level':'DEBUG',
  44.             'class':'logging.StreamHandler',
  45.             'formatter': 'standard',
  46.             'stream': 'ext://sys.stdout'
  47.         },
  48.         'error': {
  49.             'level':'DEBUG',
  50.             'class': 'logging.handlers.RotatingFileHandler',
  51.             'formatter': 'standard',
  52.             'filename': 'error.log',
  53.             'maxBytes': 10485760,
  54.             'backupCount': 20,
  55.             'encoding': 'utf8'
  56.         },
  57.     },
  58.     'loggers': {
  59.         '': {
  60.             'handlers': ['default', 'error'],
  61.             'level': 'DEBUG'
  62.         },
  63.     }
  64. }
  65.  
  66.  
  67. def buffered_readlines(pull_next_chunk, buf_size=4096):
  68.   """
  69.  pull_next_chunk is callable that should accept one positional argument max_len,
  70.  i.e. socket.recv or file().read and returns string of up to max_len long or
  71.  empty one when nothing left to read.
  72.  
  73.  >>> for line in buffered_readlines(socket.recv, 16384):
  74.  ...   print line
  75.    ...
  76.  >>> # the following code won't read whole file into memory
  77.  ... # before splitting it into lines like .readlines method
  78.  ... # of file does. Also it won't block until FIFO-file is closed
  79.  ...
  80.  >>> for line in buffered_readlines(open('huge_file').read):
  81.  ...   # process it on per-line basis
  82.        ...
  83.  >>>
  84.  """
  85.   chunks = []
  86.   while True:
  87.     chunk = pull_next_chunk(buf_size)
  88.     if not chunk:
  89.       if chunks:
  90.         yield b''.join(chunks)
  91.       break
  92.     if not b'\n' in chunk:
  93.       chunks.append(chunk)
  94.       continue
  95.     chunk = chunk.split(b'\n')
  96.     if chunks:
  97.       yield b''.join(chunks + [chunk[0]])
  98.     else:
  99.       yield chunk[0]
  100.     for line in chunk[1:-1]:
  101.       yield line
  102.     if chunk[-1]:
  103.       chunks = [chunk[-1]]
  104.     else:
  105.       chunks = []
  106.  
  107.  
  108. class WorkerThread(threading.Thread):
  109.     """A thread that will call do_work() method in a loop until terminated"""
  110.     def __init__(self):
  111.         super().__init__()
  112.         self._terminate = False
  113.  
  114.     def stop(self):
  115.         """Terminate the thread."""
  116.         logging.debug('%s stopping...', self)
  117.         self._terminate = True
  118.  
  119.     def do_work(self):
  120.         """Override this method to provide functionality."""
  121.         pass
  122.  
  123.     def run(self):
  124.         logging.debug('%s started', self)
  125.         while not self._terminate:
  126.             self.do_work()
  127.         logging.debug('%s stopped', self)
  128.  
  129.  
  130. class InputThread(WorkerThread):
  131.     """A thread that connects to the monitor and handles incoming data."""
  132.     def __init__(self, host, port, output):
  133.         super().__init__()
  134.         self._host = host
  135.         self._port = port
  136.         self._sock = None
  137.         self._addr = (host, port)
  138.         self._output = output
  139.  
  140.     def do_work(self):
  141.         logging.info('%s connecting to the monitor: %s port %s', self, self._host, self._port)
  142.         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  143.         self._sock.settimeout(TIMEOUT)
  144.         try:
  145.             self._sock.connect(self._addr)
  146.         except socket.timeout as ex:
  147.             logging.warning('%s connection failed, retrying in %s s', self, TIMEOUT)
  148.             self._sock.close()
  149.             time.sleep(TIMEOUT)
  150.             return
  151.         except Exception as ex:
  152.             logging.exception(ex)
  153.             self._sock.close()
  154.             time.sleep(TIMEOUT)
  155.             return
  156.         logging.info('%s connected', self)
  157.         try:
  158.             while not self._terminate:
  159.                 try:
  160.                     for line in buffered_readlines(self._sock.recv, 512):
  161.                         #print(line.strip()+b'\r\n')
  162.                         self._output.send(line.strip()+b'\r\n')
  163.                 except socket.timeout:
  164.                     pass
  165.         except Exception as ex:
  166.             logging.exception(ex)
  167.             logging.warning('%s receiving data failed, reconnecting in %s s', self, TIMEOUT)
  168.             self._sock.close()
  169.             time.sleep(TIMEOUT)
  170.             return
  171.         self._sock.close()
  172.         logging.info('%s disconnected', self)
  173.  
  174.  
  175. class OutputThread(WorkerThread):
  176.     """A thread that connects to the recorder and handles outgoing data."""
  177.     def __init__(self, host, port):
  178.         super().__init__()
  179.         self._host = host
  180.         self._port = port
  181.         self._sock = None
  182.  
  183.         self._addr = (host, port)
  184.         self._connected = False
  185.         self._queue = queue.Queue()
  186.  
  187.     def send(self, data):
  188.         """send data to the recorder."""
  189.         if self._connected:
  190.             self._queue.put(data)
  191.         else:
  192.             logging.warning('%s not connected to the recorder, discarding data: %s', self, data)
  193.  
  194.     def do_work(self):
  195.         logging.info('%s connecting to the recorder: %s port %s', self, self._host, self._port)
  196.         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  197.         self._sock.settimeout(TIMEOUT)
  198.         try:
  199.             self._sock.connect(self._addr)
  200.         except socket.timeout as ex:
  201.             logging.warning('%s connection failed, retrying in %s s', self, TIMEOUT)
  202.             self._sock.close()
  203.             time.sleep(TIMEOUT)
  204.             return
  205.         except Exception as ex:
  206.             logging.exception(ex)
  207.             self._sock.close()
  208.             time.sleep(TIMEOUT)
  209.             return
  210.         logging.info('%s connected', self)
  211.         self._connected = True
  212.         while not self._terminate:
  213.             try:
  214.                 data = self._queue.get(block=True, timeout=1)
  215.             except queue.Empty:
  216.                 continue
  217.             try:
  218.                 self._sock.send(data)
  219.             except Exception as ex:
  220.                 logging.exception(ex)
  221.                 logging.warning('%s sending data failed, reconnecting in %s s', self, TIMEOUT)
  222.                 self._connected = False
  223.                 self._sock.close()
  224.                 time.sleep(TIMEOUT)
  225.                 return
  226.         self._connected = False
  227.         self._sock.close()
  228.         logging.info('%s disconnected', self)
  229.  
  230.  
  231. class Main():
  232.     """Main POS data processing class."""
  233.     def __init__(self):
  234.         self._outa = OutputThread(NVR_HOST_A, NVR_PORT_A)
  235.         self._outb = OutputThread(NVR_HOST_B, NVR_PORT_B)
  236.  
  237.         self._ina = InputThread(MONITOR_HOST_A, MONITOR_PORT_A, self._outa)
  238.         self._inb = InputThread(MONITOR_HOST_B, MONITOR_PORT_B, self._outb)
  239.  
  240.         self._terminate = False
  241.         signal.signal(signal.SIGTERM, self._signal_term_handler)
  242.         signal.signal(signal.SIGINT, self._signal_int_handler)
  243.  
  244.     def start(self):
  245.         """Start POS data processing, a blocking method."""
  246.         logging.info('%s starting...', self)
  247.         self._outa.start()
  248.         self._outb.start()
  249.  
  250.         self._ina.start()
  251.         self._inb.start()
  252.  
  253.         logging.info('%s started', self)
  254.  
  255.         while not self._terminate:
  256.             time.sleep(TIMEOUT)
  257.  
  258.         logging.info('%s stopped', self)
  259.  
  260.     def stop(self):
  261.         """Stop POS data processing, exit start() method."""
  262.         logging.info('%s stopping...', self)
  263.  
  264.         self._terminate = True
  265.  
  266.         self._ina.stop()
  267.         self._inb.stop()
  268.  
  269.         self._outa.stop()
  270.         self._outb.stop()
  271.  
  272.     def _signal_term_handler(self, sig, frm):
  273.         logging.info('%s got SIGTERM', self)
  274.         self.stop()
  275.  
  276.     def _signal_int_handler(self, sig, frm):
  277.         logging.info('%s got SIGINT', self)
  278.         self.stop()
  279.  
  280.  
  281. if __name__ == '__main__':
  282.     logging.config.dictConfig(LOG_CONF)
  283.  
  284.     logging.info('**********************************************')
  285.     logging.info('*** POS Monitor 0417 (C) 2017 Kamil Okrasa ***')
  286.     logging.info('*** CTRL+C to stop                         ***')
  287.     logging.info('**********************************************')
  288.  
  289.     MAIN = Main()
  290.     MAIN.start()
  291.  
  292.     sys.exit(0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement