Advertisement
Guest User

Untitled

a guest
Jul 19th, 2019
196
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.64 KB | None | 0 0
  1. Exception in thread Thread-7:
  2. Traceback (most recent call last):
  3. File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 801, in __bootstrap_inner
  4. self.run()
  5. File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 754, in run
  6. self.__target(*self.__args, **self.__kwargs)
  7. File "d:kdarling_lobgryphon_eagleproj_0012_s2_lobcorrmainlobxsrccorrelatorhandlersinputhandler.py", line 175, in _read_socket
  8. parsed_submit_time = datetime.datetime.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
  9. AttributeError: 'module' object has no attribute '_strptime'
  10.  
  11. Exception in thread Thread-6:
  12. Traceback (most recent call last):
  13. File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 801, in __bootstrap_inner
  14. self.run()
  15. File "C:\Users\kdarling\AppData\Local\Continuum\anaconda3\envs\lobsandbox\lib\threading.py", line 754, in run
  16. self.__target(*self.__args, **self.__kwargs)
  17. File "d:kdarling_lobgryphon_eagleproj_0012_s2_lobcorrmainlobxsrccorrelatorhandlersinputhandler.py", line 176, in _read_socket
  18. parsed_submit_time = time.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
  19. AttributeError: 'module' object has no attribute '_strptime_time'
  20.  
# Imported eagerly on purpose: time.strptime()/datetime.strptime() otherwise
# import `_strptime` lazily on first use, which is not thread-safe and raises
# "AttributeError: 'module' object has no attribute '_strptime'" when several
# threads race on that first call.
import _strptime  # noqa: F401
import datetime
import heapq
import json
import os
import socket
import sys
import time
import traceback

from io import BytesIO
from threading import Thread

from handler import Handler
  33.  
  34.  
  35. class InputStreamHandler(Handler):
  36.  
  37. def __init__(self, configuration, input_message_heap):
  38. """
  39. Initialization function for InputStreamHandler.
  40. :param configuration: Configuration object that stores specific information.
  41. :type configuration: Configuration
  42. :param input_message_heap: Message heap that consumers thread will populate.
  43. :type input_message_heap: Heap
  44. """
  45. super(InputStreamHandler, self).__init__()
  46. self.release_size = configuration.get_release_size()
  47. self.input_src = configuration.get_input_source()
  48. self.input_message_heap = input_message_heap
  49. self.root_path = os.path.join(configuration.get_root_log_directory(), 'input', 'sensor_data')
  50. self.logging = configuration.get_logger()
  51. self.Status = configuration.Status
  52.  
  53. self.get_input_status_fn = configuration.get_input_functioning_status
  54. self.update_input_status = configuration.set_input_functioning_status
  55.  
  56. if configuration.get_input_state() == self.Status.ONLINE:
  57. self._input_stream = Thread(target=self._spinup_sockets)
  58. elif configuration.get_input_state() == self.Status.OFFLINE:
  59. self._input_stream = Thread(target=self._read_files)
  60.  
  61. def start(self):
  62. """
  63. Starts the input stream thread to begin consuming data from the sensors connected.
  64. :return: True if thread hasn't been started, else False on multiple start fail.
  65. """
  66. try:
  67. self.update_input_status(self.Status.ONLINE)
  68. self._input_stream.start()
  69. self.logging.info('Successfully started Input Handler.')
  70. except RuntimeError:
  71. return False
  72. return True
  73.  
  74. def status(self):
  75. """
  76. Displays the status of the thread, useful for offline reporting.
  77. """
  78. return self.get_input_status_fn()
  79.  
  80. def stop(self):
  81. """
  82. Stops the input stream thread by ending the looping process.
  83. """
  84. if self.get_input_status_fn() == self.Status.ONLINE:
  85. self.logging.info('Closing Input Handler execution thread.')
  86. self.update_input_status(self.Status.OFFLINE)
  87. self._input_stream.join()
  88.  
  89. def _read_files(self):
  90. pass
  91.  
  92. def _spinup_sockets(self):
  93. """
  94. Enacts sockets onto their own thread to collect messages.
  95. Ensures that blocking doesn't occur on the main thread.
  96. """
  97. active_threads = {}
  98. while self.get_input_status_fn() == self.Status.ONLINE:
  99. # Check if any are online
  100. if all([value['state'] == self.Status.OFFLINE for value in self.input_src.values()]):
  101. self.update_input_status(self.Status.OFFLINE)
  102. for active_thread in active_threads.values():
  103. active_thread.join()
  104. break
  105.  
  106. for key in self.input_src.keys():
  107. # Check if key exists, if not, spin up call
  108. if (key not in active_threads or not active_threads[key].isAlive()) and self.input_src[key]['state'] == self.Status.ONLINE:
  109. active_threads[key] = Thread(target=self._read_socket, args=(key, active_threads,))
  110. active_threads[key].start()
  111. print(self.input_src)
  112.  
  113. def _read_socket(self, key, cache):
  114. """
  115. Reads data from a socket, places message into the queue, and pop the key.
  116. :param key: Key corresponding to socket.
  117. :type key: UUID String
  118. :param cache: Key cache that corresponds the key and various others.
  119. :type cache: Dictionary
  120. """
  121. message = None
  122. try:
  123. sensor_socket = self.input_src[key]['sensor']
  124.  
  125. ...
  126.  
  127. message = json.loads(stream.getvalue().decode('utf-8'))
  128. if 'submit_time' in message['msg'].keys():
  129. # Inherited function
  130. self.write_to_log_file(self.root_path + key, message, self.release_size)
  131. message['key'] = key
  132. parsed_submit_time = time.strptime(message['msg']['submit_time'], '%Y-%m-%dT%H:%M:%S.%fZ')
  133. heapq.heappush(self.input_message_heap, (parsed_submit_time, message))
  134. cache.pop(key)
  135. except:
  136. pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement