Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class LogstashSender:
    """Send newline-delimited JSON documents to Logstash over a pool of TCP sockets.

    Use as a context manager: entering opens ``poolcount`` connections to
    ``(ip, port)``; leaving closes them all. ``send_data`` spreads messages
    over the open connections in round-robin order.
    """

    def __init__(self, ip, port, poolcount=10):
        """Store the target address and pool size; no connection is opened yet.

        Args:
            ip: Host name or IP address of the Logstash TCP input.
            port: TCP port of the Logstash input.
            poolcount: Number of sockets opened by ``__enter__``.
        """
        self.server_address = (ip, port)
        self.tcpsocklist = []
        self.poolcount = poolcount
        # Position of the socket that the next send_data() call will use.
        self._next_index = 0

    def __enter__(self):
        """Open ``poolcount`` connected sockets and return ``self``.

        If any connection attempt fails, the sockets opened so far are closed
        before the error propagates, because ``__exit__`` is not invoked when
        ``__enter__`` raises.
        """
        opened = []
        try:
            for _ in range(self.poolcount):
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                # Track the socket before connecting so it is closed on failure.
                opened.append(sock)
                sock.connect(self.server_address)
        except Exception:
            for sock in opened:
                sock.close()
            raise
        # Replace (rather than extend) the pool so the sender can be re-entered.
        self.tcpsocklist = opened
        self._next_index = 0
        return self

    def _next_socket(self):
        """Return the next pooled socket in round-robin order.

        Raises:
            RuntimeError: If the pool is empty (not entered, or poolcount == 0).
        """
        if not self.tcpsocklist:
            raise RuntimeError('LogstashSender has no open connections')
        pool_size = len(self.tcpsocklist)
        sock = self.tcpsocklist[self._next_index % pool_size]
        self._next_index = (self._next_index + 1) % pool_size
        return sock

    def send_data(self, data):
        """Serialize ``data`` to JSON, append a newline and send it on the next socket."""
        msg = ujson.dumps(data) + "\n"
        # The rotation state lives on the instance; building a fresh cycle per
        # call would always pick the first socket.
        self._next_socket().sendall(msg.encode())

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close every pooled socket; exceptions from the ``with`` body propagate."""
        for sock in self.tcpsocklist:
            sock.close()
        self.tcpsocklist = []
        self._next_index = 0

    @staticmethod
    def upload_queue_to_remote_logstash(queue: Queue, ip: str, port: int, report_type: str, send_interval=0.01):
        """Forward reports from ``queue`` to Logstash until an error occurs.

        Each report is a mutable mapping. Before sending it is stamped with
        ``timestamp`` (only if absent), ``queue_size`` and ``type``.

        Args:
            queue: Source of report dicts.
            ip: Logstash host.
            port: Logstash TCP port.
            report_type: Value written to each report's ``type`` field.
            send_interval: Seconds to sleep when the queue is empty.

        Raises:
            Exception: Whatever the preparation or send step raised. The
                failed report is put back on the queue first, so it is not
                lost. If the queue is bounded and full, that put raises
                instead and carries the original error as its context.
        """
        with LogstashSender(ip, port) as lssender:
            while True:
                if queue.empty():
                    time.sleep(send_interval)
                    continue

                report = queue.get_nowait()
                logger.info('Send to ES3 - %s', report)
                logger.info('Current queue size - %s', queue.qsize())
                try:
                    if 'timestamp' not in report:
                        report['timestamp'] = utcnow_iso_timestamp()
                    report['queue_size'] = queue.qsize()
                    report['type'] = report_type
                    lssender.send_data(report)
                except Exception:
                    logger.exception('Failed to send report to Logstash')
                    # Re-queue BEFORE re-raising; after the raise it would be dead code.
                    queue.put_nowait(report)
                    raise
Add Comment
Please, Sign In to add comment