Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Exception in thread Thread-7:
- Traceback (most recent call last):
- File "C:UserskdarlingAppDataLocalContinuumanaconda3envslobsandboxlibthreading.py", line 801, in __bootstrap_inner
- self.run()
- File "C:UserskdarlingAppDataLocalContinuumanaconda3envslobsandboxlibthreading.py", line 754, in run
- self.__target(*self.__args, **self.__kwargs)
- File "d:kdarling_lobgryphon_eagleproj_0012_s2_lobcorrmainlobxsrccorrelatorhandlersinputhandler.py", line 175, in _read_socket
- parsed_submit_time = datetime.datetime.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
- AttributeError: 'module' object has no attribute '_strptime'
- Exception in thread Thread-6:
- Traceback (most recent call last):
- File "C:UserskdarlingAppDataLocalContinuumanaconda3envslobsandboxlibthreading.py", line 801, in __bootstrap_inner
- self.run()
- File "C:UserskdarlingAppDataLocalContinuumanaconda3envslobsandboxlibthreading.py", line 754, in run
- self.__target(*self.__args, **self.__kwargs)
- File "d:kdarling_lobgryphon_eagleproj_0012_s2_lobcorrmainlobxsrccorrelatorhandlersinputhandler.py", line 176, in _read_socket
- parsed_submit_time = time.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
- AttributeError: 'module' object has no attribute '_strptime_time'
- import datetime
- import heapq
- import json
- import os
- import socket
- import sys
- import time
- from io import BytesIO
- from threading import Thread
- from handler import Handler
- class InputStreamHandler(Handler):
- def __init__(self, configuration, input_message_heap):
- """
- Initialization function for InputStreamHandler.
- :param configuration: Configuration object that stores specific information.
- :type configuration: Configuration
- :param input_message_heap: Message heap that consumers thread will populate.
- :type input_message_heap: Heap
- """
- super(InputStreamHandler, self).__init__()
- self.release_size = configuration.get_release_size()
- self.input_src = configuration.get_input_source()
- self.input_message_heap = input_message_heap
- self.root_path = os.path.join(configuration.get_root_log_directory(), 'input', 'sensor_data')
- self.logging = configuration.get_logger()
- self.Status = configuration.Status
- self.get_input_status_fn = configuration.get_input_functioning_status
- self.update_input_status = configuration.set_input_functioning_status
- if configuration.get_input_state() == self.Status.ONLINE:
- self._input_stream = Thread(target=self._spinup_sockets)
- elif configuration.get_input_state() == self.Status.OFFLINE:
- self._input_stream = Thread(target=self._read_files)
- def start(self):
- """
- Starts the input stream thread to begin consuming data from the sensors connected.
- :return: True if thread hasn't been started, else False on multiple start fail.
- """
- try:
- self.update_input_status(self.Status.ONLINE)
- self._input_stream.start()
- self.logging.info('Successfully started Input Handler.')
- except RuntimeError:
- return False
- return True
- def status(self):
- """
- Displays the status of the thread, useful for offline reporting.
- """
- return self.get_input_status_fn()
- def stop(self):
- """
- Stops the input stream thread by ending the looping process.
- """
- if self.get_input_status_fn() == self.Status.ONLINE:
- self.logging.info('Closing Input Handler execution thread.')
- self.update_input_status(self.Status.OFFLINE)
- self._input_stream.join()
- def _read_files(self):
- pass
- def _spinup_sockets(self):
- """
- Enacts sockets onto their own thread to collect messages.
- Ensures that blocking doesn't occur on the main thread.
- """
- active_threads = {}
- while self.get_input_status_fn() == self.Status.ONLINE:
- # Check if any are online
- if all([value['state'] == self.Status.OFFLINE for value in self.input_src.values()]):
- self.update_input_status(self.Status.OFFLINE)
- for active_thread in active_threads.values():
- active_thread.join()
- break
- for key in self.input_src.keys():
- # Check if key exists, if not, spin up call
- if (key not in active_threads or not active_threads[key].isAlive()) and self.input_src[key]['state'] == self.Status.ONLINE:
- active_threads[key] = Thread(target=self._read_socket, args=(key, active_threads,))
- active_threads[key].start()
- print(self.input_src)
- def _read_socket(self, key, cache):
- """
- Reads data from a socket, places message into the queue, and pop the key.
- :param key: Key corresponding to socket.
- :type key: UUID String
- :param cache: Key cache that corresponds the key and various others.
- :type cache: Dictionary
- """
- message = None
- try:
- sensor_socket = self.input_src[key]['sensor']
- ...
- message = json.loads(stream.getvalue().decode('utf-8'))
- if 'submit_time' in message['msg'].keys():
- # Inherited function
- self.write_to_log_file(self.root_path + key, message, self.release_size)
- message['key'] = key
- parsed_submit_time = time.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
- heapq.heappush(self.input_message_heap, (parsed_submit_time, message))
- cache.pop(key)
- except:
- pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement