Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy as np
- from gnuradio import gr
- import pmt
- import base64
- class blk(gr.sync_block):
- def __init__(self):
- gr.sync_block.__init__(
- self,
- name='EPB: Decode Packet', # will show up in GRC
- in_sig=None,
- out_sig=None)
- self.message_port_register_in(pmt.intern('msg_in'))
- self.message_port_register_out(pmt.intern('msg_out'))
- self.set_msg_handler(pmt.intern('msg_in'), self.handle_msg)
- def handle_msg(self, msg):
- _debug = 0 # set to zero to turn off diagnostics
- try:
- buff = pmt.to_python(pmt.cdr(msg))
- except Exception as e:
- gr.log.error("Error with message conversion: %s" % str(e))
- return
- b_len = len (buff)
- if (_debug):
- print ("new_val =", buff, b_len)
- u8_data = []
- if (b_len != 52):
- # decode Base64
- data = base64.b64decode(buff)
- i = 0
- while (i < len(data)):
- u8_data.append(pmt.to_pmt (data[i]))
- i += 1
- pdu = pmt.cons(pmt.PMT_NIL,pmt.init_u8vector(len(u8_data), u8_data))
- self.message_port_pub(pmt.intern('msg_out'), pdu)
- elif ((buff[0] != 37) and (buff[51] != 93)):
- # decode Base64
- data = base64.b64decode(buff)
- i = 0
- while (i < len(data)):
- u8_data.append(pmt.to_pmt (data[i]))
- i += 1
- pdu = pmt.cons(pmt.PMT_NIL,pmt.init_u8vector(len(u8_data), u8_data))
- self.message_port_pub(pmt.intern('msg_out'), pdu)
Add Comment
Please, Sign In to add comment