Last active
September 5, 2020 16:04
-
-
Save jdiez17/ed1b9016ce26db602298 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from gnuradio import analog | |
from gnuradio import blocks | |
from gnuradio import eng_notation | |
from gnuradio import filter | |
from gnuradio import gr | |
from gnuradio.eng_option import eng_option | |
from gnuradio.filter import firdes | |
from optparse import OptionParser | |
import osmosdr | |
import numpy | |
def sublistExists(list, sublist):
    """Return True if `sublist` appears as a contiguous slice of `list`.

    Every start offset at which a slice of ``len(sublist)`` items still fits
    inside `list` is tried in order.  An empty `sublist` is therefore found
    in any sequence, and a `sublist` longer than `list` is never found.
    """
    width = len(sublist)
    start_count = len(list) - width + 1
    return any(sublist == list[start:start + width]
               for start in range(start_count))
# Symbol sequence associated with the doorbell (18 symbols, ending in the
# END_OF_MSG trailer).
DOORBELL_ID = [
    0, 1, 0, 0, 0, 1,
    0, 0, 1, 1, 0, 1,
    0, 1, 0, 1, 1, 1,
]

# Nominal pulse widths of the two symbols, in microseconds.
ZERO_PULSE = 200
ONE_PULSE = 600

# Rate of the sample stream fed to the demodulator, in Hz.
SAMPLE_RATE = 50000

SEQUENCE = [1, 0, 1, 0, 1, 0]

# Symbol trailer that marks the end of a message.
END_OF_MSG = [1, 1, 1]

# Number of symbols in a complete packet.
PACKET_LEN = 18

# Pulse widths converted from microseconds to a (float) number of samples.
_MICROSECONDS_PER_SECOND = 1000000.
SAMPLES_ZERO_PULSE = SAMPLE_RATE * (ZERO_PULSE / _MICROSECONDS_PER_SECOND)
SAMPLES_ONE_PULSE = SAMPLE_RATE * (ONE_PULSE / _MICROSECONDS_PER_SECOND)
class pwm_demod_py(gr.sync_block):
    """Sink block that decodes a pulse-width-modulated 0/1 sample stream.

    The block has one ``numpy.uint8`` input and no outputs.  It measures the
    length of each run of identical samples; when a high run ends, its length
    decides whether a 0 or a 1 symbol is recorded.  The collected symbols are
    passed to ``packet_handle`` once the end-of-message trailer is seen or
    after a long low pause.

    State is kept across ``work`` calls:
        state   -- level (0 or 1) of the most recent sample
        count   -- number of consecutive samples seen at that level
        symbols -- symbols decoded since the last packet was handed off
    """

    def __init__(self):
        self.state = 0
        self.count = 0
        self.symbols = []
        gr.sync_block.__init__(self,
                               name="pwm_demod_py",
                               in_sig=[numpy.uint8],
                               out_sig=[])

    def packet_handle(self, packet):
        """Handle one candidate packet (a list of decoded symbols).

        Only packets of exactly PACKET_LEN symbols are considered; the
        actual action is still a placeholder.
        """
        if len(packet) == PACKET_LEN:
            # send message to homeassistant here
            pass

    def process_chunk(self, chunk, eom=END_OF_MSG):
        """Feed a chunk of samples through the run-length decoder.

        chunk -- iterable of samples; each one is converted with ``int()``
        eom   -- symbol sequence that marks the end of a message.  The
                 default list is only read, never mutated.
        """
        for sample in chunk:
            sample = int(sample)
            if sample == self.state:
                self.count += 1
            else:
                # Level change: self.count is the length of the run that
                # just ended.
                #
                # Transition to 0: the previous state was 1, try to decode a
                # symbol.  Runs no longer than half a zero pulse are ignored.
                if sample == 0 and self.count > SAMPLES_ZERO_PULSE / 2:
                    symbol = 1 if self.count > SAMPLES_ONE_PULSE - 10 else 0
                    self.symbols.append(symbol)

                # End of packet detector: hand off the symbols if the
                # trailer is present, or if a long low pause just ended.
                # The caller-supplied `eom` is honoured here instead of the
                # module-level END_OF_MSG.
                if sublistExists(self.symbols, eom) or \
                        (sample == 1 and self.count > SAMPLES_ONE_PULSE + 10):
                    self.packet_handle(self.symbols)
                    # Rebind rather than clear, so the list handed to
                    # packet_handle is left intact.
                    self.symbols = []

                # The current sample is the first of the new run.
                self.count = 1
                self.state = sample

    def work(self, input_items, output_items):
        """Process all samples on input 0 and report them as consumed."""
        in0 = input_items[0]
        self.process_chunk(in0)
        return len(in0)
class top_block(gr.top_block):
    """Flowgraph that receives, thresholds and demodulates the PWM signal.

    Signal chain (as wired in ``__init__``):
        osmosdr source -> low-pass filter -> gain (multiply const)
        -> rational resampler (decimate by 5) -> |x|^2 -> threshold
        -> float to char -> pwm_demod_py

    The setters only touch blocks that are actually created in ``__init__``.
    There is no GUI in this flowgraph, so no slider/text-box widgets (and no
    signal source or throttle block) are updated.
    """

    def __init__(self):
        gr.top_block.__init__(self)

        ##################################################
        # Variables
        ##################################################
        self.threshold = threshold = 0.5
        # 250 kHz from the source; after the decimate-by-5 resampler this is
        # 50 kHz, the value of the module-level SAMPLE_RATE.
        self.samp_rate = samp_rate = 0.25e6
        self.gain = gain = 1
        self.freq = freq = 434.92e6

        ##################################################
        # Blocks
        ##################################################
        self.rational_resampler_xxx_0 = filter.rational_resampler_ccc(
            interpolation=1,
            decimation=5,
            taps=None,
            fractional_bw=None,
        )
        self.osmosdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + "" )
        self.osmosdr_source_0.set_sample_rate(samp_rate)
        self.osmosdr_source_0.set_center_freq(freq, 0)
        self.osmosdr_source_0.set_freq_corr(0, 0)
        self.osmosdr_source_0.set_dc_offset_mode(0, 0)
        self.osmosdr_source_0.set_iq_balance_mode(0, 0)
        self.osmosdr_source_0.set_gain_mode(False, 0)
        self.osmosdr_source_0.set_gain(0, 0)
        self.osmosdr_source_0.set_if_gain(24, 0)
        self.osmosdr_source_0.set_bb_gain(38, 0)
        self.osmosdr_source_0.set_antenna("", 0)
        self.osmosdr_source_0.set_bandwidth(0, 0)

        # NOTE(review): the taps are designed for a 1e6 Hz sample rate while
        # the source is configured for samp_rate (0.25e6) -- confirm that
        # this is intended.
        self.low_pass_filter_0 = filter.fir_filter_ccf(1, firdes.low_pass(
            1, 1e6, 100e3, 1e3, firdes.WIN_HAMMING, 6.76))
        self.blocks_threshold_ff_0 = blocks.threshold_ff(threshold, threshold, 0)
        # NOTE(review): this multiplier is created but never connected
        # below; kept so the attribute remains available.
        self.blocks_multiply_xx_0 = blocks.multiply_vcc(1)
        self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((gain, ))
        self.blocks_float_to_char_0 = blocks.float_to_char(1, 1)
        self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)
        self.pwm_demod = pwm_demod_py()

        ##################################################
        # Connections
        ##################################################
        self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_threshold_ff_0, 0))
        self.connect((self.blocks_multiply_const_vxx_0, 0), (self.rational_resampler_xxx_0, 0))
        self.connect((self.blocks_threshold_ff_0, 0), (self.blocks_float_to_char_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_multiply_const_vxx_0, 0))
        self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))
        self.connect((self.rational_resampler_xxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
        self.connect((self.blocks_float_to_char_0, 0), (self.pwm_demod, 0))

    def get_threshold(self):
        """Return the current detection threshold."""
        return self.threshold

    def set_threshold(self, threshold):
        """Store `threshold` and apply it as both the high and low level of
        the threshold block."""
        self.threshold = threshold
        self.blocks_threshold_ff_0.set_hi(self.threshold)
        self.blocks_threshold_ff_0.set_lo(self.threshold)

    def get_samp_rate(self):
        """Return the configured source sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store `samp_rate` and apply it to the osmosdr source.

        The resampler and the demodulator's pulse thresholds (derived from
        the module-level SAMPLE_RATE) are not updated here.
        """
        self.samp_rate = samp_rate
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)

    def get_gain(self):
        """Return the current gain factor."""
        return self.gain

    def set_gain(self, gain):
        """Store `gain` and apply it to the multiply-const block."""
        self.gain = gain
        self.blocks_multiply_const_vxx_0.set_k((self.gain, ))

    def get_freq(self):
        """Return the configured centre frequency."""
        return self.freq

    def set_freq(self, freq):
        """Store `freq` and retune channel 0 of the osmosdr source."""
        self.freq = freq
        self.osmosdr_source_0.set_center_freq(self.freq, 0)
if __name__ == '__main__':
    # No options are defined; parsing still handles --help and rejects
    # unexpected flags, and the parsed result is not used.
    cli = OptionParser(usage="%prog: [options]", option_class=eng_option)
    cli.parse_args()

    # Build the flowgraph and run it.
    flowgraph = top_block()
    flowgraph.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment