Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/local/bin/python
- # -*- coding: utf-8 -*-
- import logging
- import logging.config
- import queue
- import signal
- import socket
- import sys
- import threading
- import time
# Monitor endpoints that the input threads connect to (channels A and B).
MONITOR_HOST_A, MONITOR_PORT_A = '89.75.112.80', 6005
MONITOR_HOST_B, MONITOR_PORT_B = '89.75.112.80', 6006

# Recorder (NVR) endpoints that the output threads connect to.
NVR_HOST_A, NVR_PORT_A = '192.0.0.123', 10001
NVR_HOST_B, NVR_PORT_B = '192.0.0.123', 10002

# Seconds; used both as the socket timeout and as the delay before reconnecting.
TIMEOUT = 30

# Formatters available to the handlers below.  'void' is defined but no
# handler in this configuration refers to it.
_FORMATTERS = {
    'void': {'format': ''},
    'standard': {'format': ':: %(asctime)s [%(levelname)s] %(message)s'},
}

# 'default' prints to stdout; 'error' writes to a rotating error.log
# (10 MiB per file, 20 backups).  Both accept DEBUG and above.
_HANDLERS = {
    'default': {
        'level': 'DEBUG',
        'class': 'logging.StreamHandler',
        'formatter': 'standard',
        'stream': 'ext://sys.stdout',
    },
    'error': {
        'level': 'DEBUG',
        'class': 'logging.handlers.RotatingFileHandler',
        'formatter': 'standard',
        'filename': 'error.log',
        'maxBytes': 10 * 1024 * 1024,
        'backupCount': 20,
        'encoding': 'utf8',
    },
}

# Dictionary passed to logging.config.dictConfig(); the root logger ('')
# sends every record to both handlers.
LOG_CONF = {
    'version': 1,
    'formatters': _FORMATTERS,
    'handlers': _HANDLERS,
    'loggers': {
        '': {
            'handlers': ['default', 'error'],
            'level': 'DEBUG',
        },
    },
}
def buffered_readlines(pull_next_chunk, buf_size=4096):
    """Yield newline-delimited lines read incrementally from a chunk source.

    ``pull_next_chunk`` is a callable accepting one positional argument
    ``max_len`` (for example a connected socket's ``recv`` or a file
    object's ``read``).  It must return up to ``max_len`` items of data and
    an empty value once nothing is left to read.  Both ``bytes`` and ``str``
    sources are supported; the yielded lines have the same type as the
    chunks and do not include the trailing newline.

    Data is consumed ``buf_size`` items at a time, so the whole input is
    never held in memory at once.  A final line without a trailing newline
    is yielded when the source reports end of data.

    Example::

        for line in buffered_readlines(sock.recv, 16384):
            print(line)

        with open('huge_file', 'rb') as handle:
            for line in buffered_readlines(handle.read):
                ...  # process it on a per-line basis

    Any exception raised by ``pull_next_chunk`` propagates to the caller
    and ends the generator; data buffered for an incomplete line is lost.
    """
    pending = []  # pieces of the current, still unterminated line
    newline, joiner = b'\n', b''
    while True:
        chunk = pull_next_chunk(buf_size)
        if not chunk:
            # End of data: flush whatever is left as the last line.
            if pending:
                yield joiner.join(pending)
            return
        # Pick the separator matching the chunk type so that text-mode
        # sources work as well as binary ones.
        if isinstance(chunk, str):
            newline, joiner = '\n', ''
        else:
            newline, joiner = b'\n', b''
        if newline not in chunk:
            pending.append(chunk)
            continue
        # At least one newline is present, so split() yields >= 2 parts.
        head, *complete, tail = chunk.split(newline)
        yield joiner.join(pending + [head])
        yield from complete
        # A non-empty tail is the beginning of the next line.
        pending = [tail] if tail else []
class WorkerThread(threading.Thread):
    """Thread that keeps invoking do_work() until stop() has been requested."""

    def __init__(self):
        threading.Thread.__init__(self)
        # Polled by run(); subclasses may also poll it inside do_work().
        self._terminate = False

    def stop(self):
        """Request termination; the loop ends once the current do_work() returns."""
        logging.debug('%s stopping...', self)
        self._terminate = True

    def do_work(self):
        """Perform one unit of work; subclasses override this hook."""

    def run(self):
        """Thread body: repeat do_work() until the terminate flag is set."""
        logging.debug('%s started', self)
        while True:
            if self._terminate:
                break
            self.do_work()
        logging.debug('%s stopped', self)
class InputThread(WorkerThread):
    """A thread that connects to the monitor and forwards incoming lines.

    Each call to do_work() covers one connection lifetime: connect, read
    newline-delimited data, pass every line (stripped and terminated with
    CR LF) to ``output.send()``, and return when the connection ends so the
    worker loop can reconnect.
    """

    def __init__(self, host, port, output):
        """Remember the monitor address and the object that receives lines.

        ``output`` must provide a ``send(data)`` method accepting bytes.
        """
        super().__init__()
        self._host = host
        self._port = port
        self._sock = None
        self._addr = (host, port)
        self._output = output

    def _connect(self):
        """Open a TCP connection to the monitor; return True on success."""
        logging.info('%s connecting to the monitor: %s port %s', self, self._host, self._port)
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.settimeout(TIMEOUT)
        try:
            self._sock.connect(self._addr)
        except socket.timeout:
            logging.warning('%s connection failed, retrying in %s s', self, TIMEOUT)
        except Exception as ex:
            logging.exception(ex)
        else:
            logging.info('%s connected', self)
            return True
        self._sock.close()
        return False

    def _recv(self, max_len):
        """Receive up to max_len bytes, retrying on timeout until stopped.

        Timeouts are absorbed here rather than around the line generator so
        that a partially received line stays buffered instead of being
        thrown away.  Returns b'' when the peer closed the connection or
        when termination was requested.
        """
        while not self._terminate:
            try:
                return self._sock.recv(max_len)
            except socket.timeout:
                continue
        return b''

    def do_work(self):
        """Run one connect / receive / disconnect cycle."""
        if not self._connect():
            time.sleep(TIMEOUT)
            return
        try:
            for line in buffered_readlines(self._recv, 512):
                if self._terminate:
                    # Shutting down: do not forward a possibly incomplete line.
                    break
                self._output.send(line.strip() + b'\r\n')
        except Exception as ex:
            logging.exception(ex)
            logging.warning('%s receiving data failed, reconnecting in %s s', self, TIMEOUT)
            self._sock.close()
            time.sleep(TIMEOUT)
            return
        self._sock.close()
        if self._terminate:
            logging.info('%s disconnected', self)
            return
        # The generator ended without a stop request, so recv() returned
        # b'': the monitor closed the connection.  Back off and reconnect
        # instead of spinning on a dead socket.
        logging.warning('%s connection closed by the monitor, reconnecting in %s s', self, TIMEOUT)
        time.sleep(TIMEOUT)
class OutputThread(WorkerThread):
    """A thread that connects to the recorder and handles outgoing data.

    Producers call send(); the data is queued and written to the recorder
    socket by this thread.  Each call to do_work() covers one connection
    lifetime and returns when the connection ends so the worker loop can
    reconnect.
    """

    def __init__(self, host, port):
        """Remember the recorder address and create the outgoing queue."""
        super().__init__()
        self._host = host
        self._port = port
        self._sock = None
        self._addr = (host, port)
        # True only while a recorder connection is established.
        self._connected = False
        self._queue = queue.Queue()

    def send(self, data):
        """Queue data for the recorder, or discard it when not connected."""
        if self._connected:
            self._queue.put(data)
        else:
            logging.warning('%s not connected to the recorder, discarding data: %s', self, data)

    def _connect(self):
        """Open a TCP connection to the recorder; return True on success."""
        logging.info('%s connecting to the recorder: %s port %s', self, self._host, self._port)
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.settimeout(TIMEOUT)
        try:
            self._sock.connect(self._addr)
        except socket.timeout:
            logging.warning('%s connection failed, retrying in %s s', self, TIMEOUT)
        except Exception as ex:
            logging.exception(ex)
        else:
            logging.info('%s connected', self)
            return True
        self._sock.close()
        return False

    def do_work(self):
        """Run one connect / drain-queue / disconnect cycle."""
        if not self._connect():
            time.sleep(TIMEOUT)
            return
        self._connected = True
        while not self._terminate:
            try:
                # Short timeout so the terminate flag is checked regularly.
                data = self._queue.get(block=True, timeout=1)
            except queue.Empty:
                continue
            try:
                # sendall() keeps writing until the whole buffer is sent;
                # plain send() may transmit only part of it.
                self._sock.sendall(data)
            except Exception as ex:
                logging.exception(ex)
                logging.warning('%s sending data failed, reconnecting in %s s', self, TIMEOUT)
                self._connected = False
                self._sock.close()
                time.sleep(TIMEOUT)
                return
        self._connected = False
        self._sock.close()
        logging.info('%s disconnected', self)
class Main():
    """Main POS data processing class.

    Owns two monitor-to-recorder pipelines (A and B), each made of one
    InputThread feeding one OutputThread, and installs SIGTERM / SIGINT
    handlers that stop them.
    """

    # Seconds between checks of the terminate flag in start().  time.sleep()
    # resumes after a signal handler returns, so a short interval is what
    # keeps shutdown prompt.
    _POLL_INTERVAL = 1

    def __init__(self):
        """Create the worker threads and register the signal handlers."""
        self._outa = OutputThread(NVR_HOST_A, NVR_PORT_A)
        self._outb = OutputThread(NVR_HOST_B, NVR_PORT_B)
        self._ina = InputThread(MONITOR_HOST_A, MONITOR_PORT_A, self._outa)
        self._inb = InputThread(MONITOR_HOST_B, MONITOR_PORT_B, self._outb)
        self._terminate = False
        signal.signal(signal.SIGTERM, self._signal_term_handler)
        signal.signal(signal.SIGINT, self._signal_int_handler)

    def start(self):
        """Start POS data processing, a blocking method.

        Returns after stop() has been called and all worker threads have
        finished.
        """
        logging.info('%s starting...', self)
        # Outputs first so the inputs have somewhere to send data.
        for worker in (self._outa, self._outb, self._ina, self._inb):
            worker.start()
        logging.info('%s started', self)
        while not self._terminate:
            time.sleep(self._POLL_INTERVAL)
        # Wait for the workers so that "stopped" is logged only once they
        # are really done.  They are non-daemon threads, so the interpreter
        # would wait for them at exit anyway.
        for worker in (self._ina, self._inb, self._outa, self._outb):
            worker.join()
        logging.info('%s stopped', self)

    def stop(self):
        """Stop POS data processing, exit start() method."""
        logging.info('%s stopping...', self)
        self._terminate = True
        for worker in (self._ina, self._inb, self._outa, self._outb):
            worker.stop()

    def _signal_term_handler(self, sig, frm):
        """SIGTERM handler: log the signal and request shutdown."""
        logging.info('%s got SIGTERM', self)
        self.stop()

    def _signal_int_handler(self, sig, frm):
        """SIGINT (CTRL+C) handler: log the signal and request shutdown."""
        logging.info('%s got SIGINT', self)
        self.stop()
if __name__ == '__main__':
    # Script entry point: configure logging, print the banner, then run
    # the processing loop until it is stopped by a signal.
    logging.config.dictConfig(LOG_CONF)
    for banner_line in (
            '**********************************************',
            '*** POS Monitor 0417 (C) 2017 Kamil Okrasa ***',
            '*** CTRL+C to stop ***',
            '**********************************************',
    ):
        logging.info(banner_line)
    MAIN = Main()
    MAIN.start()  # blocks until stop() is triggered
    sys.exit(0)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement