Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import threading
- import SocketServer
- from azure.eventhub import EventHubClient, EventData
- ADDRESS = ""
- USER = ""
- KEY = ""
- class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
- def handle(self):
- data = self.request[0].strip()
- current_thread = threading.current_thread()
- print("Thread: {} client: {}, wrote: {}".format(current_thread.name, self.client_address, data))
- Split = threading.Thread(target=ParseIncomingData,args=(data,))
- Split.start()
- class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
- pass
- def SendToEventHub(data):
- if not ADDRESS:
- raise ValueError("No EventHubs URL supplied.")
- client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
- sender = client.add_sender(partition="0")
- client.run()
- try:
- print("Thread: {}, Sending message: {}".format(threading.current_thread().name,data))
- sender.send(EventData(data))
- except:
- raise
- finally:
- client.stop()
- def ParseIncomingData(message):
- sender = threading.Thread(target=SendToEventHub, args=(json.dumps(message),))
- sender.start()
- if __name__ == "__main__":
- HOST, PORT = "0.0.0.0", 6071
- try:
- serverUDP = ThreadedUDPServer((HOST, PORT), ThreadedUDPRequestHandler)
- server_thread_UDP = threading.Thread(target=serverUDP.serve_forever)
- server_thread_UDP.daemon = True
- server_thread_UDP.start()
- serverUDP.serve_forever()
- except (KeyboardInterrupt, SystemExit):
- serverUDP.shutdown()
- serverUDP.server_close()
- exit()
Add Comment
Please, Sign In to add comment