Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import time

import matplotlib.pyplot as plt
import numpy as np
import sounddevice as sd
from python_hackrf import pyhackrf
from scipy import signal
class HackRFAudioProcessor:
    """Capture IQ samples from a HackRF and turn them into playable FM audio.

    The class opens and configures the device on construction, collects
    samples through ``rx_callback`` while ``capture_samples`` is running, and
    offers helpers to demodulate, filter/resample, play and plot the result.
    It can be used as a context manager; leaving the ``with`` block calls
    ``close()``.
    """

    def __init__(self, center_freq=100e6, sample_rate=10e6, baseband_filter=7.5e6,
                 lna_gain=30, vga_gain=50, buffer_size=131072):
        """
        Initialize the HackRF and setup parameters for audio reception
        Parameters:
        - center_freq: Center frequency in Hz (default: 100e6 for FM radio)
        - sample_rate: Sample rate in samples/sec (default: 10e6)
        - baseband_filter: Baseband filter bandwidth in Hz
        - lna_gain: LNA (RF) gain in dB (0-40 in 8 dB steps)
        - vga_gain: VGA (baseband) gain in dB (0-62 in 2 dB steps)
        - buffer_size: Size of the initial sample buffer; capture_samples()
          replaces that buffer with one sized for the requested duration
        Raises:
        - Whatever the pyhackrf calls raise; the device is released again
          before the exception propagates.
        """
        # NOTE(review): the default lna_gain of 30 is not a multiple of 8 —
        # confirm how the driver rounds it.
        self.center_freq = center_freq
        self.sample_rate = sample_rate
        self.baseband_filter = baseband_filter
        self.lna_gain = lna_gain
        self.vga_gain = vga_gain
        self.buffer_size = buffer_size

        # Prepare buffer for samples
        self.samples = np.zeros(self.buffer_size, dtype=np.complex64)
        self.last_idx = 0

        # Initialize device; undo the library init if no device can be opened
        pyhackrf.pyhackrf_init()
        try:
            self.sdr = pyhackrf.pyhackrf_open()
        except Exception:
            pyhackrf.pyhackrf_exit()
            raise

        # Configure device; release it again if any setting is rejected so a
        # failed constructor does not leave the HackRF open
        try:
            self.allowed_filter = pyhackrf.pyhackrf_compute_baseband_filter_bw_round_down_lt(
                baseband_filter)
            self.sdr.pyhackrf_set_sample_rate(sample_rate)
            self.sdr.pyhackrf_set_baseband_filter_bandwidth(self.allowed_filter)
            self.sdr.pyhackrf_set_antenna_enable(False)
            self.sdr.pyhackrf_set_freq(center_freq)
            self.sdr.pyhackrf_set_amp_enable(False)
            self.sdr.pyhackrf_set_lna_gain(lna_gain)
            self.sdr.pyhackrf_set_vga_gain(vga_gain)
        except Exception:
            self.close()
            raise

        print("HackRF initialized with:")
        print(f"- Center frequency: {center_freq/1e6} MHz")
        print(f"- Sample rate: {sample_rate/1e6} MHz")
        print(f"- Baseband filter: {self.allowed_filter/1e6} MHz")

    def rx_callback(self, device, buffer, buffer_length, valid_length):
        """
        Callback function to process received samples
        Parameters:
        - device: Unused here
        - buffer: Array of interleaved I/Q bytes (must support slicing and
          ``.astype``, e.g. a NumPy array)
        - buffer_length: Unused here
        - valid_length: Number of leading bytes in ``buffer`` to convert
        Returns:
        - Always 0 (presumably tells the driver to keep streaming — confirm
          against the python_hackrf documentation)
        """
        # Only whole I/Q pairs can be converted; a trailing odd byte is ignored
        accepted = valid_length // 2
        raw = buffer[:2 * accepted].astype(np.int8)
        # De-interleave I/Q into complex values and scale to the -1..+1 range
        iq = (raw[0::2] + 1j * raw[1::2]) / 128.0
        # Bound the write by the buffer actually allocated (capture_samples
        # resizes it), and keep the part of the last chunk that still fits
        room = len(self.samples) - self.last_idx
        count = min(accepted, room)
        if count > 0:
            self.samples[self.last_idx:self.last_idx + count] = iq[:count]
            self.last_idx += count
        return 0

    def capture_samples(self, duration=1.0):
        """
        Capture IQ samples for specified duration
        Parameters:
        - duration: Recording time in seconds
        Returns:
        - Complex IQ samples (a slice of the internal buffer). The first
          0.1 s is dropped when more than that was captured.
        """
        self.last_idx = 0
        self.samples = np.zeros(int(duration * self.sample_rate), dtype=np.complex64)
        # Set callback function
        self.sdr.set_rx_callback(self.rx_callback)
        # Start receiving
        self.sdr.pyhackrf_start_rx()
        print(f"Started capturing samples for {duration} seconds...")
        try:
            # Wait for specified duration
            time.sleep(duration)
        finally:
            # Stop receiving even if the wait is interrupted
            self.sdr.pyhackrf_stop_rx()
        captured = self.last_idx
        print(f"Captured {captured} samples")
        # Skip first samples to avoid transients
        skip = int(0.1 * self.sample_rate)
        return self.samples[skip:captured] if captured > skip else self.samples[:captured]

    def demodulate_fm(self, samples, deviation=75e3):
        """
        Demodulate FM signal from complex IQ samples
        Parameters:
        - samples: Complex IQ samples
        - deviation: FM deviation in Hz (75 kHz for broadcast FM)
        Returns:
        - Demodulated audio samples (one fewer than the input); a frequency
          offset equal to ``deviation`` maps to an amplitude of 1.0
        """
        # FM demodulation (phase difference between consecutive samples)
        phase_step = np.diff(np.unwrap(np.angle(samples)))
        # Convert radians/sample to Hz and scale by deviation
        return phase_step * (self.sample_rate / (2 * np.pi * deviation))

    def process_audio(self, samples, target_rate=48000):
        """
        Process demodulated audio: filter and downsample
        Parameters:
        - samples: Demodulated audio samples at ``self.sample_rate``
        - target_rate: Target audio sample rate
        Returns:
        - Processed audio samples at target rate
        """
        # Low-pass filter to audio frequencies (15 kHz for FM broadcast)
        nyq_rate = self.sample_rate / 2
        cutoff = 15000 / nyq_rate
        # Design the filter as second-order sections: at MHz sample rates the
        # normalised cutoff is tiny and the (b, a) polynomial form is
        # numerically ill-conditioned
        sos = signal.butter(5, cutoff, 'low', output='sos')
        # Zero-phase (forward-backward) filtering
        audio_filtered = signal.sosfiltfilt(sos, samples)
        # Resample to target rate
        resampling_factor = target_rate / self.sample_rate
        num_output_samples = int(len(audio_filtered) * resampling_factor)
        return signal.resample(audio_filtered, num_output_samples)

    def play_audio(self, audio, sample_rate=48000, duration=None):
        """
        Play audio samples through sound device
        Parameters:
        - audio: Audio samples to play
        - sample_rate: Sample rate of audio
        - duration: Duration to play in seconds (None for all samples)
        """
        audio = np.asarray(audio)
        if audio.size == 0:
            # Nothing to play
            return
        # Normalize audio; silent input is left as-is to avoid dividing by zero
        peak = np.max(np.abs(audio))
        if peak > 0:
            audio = audio / peak
        # Limit duration if specified
        if duration is not None:
            max_samples = int(duration * sample_rate)
            audio = audio[:max_samples]
        # Play audio and block until playback has finished
        sd.play(audio, sample_rate)
        sd.wait()

    @staticmethod
    def _power_db(block, fft_size):
        """Return the centred power spectrum of ``block`` in dB.

        ``block`` is zero-padded (or truncated) to ``fft_size`` points and the
        power is floored at 1e-20 (-200 dB) so empty bins stay finite.
        """
        power = np.abs(np.fft.fftshift(np.fft.fft(block, n=fft_size))) ** 2
        return 10 * np.log10(np.maximum(power, 1e-20))

    def analyze_signal(self, samples, fft_size=2048):
        """
        Analyze signal: generate spectrogram and spectrum
        Parameters:
        - samples: Complex IQ samples
        - fft_size: FFT size for spectrum analysis
        Raises:
        - ValueError: if ``samples`` is empty
        """
        samples = np.asarray(samples)
        if samples.size == 0:
            raise ValueError("no samples to analyze")

        plt.figure(figsize=(12, 10))

        # Spectrum plot (first fft_size samples, zero-padded if fewer exist)
        plt.subplot(3, 1, 1)
        spectrum = self._power_db(samples[:fft_size], fft_size)
        freq = np.fft.fftshift(np.fft.fftfreq(fft_size, 1/self.sample_rate))
        freq_mhz = freq / 1e6 + self.center_freq / 1e6
        plt.plot(freq_mhz, spectrum)
        plt.grid(True)
        plt.xlabel('Frequency (MHz)')
        plt.ylabel('Power (dB)')
        plt.title('Signal Spectrum')

        # Spectrogram (waterfall): one row per complete fft_size block
        plt.subplot(3, 1, 2)
        num_rows = len(samples) // fft_size
        if num_rows > 0:
            spectrogram = np.zeros((num_rows, fft_size))
            for row in range(num_rows):
                spectrogram[row, :] = self._power_db(
                    samples[row * fft_size:(row + 1) * fft_size], fft_size)
            # The time axis covers only the samples that made it into a row
            extent = [(self.center_freq + self.sample_rate / -2) / 1e6,
                      (self.center_freq + self.sample_rate / 2) / 1e6,
                      num_rows * fft_size / self.sample_rate, 0]
            plt.imshow(spectrogram, aspect='auto', extent=extent)
        plt.xlabel('Frequency (MHz)')
        plt.ylabel('Time (s)')
        plt.title('Signal Spectrogram')

        # Time domain plot (at most the first 10000 samples)
        plt.subplot(3, 1, 3)
        shown = min(10000, len(samples))
        t = np.arange(shown) / self.sample_rate
        plt.plot(t, np.real(samples[:shown]), label='I')
        plt.plot(t, np.imag(samples[:shown]), label='Q')
        plt.grid(True)
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')
        plt.legend()
        plt.title('I/Q Samples (Time Domain)')
        plt.tight_layout()
        plt.show()

    def close(self):
        """
        Close HackRF device and clean up

        Safe to call more than once: after the first call (or if no device
        was ever opened) it does nothing.
        """
        sdr = getattr(self, "sdr", None)
        if sdr is None:
            return
        # Drop the handle first so a repeated close() is a no-op
        self.sdr = None
        try:
            sdr.pyhackrf_close()
        finally:
            pyhackrf.pyhackrf_exit()
        print("HackRF device closed")

    def __enter__(self):
        """Return the processor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the device on leaving the ``with`` block; exceptions propagate."""
        self.close()
def main():
    """
    Run one capture -> analyze -> demodulate -> filter -> play cycle.

    The processor is tuned to 100 MHz as an example FM broadcast frequency
    (adjust for your location) and is closed when the ``with`` block exits,
    whether or not a step raised.
    """
    with HackRFAudioProcessor(center_freq=100e6, sample_rate=10e6) as processor:
        # Capture samples
        print("Capturing RF samples...")
        iq = processor.capture_samples(duration=3.0)

        # Analyze signal
        print("Analyzing signal...")
        processor.analyze_signal(iq)

        # Demodulate FM
        print("Demodulating FM signal...")
        demodulated = processor.demodulate_fm(iq)

        # Process audio
        print("Processing audio...")
        playable = processor.process_audio(demodulated)

        # Play audio
        print("Playing audio...")
        processor.play_audio(playable, duration=3.0)


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement