# Not a member of Pastebin yet?
# Sign Up,
# it unlocks many cool features!
# Flowgraph-wide options for the "test_strip" flowgraph (Qt GUI, Python output).
options:
  parameters:
    author: Barry Duggan
    catch_exceptions: 'True'
    category: '[GRC Hier Blocks]'
    cmake_opt: ''
    comment: ''
    copyright: ''
    description: test base64 conversion
    gen_cmake: 'On'
    gen_linking: dynamic
    generate_options: qt_gui
    hier_block_src_path: '.:'
    id: test_strip
    max_nouts: '0'
    output_language: python
    placement: (0,0)
    qt_qss_theme: ''
    realtime_scheduling: ''
    run: 'True'
    run_command: '{python} -u {filename}'
    run_options: prompt
    sizing_mode: fixed
    thread_safe_setters: ''
    title: test_strip
    window_size: (1000,1000)
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [16, 12.0]
    rotation: 0
    state: enabled
blocks:
# Variable block: samp_rate = 48000 (used as samples_per_second by the throttle).
- name: samp_rate
  id: variable
  parameters:
    comment: ''
    value: '48000'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [200, 12.0]
    rotation: 0
    state: enabled
# File sink: writes the byte stream to ./output.tmp (overwrite, unbuffered).
- name: blocks_file_sink_0
  id: blocks_file_sink
  parameters:
    affinity: ''
    alias: ''
    append: 'False'
    comment: ''
    file: ./output.tmp
    type: byte
    unbuffered: 'True'
    vlen: '1'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [1248, 180.0]
    rotation: 0
    state: enabled
# Throttle on the byte stream; its rate is taken from the samp_rate variable.
- name: blocks_throttle2_0
  id: blocks_throttle2
  parameters:
    affinity: ''
    alias: ''
    comment: ''
    ignoretag: 'True'
    limit: auto
    maximum: '0.1'
    maxoutbuf: '0'
    minoutbuf: '0'
    samples_per_second: samp_rate
    type: byte
    vlen: '1'
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [1072, 188.0]
    rotation: 0
    state: true
# Embedded Python source block: reads FileName in Pkt_len-byte chunks, Base64
# encodes each chunk, and emits it on a byte stream tagged with "packet_len".
- name: epy_block_0
  id: epy_block
  parameters:
    FileName: '"./gr-logo.png"'
    Pkt_len: '60'
    _source_code: |
      """
      Embedded Python Block: File Source to Tagged Stream
      """

      import numpy as np
      from gnuradio import gr
      import time
      import pmt
      import os.path
      import sys
      import base64

      """
      State definitions
          0    idle
          1    send preamble
          2    send file data
          3    send file name
          4    send post filler
      """

      class blk(gr.sync_block):
          """Source block that streams a file as Base64 packets.

          Every packet written to output port 0 is preceded by a "packet_len"
          stream tag holding the packet's length in bytes.

          Parameters:
              FileName: path of the file to send.
              Pkt_len:  number of raw file bytes read per data packet
                        (before Base64 encoding).
          """

          def __init__(self, FileName='None', Pkt_len=52):
              gr.sync_block.__init__(
                  self,
                  name='EPB: File Source to Tagged Stream',
                  in_sig=None,
                  out_sig=[np.uint8])
              self.FileName = FileName
              self.Pkt_len = Pkt_len
              self.state = 0      # idle state
              self.pre_count = 0  # packets sent so far in state 1 / state 4
              self.indx = 0       # absolute stream offset of the next tag
              self._debug = 0     # debug
              self.data = ""

              if os.path.exists(self.FileName):
                  # open input file
                  self.f_in = open(self.FileName, 'rb')
                  self._eof = False
                  if self._debug:
                      print("File name:", self.FileName)
                  self.state = 1
              else:
                  print(self.FileName, 'does not exist')
                  self._eof = True
                  self.state = 0

              # Preamble packet: ASCII '%' (37), a run of 'U' (85), then ']' (93).
              self.char_list = [
                  37,85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,
                  85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,
                  85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,
                  85,85,85,93]
              self.c_len = len(self.char_list)
              # print (self.c_len)
              # Post filler: starts with ASCII "%UUU#EOF"; its first 8 bytes also
              # prefix the file-name packet sent in state 3.
              self.filler = [
                  37,85,85,85, 35,69,79,70,
                  85,85,85,85,85,85,85,85,
                  85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,
                  85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,85,
                  85,85,85,93]
              self.f_len = len(self.filler)

          def _send_packet(self, out, payload):
              """Tag and copy one packet into `out`; return the packet length.

              The caller must have checked that `out` can hold the payload.
              """
              n = len(payload)
              self.add_item_tag(0,                         # output port 0
                                self.indx,                 # offset of the tag
                                pmt.intern("packet_len"),  # key of the tag
                                pmt.from_long(n))          # value of the tag
              self.indx += n
              out[:n] = list(payload)   # list() turns bytes into ints for numpy
              return n

          def work(self, input_items, output_items):
              """Emit at most one packet per call, according to self.state.

              Returns the number of bytes written to output port 0. Returns 0
              when idle, on the call that detects end of file, or when the
              output buffer is too small for the next packet (the packet is
              then sent on a later call instead of overrunning the buffer).
              """
              out = output_items[0]

              if self.state == 1:
                  # send preamble
                  if len(out) < self.c_len:
                      return 0
                  if self._debug:
                      print("state = 1", self.pre_count)
                  n = self._send_packet(out, self.char_list)
                  self.pre_count += 1
                  if self.pre_count > 64:
                      self.pre_count = 0
                      self.state = 2      # send msg
                  return n

              if self.state == 2:
                  # send file data
                  if self._eof:
                      return 0
                  # Base64 expands every 3 input bytes to 4 output bytes; check
                  # for room BEFORE reading so no file data is lost.
                  if len(out) < 4 * ((self.Pkt_len + 2) // 3):
                      return 0
                  buff = self.f_in.read(self.Pkt_len)
                  if len(buff) == 0:
                      print('End of file')
                      self._eof = True
                      self.f_in.close()
                      self.state = 3      # send file name
                      self.pre_count = 0
                      return 0
                  # convert to Base64
                  encoded = base64.b64encode(buff)
                  if self._debug:
                      print('b64 length =', len(encoded))
                  return self._send_packet(out, encoded)

              if self.state == 3:
                  # send file name: first 8 filler bytes, then the name's chars
                  packet = self.filler[:8] + [ord(ch) for ch in self.FileName]
                  if len(out) < len(packet):
                      return 0
                  n = self._send_packet(out, packet)
                  self.state = 4
                  return n

              if self.state == 4:
                  # send post filler
                  if len(out) < self.f_len:
                      return 0
                  if self._debug:
                      print("state = 4", self.pre_count)
                  n = self._send_packet(out, self.filler)
                  self.pre_count += 1
                  if self.pre_count > 16:
                      self.pre_count = 0
                      self.state = 0      # idle
                  return n

              # state 0 (idle) or unknown state: nothing to send
              return 0
    affinity: ''
    alias: ''
    comment: ''
    maxoutbuf: '0'
    minoutbuf: '0'
  states:
    _io_cache: '(''EPB: File Source to Tagged Stream'', ''blk'', [(''FileName'', "''None''"),
      (''Pkt_len'', ''52'')], [], [(''0'', ''byte'', 1)], '''', [''FileName'', ''Pkt_len''])'
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [16, 188.0]
    rotation: 0
    state: true
# Embedded Python message block: Base64-decodes each incoming PDU payload and
# republishes the decoded bytes as a new PDU on msg_out.
- name: epy_block_0_0
  id: epy_block
  parameters:
    _source_code: |
      import numpy as np
      from gnuradio import gr
      import pmt
      import array
      import base64

      class blk(gr.sync_block):
          """Message-only block that Base64-decodes PDU payloads.

          Ports: message input 'msg_in', message output 'msg_out'. There are
          no stream ports.
          """

          def __init__(self):
              gr.sync_block.__init__(
                  self,
                  name='EPB: Decode Packet',   # will show up in GRC
                  in_sig=None,
                  out_sig=None)
              self.message_port_register_in(pmt.intern('msg_in'))
              self.message_port_register_out(pmt.intern('msg_out'))
              self.set_msg_handler(pmt.intern('msg_in'), self.handle_msg)

          def handle_msg(self, msg):
              """Decode one PDU and publish the result on 'msg_out'.

              A 52-byte payload whose first byte is 37 ('%') or whose last
              byte is 93 (']') is dropped without output; every other payload
              is Base64-decoded. Conversion and decoding errors are logged and
              the message is dropped.
              """
              _debug = 1          # set to zero to turn off diagnostics
              try:
                  buff = pmt.to_python(pmt.cdr(msg))
              except Exception as e:
                  gr.log.error("Error with message conversion: %s" % str(e))
                  return

              # Same rule as the original two-branch test: a 52-byte payload is
              # decoded only if buff[0] != 37 AND buff[51] != 93.
              if len(buff) == 52 and (buff[0] == 37 or buff[51] == 93):
                  return

              # decode Base64
              try:
                  data = base64.b64decode(
                      np.asarray(buff, dtype=np.uint8).tobytes())
              except ValueError as e:
                  # binascii.Error (bad padding) is a ValueError subclass
                  gr.log.error("Error with Base64 decoding: %s" % str(e))
                  return
              d_len = len(data)
              if _debug:
                  print("data =", data, d_len)

              # create PMT u8vector from the decoded bytes; `data` is already
              # bytes, so bytearray(data, 'UTF-8') would raise TypeError
              new_pmt = pmt.cons(pmt.PMT_NIL,
                                 pmt.init_u8vector(d_len, list(data)))

              self.message_port_pub(pmt.intern('msg_out'), new_pmt)
    affinity: ''
    alias: ''
    comment: ''
    maxoutbuf: '0'
    minoutbuf: '0'
  states:
    _io_cache: '(''EPB: Decode Packet'', ''blk'', [], [(''msg_in'', ''message'', 1)],
      [(''msg_out'', ''message'', 1)], '''', [])'
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [584, 200.0]
    rotation: 0
    state: true
# PDU -> tagged byte stream, using the "packet_len" length tag.
- name: pdu_pdu_to_tagged_stream_0
  id: pdu_pdu_to_tagged_stream
  parameters:
    affinity: ''
    alias: ''
    comment: ''
    maxoutbuf: '0'
    minoutbuf: '0'
    tag: packet_len
    type: byte
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [840, 196.0]
    rotation: 0
    state: true
# Tagged byte stream -> PDU, using the "packet_len" length tag.
- name: pdu_tagged_stream_to_pdu_0
  id: pdu_tagged_stream_to_pdu
  parameters:
    affinity: ''
    alias: ''
    comment: ''
    maxoutbuf: '0'
    minoutbuf: '0'
    tag: packet_len
    type: byte
  states:
    bus_sink: false
    bus_source: false
    bus_structure: null
    coordinate: [304, 196.0]
    rotation: 0
    state: true
# Each entry is [source block, source port, sink block, sink port].
connections:
- [blocks_throttle2_0, '0', blocks_file_sink_0, '0']
- [epy_block_0, '0', pdu_tagged_stream_to_pdu_0, '0']
- [epy_block_0_0, msg_out, pdu_pdu_to_tagged_stream_0, pdus]
- [pdu_pdu_to_tagged_stream_0, '0', blocks_throttle2_0, '0']
- [pdu_tagged_stream_to_pdu_0, pdus, epy_block_0_0, msg_in]
# File-format metadata recorded in the flowgraph file.
metadata:
  file_format: 1
  grc_version: 3.10.6.0
# Add Comment
# Please, Sign In to add comment