duggabe

test_strip_epy_block_0_0.py

Sep 10th, 2023
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.10 KB | None | 0 0
  1. import numpy as np
  2. from gnuradio import gr
  3. import pmt
  4. import array
  5. import base64
  6.  
  7. class blk(gr.sync_block):
  8. def __init__(self):
  9. gr.sync_block.__init__(
  10. self,
  11. name='EPB: Decode Packet', # will show up in GRC
  12. in_sig=None,
  13. out_sig=None)
  14. self.message_port_register_in(pmt.intern('msg_in'))
  15. self.message_port_register_out(pmt.intern('msg_out'))
  16. self.set_msg_handler(pmt.intern('msg_in'), self.handle_msg)
  17.  
  18. def handle_msg(self, msg):
  19. _debug = 1 # set to zero to turn off diagnostics
  20. try:
  21. buff = pmt.to_python(pmt.cdr(msg))
  22. except Exception as e:
  23. gr.log.error("Error with message conversion: %s" % str(e))
  24. return
  25.  
  26. b_len = len (buff)
  27.  
  28. if (b_len != 52):
  29. tmp_char_list = np.array([],dtype=int)
  30. tmp_char_list = np.append(tmp_char_list, buff)
  31. tmp_char_list_len = len(tmp_char_list)
  32. byte_array = array.array('B',tmp_char_list)
  33.  
  34. # decode Base64
  35. data = base64.b64decode(byte_array)
  36. d_len = len (data)
  37. if (_debug):
  38. print ("data =", data, d_len)
  39.  
  40. # create PMT u8vector using byte array
  41. new_pmt = pmt.cons (pmt.PMT_NIL, pmt.init_u8vector(d_len,bytearray(data,'UTF-8')))
  42.  
  43. self.message_port_pub(pmt.intern('msg_out'), new_pmt)
  44.  
  45. elif ((buff[0] != 37) and (buff[51] != 93)):
  46. tmp_char_list = np.array([],dtype=int)
  47. tmp_char_list = np.append(tmp_char_list, buff)
  48. tmp_char_list_len = len(tmp_char_list)
  49. byte_array = array.array('B',tmp_char_list)
  50.  
  51. # decode Base64
  52. data = base64.b64decode(byte_array)
  53. d_len = len (data)
  54. if (_debug):
  55. print ("data =", data, d_len)
  56.  
  57. # create PMT u8vector using byte array
  58. new_pmt = pmt.cons (pmt.PMT_NIL, pmt.init_u8vector(d_len,bytearray(data,'UTF-8')))
  59.  
  60. self.message_port_pub(pmt.intern('msg_out'), new_pmt)
  61.  
Add Comment
Please, Sign In to add comment