Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- Created on Sun Apr 5 19:06:56 2015
- @author: madengr
- """
- from gnuradio import gr
- from gnuradio import uhd
- from gnuradio import filter # Warning: redefining built-in filter()
- from gnuradio import blocks
- import time
- import numpy as np
def str_to_floats(data):
    """Convert a packed string of floats to an array of floats.

    Used for converting GNU Radio message queue strings.

    Args:
        data: Byte string holding consecutive 4-byte floats in the
            machine's native format (struct code 'f').

    Returns:
        A numpy array with one element per 4-byte float found in ``data``
        (empty when ``data`` is empty).

    Raises:
        struct.error: If ``len(data)`` is not a multiple of 4.
    """
    import struct
    # Unpack every sample with one repeat-count format instead of slicing
    # and unpacking a single float per loop iteration.  struct.unpack()
    # requires an exact size match, so a trailing partial sample still
    # raises struct.error.
    count = len(data) // 4
    return np.array(struct.unpack('%df' % count, data))
class ScannerTopBlock(gr.top_block):
    """GNU Radio flow graph for the GSM channel scanner.

    Signal chain: USRP source -> low-pass channel filter -> magnitude
    squared -> integrate-and-decimate -> 10*log10 with offset -> message
    sink feeding ``self.sink_queue``.

    Attributes set on the instance:
        sink_queue: gr.msg_queue receiving the power readings.
        src: The UHD USRP source, exposed so callers can retune it.
        msg_sink: The message sink writing floats into ``sink_queue``.
    """

    def __init__(self, uhd_args="type=b200, master_clock_rate=50E6",
                 rx_freq=1830.2E6, src_gain_dB=20,
                 rssi_cal_offset_dB=-24.2):
        """Build and connect the flow graph.

        All arguments are optional; the defaults reproduce the values that
        were previously hard-coded.

        Args:
            uhd_args: Device argument string passed to uhd.usrp_source.
            rx_freq: Initial center frequency given to the source, in Hz.
            src_gain_dB: Gain applied to the source; also subtracted from
                the logged power so readings do not depend on the gain.
            rssi_cal_offset_dB: Calibration offset added to the logged
                power.
        """
        # Call the initialization method from the parent class
        gr.top_block.__init__(self, "GSM Channel Scanner")

        # Sample rate of the source; the integration length below is
        # chosen relative to this value.
        src_samp_rate = 1E6

        # Initialize a queue
        self.sink_queue = gr.msg_queue()

        # Setup the USRP source
        self.src = uhd.usrp_source(uhd_args,
                                   uhd.io_type_t.COMPLEX_FLOAT32,
                                   1)
        self.src.set_samp_rate(src_samp_rate)
        self.src.set_gain(src_gain_dB, 0)
        self.src.set_center_freq(rx_freq, 0)

        # Generate taps for the low-pass channel filter (a plain FIR with
        # no decimation; no frequency translation is done here)
        filter_taps = filter.firdes.low_pass(
            gain=1.0,
            sampling_freq=src_samp_rate,
            cutoff_freq=100E3,
            transition_width=25E3,
            window=filter.firdes.WIN_HAMMING)

        # Channel filter
        channel_filter = filter.fir_filter_ccc(1, filter_taps)

        # Calculate power for RSSI
        c2magsqr = blocks.complex_to_mag_squared(1)

        # Integrate for mean power and decimate down to 100 Hz
        # (1E6 samples/s divided by 10000)
        integrator = blocks.integrate_ff(10000)

        # Take 10*Log10() and offset for calibrated power and src gain
        logten = blocks.nlog10_ff(10, 1, rssi_cal_offset_dB - src_gain_dB)

        # Message sink
        # NOTE(review): the trailing False is presumably the sink's
        # "don't block" flag -- confirm against the GNU Radio docs.
        self.msg_sink = blocks.message_sink(gr.sizeof_float,
                                            self.sink_queue,
                                            False)

        # Connect the blocks for the RSSI
        self.connect(self.src, channel_filter, c2magsqr, integrator,
                     logten, self.msg_sink)
def _wait_for_message(queue, poll_interval=0.001):
    """Return once ``queue`` holds a message, sleeping between polls."""
    # Sleeping instead of spinning keeps this thread from burning a CPU
    # core while the flow graph produces the next reading.
    while queue.count() == 0:
        time.sleep(poll_interval)


def _read_settled_rssi(queue):
    """Discard stale readings from ``queue`` and return one fresh value."""
    # Flush the queue
    queue.flush()
    # Wait for queue to fill and flush again to clean integrator
    _wait_for_message(queue)
    queue.flush()
    # Wait for queue to fill then pull a sample
    _wait_for_message(queue)
    rssi_str = queue.delete_head_nowait().to_string()
    # Convert to float value
    return str_to_floats(rssi_str)[0]


def main():
    """Scan ARFCN 512 through 810 and log channels above the threshold.

    For each channel the USRP is retuned, one settled power reading is
    taken, and channels at or above the threshold are written to
    'log.txt' and printed.  The log file is closed and the flow graph is
    stopped even if the scan is aborted.

    Raises:
        SystemExit: With code 1 if the source rejects a tune request.
    """
    # Set a threshold in dBm to log the channel
    threshold_dBm = -75
    # Lets offset tune to avoid DC spike
    lo_offset_freq = 1.1E6
    # Open a file for logging
    filename = 'log.txt'

    with open(filename, 'w') as f:
        # Start the flow
        tb = ScannerTopBlock()
        tb.start()
        try:
            # Get the time so we know how long it takes to scan
            print("Starting GSM channel scan")
            t0 = time.time()

            # Scan thru ARFCN
            for arfcn in range(512, 811):
                # ARFCN 512 maps to 1930.2 MHz with 200 kHz channel
                # spacing (presumably the PCS-1900 downlink -- confirm).
                channel_freq = 1850.2E6 + 80E6 + 0.2E6 * (arfcn - 512)
                request = uhd.tune_request(channel_freq, lo_offset_freq)
                if not tb.src.set_center_freq(request):
                    print('[ERROR] Failed to set base frequency.')
                    raise SystemExit(1)

                rssi = _read_settled_rssi(tb.sink_queue)

                # Log if above threshold and not a spur
                # (10 MHz clock harmonic)
                if rssi >= threshold_dBm and channel_freq % 10E6 != 0:
                    log_string = (str(arfcn) + " " + str(channel_freq / 1E6)
                                  + " " + str(rssi))
                    f.write(log_string + '\n')
                    print(log_string)

            # Print the elapsed time it took to scan
            print(str(time.time() - t0) + " seconds to scan")
        finally:
            # Stop the flow, also when the scan ends early
            tb.stop()
            tb.wait()
if __name__ == '__main__':
    # Run the scan only when this file is executed as a script.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C is swallowed here, so no traceback is printed for it.
        pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement