Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- from gnuradio import gr
- from gnuradio import blocks
- import gnuradio.gr.gr_threading as _threading
class _mythread(_threading.Thread):
    """Worker thread that relays messages from one queue to another.

    The thread marks itself as a daemon and starts itself as soon as it is
    constructed. Each pass of its loop removes the head message of the
    input queue, converts it to a string, wraps that string in a new
    message and appends it to the tail of the output queue.
    """

    def __init__(self, msgq_in, msgq_out):
        """Remember both queues, then start the thread.

        msgq_in  -- queue the loop reads from via delete_head().
        msgq_out -- queue the loop writes to via insert_tail().
        """
        _threading.Thread.__init__(self)
        self._msgq_in, self._msgq_out = msgq_in, msgq_out
        self.setDaemon(1)
        # Must be set before start(): run() tests this flag on every pass.
        self.keep_running = True
        self.start()

    def run(self):
        """Forward messages for as long as keep_running stays true.

        The flag is only examined between messages.
        NOTE(review): delete_head() presumably blocks while the queue is
        empty, in which case clearing the flag alone will not wake the
        loop -- confirm against the msg_queue documentation.
        """
        while self.keep_running:
            # Take the next message off the input queue as a string.
            payload = self._msgq_in.delete_head().to_string()

            # Placeholder: per-sample processing of `payload` belongs here.

            # Re-wrap the (unmodified) payload and hand it downstream.
            self._msgq_out.insert_tail(gr.message_from_string(payload))
class my_top_block(gr.top_block):
    """Top-level flowgraph: file source -> hier_block -> file sink."""

    def __init__(self):
        """Instantiate the three blocks and chain them on port 0."""
        gr.top_block.__init__(self)

        item_size = gr.sizeof_char * 1

        # Stages of the chain. The trailing True on the source asks it to
        # repeat its input file.
        self.blocks_file_source = blocks.file_source(item_size, "/tmp/source", True)
        self.blocks_file_sink = blocks.file_sink(item_size, "/tmp/sink", False)
        self.hier_block = hier_block()

        # Wire each stage's output 0 to the next stage's input 0.
        stages = (self.blocks_file_source, self.hier_block, self.blocks_file_sink)
        for upstream, downstream in zip(stages, stages[1:]):
            self.connect((upstream, 0), (downstream, 0))
class hier_block(gr.hier_block2):
    """Hierarchical block with one char input and one char output.

    The input stream is fed to a message sink backed by one queue and the
    output stream is produced by a message source backed by a second
    queue. A _mythread instance is created on the two queues to move
    messages from the first to the second.
    """

    def __init__(self):
        """Build the signatures, the queue-backed blocks and the worker."""
        input_signature = gr.io_signature(1, 1, gr.sizeof_char)
        output_signature = gr.io_signature(1, 1, gr.sizeof_char)
        gr.hier_block2.__init__(
            self, "hier_block", input_signature, output_signature
        )

        # One queue per direction.
        # NOTE(review): the argument 2 is presumably the queue's message
        # limit -- confirm against the msg_queue documentation.
        to_worker = gr.msg_queue(2)
        from_worker = gr.msg_queue(2)

        # Stream <-> message adapters sitting on those queues.
        stream_to_msgs = blocks.message_sink(gr.sizeof_char, to_worker, False)
        msgs_to_stream = blocks.message_source(gr.sizeof_char, from_worker)

        # Block input -> sink, source -> block output.
        self.connect(self, stream_to_msgs)
        self.connect(msgs_to_stream, self)

        # The worker starts itself inside its constructor; no reference to
        # it is kept here.
        _mythread(to_worker, from_worker)
if __name__ == '__main__':
    # Script entry point: build the flowgraph and run it until it finishes
    # or the user interrupts it.
    try:
        my_top_block().run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this script (the file source
        # is configured to repeat), so exit quietly without a traceback.
        # The exception class must be named directly: a list wrapped
        # around it is not a valid except target and never matches.
        pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement