Skip to content

Instantly share code, notes, and snippets.

@cloudlakecho
Created May 29, 2021 00:18
Show Gist options
  • Save cloudlakecho/e570b6a51c1bd91afaa808643d5cc3bb to your computer and use it in GitHub Desktop.
Save cloudlakecho/e570b6a51c1bd91afaa808643d5cc3bb to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# Rx_Warmup_Automated.py - To warm up USRP
#
# GNU Radio Python Flow Graph
# Title: Warmup Receiver
# Author: Sean Holloway
# Modified by Cloud Cho
# Description: Receives signal on SDR and saves to file
# GNU Radio version: 3.8.2.0
#
# How to run this code
# (1) Please, modify "addr" for target USRP
# (2) $ Python Rx_Warmup_Automated.py <warm up time>
from gnuradio import blocks
from gnuradio import gr
from gnuradio.filter import firdes
import os, pdb, sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from gnuradio import uhd
import time
DEBUGGING = False
class Rx_Warmup(gr.top_block):
def __init__(self):
gr.top_block.__init__(self, "Warmup Receiver")
##################################################
# Variables
##################################################
self.warmup_time = warmup_time = 0
self.samp_rate = samp_rate = 10e6
self.packet_rate = packet_rate = 100
self.packet_len = packet_len = 1024
self.gain_rx = gain_rx = 25
self.freq = freq = 2.405e9
self.collect_time = collect_time = int(sys.argv[1])
##################################################
# Blocks
##################################################
if (DEBUGGING):
pdb.set_trace()
self.uhd_usrp_source_0 = uhd.usrp_source(
",".join(("addr=10.1.11.202", "")),
uhd.stream_args(
cpu_format="fc32",
args='',
channels=list(range(0,1)),
),
)
# self.uhd_usrp_source_0 = uhd.usrp_source(
# ",".join(("addr=10.1.11.202", ""))
# )
self.uhd_usrp_source_0.set_center_freq(freq, 0)
self.uhd_usrp_source_0.set_gain(gain_rx, 0)
self.uhd_usrp_source_0.set_antenna('RX2', 0)
self.uhd_usrp_source_0.set_bandwidth(100e6, 0)
self.uhd_usrp_source_0.set_samp_rate(samp_rate)
# No synchronization enforced.
self.blocks_null_sink_1 = blocks.null_sink(gr.sizeof_gr_complex*1)
self.blocks_head_0 = blocks.head(gr.sizeof_gr_complex*1, int(samp_rate * collect_time))
##################################################
# Connections
##################################################
self.connect((self.blocks_head_0, 0), (self.blocks_null_sink_1, 0))
self.connect((self.uhd_usrp_source_0, 0), (self.blocks_head_0, 0))
def get_warmup_time(self):
return self.warmup_time
def set_warmup_time(self, warmup_time):
self.warmup_time = warmup_time
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.blocks_head_0.set_length(int(self.samp_rate * self.collect_time))
self.uhd_usrp_source_0.set_samp_rate(self.samp_rate)
def get_packet_rate(self):
return self.packet_rate
def set_packet_rate(self, packet_rate):
self.packet_rate = packet_rate
def get_packet_len(self):
return self.packet_len
def set_packet_len(self, packet_len):
self.packet_len = packet_len
def get_gain_rx(self):
return self.gain_rx
def set_gain_rx(self, gain_rx):
self.gain_rx = gain_rx
self.uhd_usrp_source_0.set_gain(self.gain_rx, 0)
def get_freq(self):
return self.freq
def set_freq(self, freq):
self.freq = freq
self.uhd_usrp_source_0.set_center_freq(self.freq, 0)
def get_collect_time(self):
return self.collect_time
def set_collect_time(self, collect_time):
self.collect_time = collect_time
self.blocks_head_0.set_length(int(self.samp_rate * self.collect_time))
def main(top_block_cls=Rx_Warmup, options=None):
tb = top_block_cls()
def sig_handler(sig=None, frame=None):
tb.stop()
tb.wait()
sys.exit(0)
try:
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
except Exception as e:
print (e.args)
pdb.set_trace()
try:
tb.start()
except Exception as e:
print (e.args)
pdb.set_trace()
try:
tb.wait()
except Exception as e:
print (e.args)
pdb.set_trace()
else:
print ("Warming up completed")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment