Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- from gnuradio import gr
- import pmt
- import array
- import base64
class blk(gr.sync_block):
    """Message-only block that Base64-decodes the payload of each input message.

    Messages arrive on the ``msg_in`` port as PMT pairs; the ``cdr`` of the
    pair is treated as a vector of byte values holding Base64 text.  The
    decoded bytes are published on ``msg_out`` as ``(PMT_NIL . u8vector)``.

    A payload of exactly ``_MARKER_LEN`` bytes is dropped when its first byte
    equals ``_MARKER_FIRST`` or its last byte equals ``_MARKER_LAST``.
    NOTE(review): these presumably identify filler/preamble packets produced
    by the transmitting side -- confirm against the sender.  Neither byte
    value is part of the standard Base64 alphabet.
    """

    _MARKER_LEN = 52    # payload length subject to the marker check
    _MARKER_FIRST = 37  # ASCII '%'
    _MARKER_LAST = 93   # ASCII ']'

    def __init__(self):
        """Create the block with no stream ports and one message port each way."""
        gr.sync_block.__init__(
            self,
            name='EPB: Decode Packet',  # will show up in GRC
            in_sig=None,
            out_sig=None)
        self.message_port_register_in(pmt.intern('msg_in'))
        self.message_port_register_out(pmt.intern('msg_out'))
        self.set_msg_handler(pmt.intern('msg_in'), self.handle_msg)

    def handle_msg(self, msg):
        """Decode one incoming message and publish the result on ``msg_out``.

        Args:
            msg: PMT pair whose ``cdr`` converts (via ``pmt.to_python``) to a
                sequence of integers in the range 0..255.

        Returns:
            None.  Messages that cannot be converted or decoded are logged
            with ``gr.log.error`` and dropped; marker packets are dropped
            silently.
        """
        _debug = 1  # set to zero to turn off diagnostics

        try:
            buff = pmt.to_python(pmt.cdr(msg))
        except Exception as e:
            gr.log.error("Error with message conversion: %s" % str(e))
            return

        # Flatten to plain Python ints so the payload becomes real bytes no
        # matter which integer dtype the PMT conversion produced.  Anything
        # that is not a vector of 0..255 values is rejected here instead of
        # raising out of the message handler.
        try:
            encoded = bytearray(np.ravel(buff).tolist())
        except (TypeError, ValueError) as e:
            gr.log.error("Payload is not a byte vector: %s" % str(e))
            return

        # Same acceptance rule as before: every payload whose length differs
        # from _MARKER_LEN is decoded; one of exactly that length is decoded
        # only if neither marker byte is present.
        if len(encoded) == self._MARKER_LEN and (
                encoded[0] == self._MARKER_FIRST
                or encoded[-1] == self._MARKER_LAST):
            return

        # decode Base64; bad padding raises binascii.Error (a ValueError) on
        # Python 3 and TypeError on Python 2.
        try:
            data = base64.b64decode(bytes(encoded))
        except (TypeError, ValueError) as e:
            gr.log.error("Error with Base64 decoding: %s" % str(e))
            return

        d_len = len(data)
        if _debug:
            print("data =", data, d_len)

        # create PMT u8vector from the decoded bytes.  The previous
        # bytearray(data, 'UTF-8') raises TypeError on Python 3 because an
        # encoding may only accompany a str source; a list of ints is built
        # instead.
        new_pmt = pmt.cons(
            pmt.PMT_NIL,
            pmt.init_u8vector(d_len, list(bytearray(data))))
        self.message_port_pub(pmt.intern('msg_out'), new_pmt)
Add Comment
Please, Sign In to add comment