Guest User

Untitled

a guest
Nov 18th, 2017
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.66 KB | None | 0 0
  1. class LogstashSender:
  2. def __init__(self, ip, port, poolcount=10):
  3.  
  4. self.server_address = (ip, port)
  5. self.tcpsocklist = []
  6. self.poolcount = poolcount
  7.  
  8. def __enter__(self):
  9. for i in range(0, self.poolcount):
  10. self.tcpsocklist.append(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
  11. self.tcpsocklist[i].connect(self.server_address)
  12. return self
  13.  
  14. def send_data(self, data):
  15. msg = ujson.dumps(data) + "\n"
  16. itertools.cycle(self.tcpsocklist).__next__().sendall(msg.encode())
  17.  
  18. def __exit__(self, exc_type, exc_val, exc_tb):
  19. for sock in self.tcpsocklist:
  20. sock.close()
  21.  
  22. @staticmethod
  23. def upload_queue_to_remote_logstash(queue: Queue, ip: str, port: int, report_type: str, send_interval=0.01):
  24. with LogstashSender(ip, port) as lssender:
  25. while True:
  26. if not queue.empty():
  27. report = queue.get_nowait()
  28. # print(
  29. logger.info('Send to ES3 - {}'.format(report))
  30. logger.info('Current queue size - {}'.format(queue.qsize()))
  31. try:
  32. if 'timestamp' not in report:
  33. report['timestamp'] = utcnow_iso_timestamp()
  34. report['queue_size'] = queue.qsize()
  35. report['type'] = report_type
  36. lssender.send_data(report)
  37. except Exception as e:
  38. print(str(e))
  39. raise e
  40. queue.put_nowait(report)
  41. else:
  42. time.sleep(send_interval)
Add Comment
Please, Sign In to add comment