Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import numpy
- import pmt
- from gnuradio import gr
- class AsyncMsgSourceC(gr.sync_block):
- """
- Transmits zeros when no messages in queue
- and their payload (complex samples) when present.
- """
- def __init__(self):
- gr.sync_block.__init__(self,
- name="async_msg_source_c",
- in_sig=None,
- out_sig=[numpy.complex64])
- self.ready_samples = numpy.empty((0,))
- self.message_port_register_in(pmt.intern('pdus'))
- self.set_msg_handler(pmt.intern('pdus'), self.msg_handler)
- def msg_handler(self, msg):
- pdu = pmt.cdr(msg)
- new_samples = pmt.to_python(pdu)
- self.ready_samples = numpy.concatenate((
- self.ready_samples,
- new_samples
- ))
- def work(self, input_items, output_items):
- out = output_items[0]
- n_demanded = len(out)
- n_ready = len(self.ready_samples)
- n_zeros = max(0, n_demanded - n_ready)
- zeros = numpy.zeros(n_zeros)
- n_consumed = max(0, min(n_ready, n_demanded))
- samples = self.ready_samples[0:n_consumed]
- out[0:n_consumed] = samples
- out[n_consumed:] = zeros
- self.ready_samples = self.ready_samples[n_consumed:]
- # TODO: add sob/eob tags
- return n_demanded
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement