Advertisement
Guest User

Untitled

a guest
May 9th, 2025
19
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 9.45 KB | None | 0 0
  1. from python_hackrf import pyhackrf
  2. import numpy as np
  3. import matplotlib.pyplot as plt
  4. from scipy import signal
  5. import sounddevice as sd
  6.  
  7. class HackRFAudioProcessor:
  8. def __init__(self, center_freq=100e6, sample_rate=10e6, baseband_filter=7.5e6,
  9. lna_gain=30, vga_gain=50, buffer_size=131072):
  10. """
  11. Initialize the HackRF and setup parameters for audio reception
  12.  
  13. Parameters:
  14. - center_freq: Center frequency in Hz (default: 100e6 for FM radio)
  15. - sample_rate: Sample rate in samples/sec (default: 10e6)
  16. - baseband_filter: Baseband filter bandwidth in Hz
  17. - lna_gain: LNA (RF) gain in dB (0-40 in 8 dB steps)
  18. - vga_gain: VGA (baseband) gain in dB (0-62 in 2 dB steps)
  19. - buffer_size: Size of buffer for samples
  20. """
  21. self.center_freq = center_freq
  22. self.sample_rate = sample_rate
  23. self.baseband_filter = baseband_filter
  24. self.lna_gain = lna_gain
  25. self.vga_gain = vga_gain
  26. self.buffer_size = buffer_size
  27.  
  28. # Initialize device
  29. pyhackrf.pyhackrf_init()
  30. self.sdr = pyhackrf.pyhackrf_open()
  31.  
  32. # Configure device
  33. self.allowed_filter = pyhackrf.pyhackrf_compute_baseband_filter_bw_round_down_lt(baseband_filter)
  34. self.sdr.pyhackrf_set_sample_rate(sample_rate)
  35. self.sdr.pyhackrf_set_baseband_filter_bandwidth(self.allowed_filter)
  36. self.sdr.pyhackrf_set_antenna_enable(False)
  37. self.sdr.pyhackrf_set_freq(center_freq)
  38. self.sdr.pyhackrf_set_amp_enable(False)
  39. self.sdr.pyhackrf_set_lna_gain(lna_gain)
  40. self.sdr.pyhackrf_set_vga_gain(vga_gain)
  41.  
  42. # Prepare buffer for samples
  43. self.samples = np.zeros(self.buffer_size, dtype=np.complex64)
  44. self.last_idx = 0
  45.  
  46. print(f"HackRF initialized with:")
  47. print(f"- Center frequency: {center_freq/1e6} MHz")
  48. print(f"- Sample rate: {sample_rate/1e6} MHz")
  49. print(f"- Baseband filter: {self.allowed_filter/1e6} MHz")
  50.  
  51. def rx_callback(self, device, buffer, buffer_length, valid_length):
  52. """
  53. Callback function to process received samples
  54. """
  55. global last_idx, samples
  56.  
  57. # Process received bytes
  58. accepted = valid_length // 2
  59. accepted_samples = buffer[:valid_length].astype(np.int8)
  60.  
  61. # Convert to complex format (de-interleave I/Q)
  62. accepted_samples = accepted_samples[0::2] + 1j * accepted_samples[1::2]
  63.  
  64. # Scale samples to -1 to +1 range
  65. accepted_samples = accepted_samples / 128.0
  66.  
  67. # Store in buffer
  68. if self.last_idx + accepted <= self.buffer_size:
  69. self.samples[self.last_idx:self.last_idx + accepted] = accepted_samples
  70. self.last_idx += accepted
  71.  
  72. return 0
  73.  
  74. def capture_samples(self, duration=1.0):
  75. """
  76. Capture IQ samples for specified duration
  77.  
  78. Parameters:
  79. - duration: Recording time in seconds
  80.  
  81. Returns:
  82. - Complex IQ samples
  83. """
  84. self.last_idx = 0
  85. self.samples = np.zeros(int(duration * self.sample_rate), dtype=np.complex64)
  86.  
  87. # Set callback function
  88. self.sdr.set_rx_callback(self.rx_callback)
  89.  
  90. # Start receiving
  91. self.sdr.pyhackrf_start_rx()
  92. print(f"Started capturing samples for {duration} seconds...")
  93.  
  94. # Wait for specified duration
  95. import time
  96. time.sleep(duration)
  97.  
  98. # Stop receiving
  99. self.sdr.pyhackrf_stop_rx()
  100. print(f"Captured {self.last_idx} samples")
  101.  
  102. # Skip first samples to avoid transients
  103. skip = int(0.1 * self.sample_rate)
  104. return self.samples[skip:self.last_idx] if self.last_idx > skip else self.samples[:self.last_idx]
  105.  
  106. def demodulate_fm(self, samples, deviation=75e3):
  107. """
  108. Demodulate FM signal from complex IQ samples
  109.  
  110. Parameters:
  111. - samples: Complex IQ samples
  112. - deviation: FM deviation in Hz (75 kHz for broadcast FM)
  113.  
  114. Returns:
  115. - Demodulated audio samples
  116. """
  117. # FM demodulation (phase difference)
  118. samples_diff = np.diff(np.unwrap(np.angle(samples)))
  119.  
  120. # Scale by deviation
  121. audio = samples_diff * (self.sample_rate / (2 * np.pi * deviation))
  122.  
  123. return audio
  124.  
  125. def process_audio(self, samples, target_rate=48000):
  126. """
  127. Process demodulated audio: filter and downsample
  128.  
  129. Parameters:
  130. - samples: Demodulated audio samples
  131. - target_rate: Target audio sample rate
  132.  
  133. Returns:
  134. - Processed audio samples at target rate
  135. """
  136. # Low-pass filter to audio frequencies (15 kHz for FM broadcast)
  137. nyq_rate = self.sample_rate / 2
  138. cutoff = 15000 / nyq_rate
  139.  
  140. # Design filter
  141. b, a = signal.butter(5, cutoff, 'low')
  142.  
  143. # Apply filter
  144. audio_filtered = signal.filtfilt(b, a, samples)
  145.  
  146. # Resample to target rate
  147. resampling_factor = target_rate / self.sample_rate
  148. num_output_samples = int(len(audio_filtered) * resampling_factor)
  149. audio_resampled = signal.resample(audio_filtered, num_output_samples)
  150.  
  151. return audio_resampled
  152.  
  153. def play_audio(self, audio, sample_rate=48000, duration=None):
  154. """
  155. Play audio samples through sound device
  156.  
  157. Parameters:
  158. - audio: Audio samples to play
  159. - sample_rate: Sample rate of audio
  160. - duration: Duration to play in seconds (None for all samples)
  161. """
  162. # Normalize audio
  163. audio = audio / np.max(np.abs(audio))
  164.  
  165. # Limit duration if specified
  166. if duration is not None:
  167. max_samples = int(duration * sample_rate)
  168. audio = audio[:max_samples]
  169.  
  170. # Play audio
  171. sd.play(audio, sample_rate)
  172. sd.wait()
  173.  
  174. def analyze_signal(self, samples, fft_size=2048):
  175. """
  176. Analyze signal: generate spectrogram and spectrum
  177.  
  178. Parameters:
  179. - samples: Complex IQ samples
  180. - fft_size: FFT size for spectrum analysis
  181. """
  182. # Create spectrogram
  183. plt.figure(figsize=(12, 10))
  184.  
  185. # Spectrum plot
  186. plt.subplot(3, 1, 1)
  187. spectrum = 10 * np.log10(np.abs(np.fft.fftshift(np.fft.fft(samples[:fft_size]))) ** 2)
  188. freq = np.fft.fftshift(np.fft.fftfreq(fft_size, 1/self.sample_rate))
  189. freq_mhz = freq / 1e6 + self.center_freq / 1e6
  190. plt.plot(freq_mhz, spectrum)
  191. plt.grid(True)
  192. plt.xlabel('Frequency (MHz)')
  193. plt.ylabel('Power (dB)')
  194. plt.title('Signal Spectrum')
  195.  
  196. # Spectrogram (waterfall)
  197. plt.subplot(3, 1, 2)
  198. num_rows = len(samples) // fft_size
  199. spectrogram = np.zeros((num_rows, fft_size))
  200. for i in range(num_rows):
  201. spectrogram[i, :] = 10 * np.log10(np.abs(np.fft.fftshift(
  202. np.fft.fft(samples[i * fft_size:(i+1) * fft_size]))) ** 2)
  203.  
  204. extent = [(self.center_freq + self.sample_rate / -2) / 1e6,
  205. (self.center_freq + self.sample_rate / 2) / 1e6,
  206. len(samples) / self.sample_rate, 0]
  207.  
  208. plt.imshow(spectrogram, aspect='auto', extent=extent)
  209. plt.xlabel('Frequency (MHz)')
  210. plt.ylabel('Time (s)')
  211. plt.title('Signal Spectrogram')
  212.  
  213. # Time domain plot
  214. plt.subplot(3, 1, 3)
  215. time = np.arange(10000) / self.sample_rate
  216. plt.plot(time, np.real(samples[:10000]), label='I')
  217. plt.plot(time, np.imag(samples[:10000]), label='Q')
  218. plt.grid(True)
  219. plt.xlabel('Time (s)')
  220. plt.ylabel('Amplitude')
  221. plt.legend()
  222. plt.title('I/Q Samples (Time Domain)')
  223.  
  224. plt.tight_layout()
  225. plt.show()
  226.  
  227. def close(self):
  228. """
  229. Close HackRF device and clean up
  230. """
  231. self.sdr.pyhackrf_close()
  232. pyhackrf.pyhackrf_exit()
  233. print("HackRF device closed")
  234.  
  235. def __enter__(self):
  236. return self
  237.  
  238. def __exit__(self, exc_type, exc_val, exc_tb):
  239. self.close()
  240.  
  241.  
  242. def main():
  243. """
  244. Main function demonstrating the usage of HackRFAudioProcessor
  245. """
  246. # Create processor with FM radio frequency (adjust for your location)
  247. # Using 100 MHz as an example FM radio frequency
  248. processor = HackRFAudioProcessor(center_freq=100e6, sample_rate=10e6)
  249.  
  250. try:
  251. # Capture samples
  252. print("Capturing RF samples...")
  253. samples = processor.capture_samples(duration=3.0)
  254.  
  255. # Analyze signal
  256. print("Analyzing signal...")
  257. processor.analyze_signal(samples)
  258.  
  259. # Demodulate FM
  260. print("Demodulating FM signal...")
  261. audio = processor.demodulate_fm(samples)
  262.  
  263. # Process audio
  264. print("Processing audio...")
  265. audio_processed = processor.process_audio(audio)
  266.  
  267. # Play audio
  268. print("Playing audio...")
  269. processor.play_audio(audio_processed, duration=3.0)
  270.  
  271. finally:
  272. # Clean up
  273. processor.close()
  274.  
  275.  
  276. if __name__ == "__main__":
  277. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement