Created
November 17, 2015 17:11
-
-
Save jdiez17/87a97e5172d5a3be3830 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from gnuradio import analog | |
from gnuradio import blocks | |
from gnuradio import eng_notation | |
from gnuradio import filter | |
from gnuradio import gr | |
from gnuradio.eng_option import eng_option | |
from gnuradio.filter import firdes | |
from optparse import OptionParser | |
import osmosdr | |
import numpy | |
import datetime | |
import paho.mqtt.publish as mqtt | |
import urllib | |
import urllib2 | |
def sublistExists(list, sublist):
    """Return True if ``sublist`` occurs as a contiguous run inside ``list``.

    Every window of ``len(sublist)`` consecutive items is compared with
    ``sublist`` using ``==``.  An empty ``sublist`` always matches; a
    ``sublist`` longer than ``list`` never does.
    """
    width = len(sublist)
    window_starts = range(len(list) - width + 1)
    return any(sublist == list[start:start + width]
               for start in window_starts)
# --- Doorbell protocol -------------------------------------------------------

# 18-symbol bit pattern (same length as PACKET_LEN).
# NOTE(review): not referenced by any code visible in this file.
DOORBELL_ID = [
    0, 1, 0, 0, 0, 1, 0, 0, 1,
    1, 0, 1, 0, 1, 0, 1, 1, 1,
]

ZERO_PULSE = 200  # microseconds
ONE_PULSE = 600  # microseconds
SAMPLE_RATE = 50000  # Hz

# NOTE(review): SEQUENCE is not referenced by any code visible in this file.
SEQUENCE = [1, 0, 1, 0, 1, 0]
# Symbol run that marks the end of a message for the demodulator.
END_OF_MSG = [1, 1, 1]
# Only packets with exactly this many symbols are handed to the callback.
PACKET_LEN = 18

# Pulse widths converted from microseconds to a (float) number of samples.
SAMPLES_ZERO_PULSE = SAMPLE_RATE * (ZERO_PULSE / 1e6)
SAMPLES_ONE_PULSE = SAMPLE_RATE * (ONE_PULSE / 1e6)

# --- Notification targets ----------------------------------------------------

MQTT_BROKER = "127.0.0.1"
TOPIC = "outside/doorbell"
HTTP_WEBHOOK = "http://localhost:8000/event"

# Minimum spacing between two published notifications.
RATELIMIT = 5  # seconds
# Time of the last published notification.  It starts at import time, so
# packets decoded during the first RATELIMIT seconds are not published.
LAST_MESSAGE = datetime.datetime.now()
def notify_homeassistant(packet):
    """Publish a decoded packet to the HTTP webhook and the MQTT broker.

    Notifications are rate limited: nothing is sent unless more than
    RATELIMIT seconds have passed since the previously published one, whose
    time is kept in the module-level LAST_MESSAGE.

    The webhook request is best effort: any exception it raises is printed
    and otherwise ignored.  The MQTT publish is not guarded, so an error
    from it propagates to the caller and LAST_MESSAGE is left unchanged.

    packet -- the decoded symbols; transmitted as ``str(packet)``.
    """
    global LAST_MESSAGE  # module-level rate-limit state

    now = datetime.datetime.now()
    # total_seconds() measures the whole interval.  The ``seconds``
    # attribute is only the sub-day component, so a ring arriving N days
    # plus a few seconds after the previous one used to be dropped.
    elapsed = (now - LAST_MESSAGE).total_seconds()
    # A negative interval means the wall clock was set backwards; publish in
    # that case rather than staying silent until the clock catches up.
    if 0 <= elapsed <= RATELIMIT:
        return

    payload = str(packet)
    print("publishing " + payload)

    try:
        http_message = urllib.urlencode({'message': 'doorbell ' + payload})
        urllib2.urlopen(HTTP_WEBHOOK, data=http_message).close()
    except Exception as e:
        # Deliberately broad: a webhook failure must not block the MQTT
        # publish below.
        print(str(e))

    mqtt.single(TOPIC, payload, hostname=MQTT_BROKER)
    LAST_MESSAGE = now
class pwm_demod_py(gr.sync_block):
    """Sink block that decodes pulse-width-modulated symbols from a 0/1 stream.

    The block takes one ``numpy.uint8`` input and has no outputs.  It
    measures how long the input stays at each level; the length of every
    sufficiently long run of 1s is turned into a symbol.  Accumulated
    symbols are flushed as a packet when the end-of-message pattern shows
    up or after a long run of 0s, and packets of exactly PACKET_LEN symbols
    are passed to ``callback``.
    """

    def __init__(self, callback=None):
        """Set up the decoder state and register the block's signature.

        callback -- optional callable invoked with the list of symbols of
                    every PACKET_LEN-long packet.
        """
        self.state = 0       # level of the most recent sample
        self.count = 0       # consecutive samples seen at ``self.state``
        self.symbols = []    # symbols decoded since the last flush
        self.callback = callback
        gr.sync_block.__init__(self,
                               name="pwm_demod_py",
                               in_sig=[numpy.uint8],
                               out_sig=[])

    def packet_handle(self, packet):
        """Forward ``packet`` to the callback if it has PACKET_LEN symbols.

        Packets of any other length are dropped; nothing happens when no
        callback was supplied.
        """
        if len(packet) == PACKET_LEN and self.callback:
            self.callback(packet)

    def process_chunk(self, chunk, eom=END_OF_MSG):
        """Feed a run of samples through the decoder state machine.

        State (current level, run length, pending symbols) is kept on the
        instance, so a pulse may span several calls.

        chunk -- iterable of samples; each one is converted with ``int()``.
        eom   -- symbol sequence that terminates a message.  The default is
                 END_OF_MSG.  This argument used to be ignored in favour of
                 the global constant; it is now honoured.
        """
        for sample in chunk:
            sample = int(sample)

            if sample == self.state:
                # Same level as before: the current run just gets longer.
                self.count += 1
                continue

            # Level changed; ``self.count`` is the length of the finished run.

            # Transition to 0: the finished run was high, so try to decode
            # a symbol.  Runs not longer than half a zero pulse are ignored.
            if sample == 0 and self.count > SAMPLES_ZERO_PULSE / 2:
                symbol = 1 if self.count > SAMPLES_ONE_PULSE - 10 else 0
                self.symbols.append(symbol)

            # End-of-packet detector: flush when the trailer has been seen,
            # or on a transition to 1 that follows a long pause.
            trailer_seen = sublistExists(self.symbols, eom)
            long_pause = sample == 1 and self.count > SAMPLES_ONE_PULSE + 10
            if trailer_seen or long_pause:
                self.packet_handle(self.symbols)
                self.symbols = []

            # Start counting the new run, which includes this sample.
            self.count = 1
            self.state = sample

    def work(self, input_items, output_items):
        """Decode the first input buffer and report how many items were read.

        ``output_items`` is unused because the block has no outputs.
        """
        in0 = input_items[0]
        self.process_chunk(in0)
        return len(in0)
class top_block(gr.top_block):
    """Flowgraph that feeds an osmosdr source into the PWM demodulator.

    Signal chain, as wired in ``__init__``:

        osmosdr source -> low-pass filter -> multiply by ``gain``
        -> rational resampler (decimation 5) -> magnitude squared
        -> threshold -> float to char -> pwm_demod_py

    Decoded packets are delivered to ``notify_homeassistant``.
    """

    def __init__(self):
        gr.top_block.__init__(self)

        ##################################################
        # Variables
        ##################################################
        self.threshold = threshold = 0.5
        self.samp_rate = samp_rate = 0.25e6
        self.gain = gain = 1
        self.freq = freq = 434.92e6

        ##################################################
        # Blocks
        ##################################################
        self.rational_resampler_xxx_0 = filter.rational_resampler_ccc(
            interpolation=1,
            decimation=5,
            taps=None,
            fractional_bw=None,
        )

        # NOTE(review): the trailing 0 in the calls below is presumably the
        # channel index -- confirm against the gr-osmosdr documentation.
        self.osmosdr_source_0 = osmosdr.source(args="numchan=" + str(1) + " " + "")
        self.osmosdr_source_0.set_sample_rate(samp_rate)
        self.osmosdr_source_0.set_center_freq(freq, 0)
        self.osmosdr_source_0.set_freq_corr(0, 0)
        self.osmosdr_source_0.set_dc_offset_mode(0, 0)
        self.osmosdr_source_0.set_iq_balance_mode(0, 0)
        self.osmosdr_source_0.set_gain_mode(False, 0)
        self.osmosdr_source_0.set_gain(0, 0)
        self.osmosdr_source_0.set_if_gain(24, 0)
        self.osmosdr_source_0.set_bb_gain(38, 0)
        self.osmosdr_source_0.set_antenna("", 0)
        self.osmosdr_source_0.set_bandwidth(0, 0)

        # NOTE(review): the taps are designed with 1e6 as the second
        # argument while the source runs at ``samp_rate`` (0.25e6) --
        # confirm this is intended.
        self.low_pass_filter_0 = filter.fir_filter_ccf(1, firdes.low_pass(
            1, 1e6, 100e3, 1e3, firdes.WIN_HAMMING, 6.76))
        self.blocks_threshold_ff_0 = blocks.threshold_ff(threshold, threshold, 0)
        # Instantiated but not connected to anything below.
        self.blocks_multiply_xx_0 = blocks.multiply_vcc(1)
        self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((gain, ))
        self.blocks_float_to_char_0 = blocks.float_to_char(1, 1)
        self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)
        self.pwm_demod = pwm_demod_py(callback=notify_homeassistant)

        ##################################################
        # Connections
        ##################################################
        self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_threshold_ff_0, 0))
        self.connect((self.blocks_multiply_const_vxx_0, 0), (self.rational_resampler_xxx_0, 0))
        self.connect((self.blocks_threshold_ff_0, 0), (self.blocks_float_to_char_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_multiply_const_vxx_0, 0))
        self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))
        self.connect((self.rational_resampler_xxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
        self.connect((self.blocks_float_to_char_0, 0), (self.pwm_demod, 0))

    # The setters below used to update GUI widgets and blocks
    # (``_*_slider``, ``_*_text_box``, ``analog_sig_source_x_0``,
    # ``blocks_throttle_0``) that this class never creates, so every call
    # raised AttributeError.  They now touch only objects built in __init__.

    def get_threshold(self):
        """Return the current threshold value."""
        return self.threshold

    def set_threshold(self, threshold):
        """Store ``threshold`` and apply it as both hi and lo threshold."""
        self.threshold = threshold
        self.blocks_threshold_ff_0.set_hi(self.threshold)
        self.blocks_threshold_ff_0.set_lo(self.threshold)

    def get_samp_rate(self):
        """Return the sample rate configured on the source."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store ``samp_rate`` and apply it to the osmosdr source.

        The demodulator's pulse thresholds come from the module-level
        SAMPLE_RATE constant and are not updated by this call.
        """
        self.samp_rate = samp_rate
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)

    def get_gain(self):
        """Return the constant the filtered signal is multiplied by."""
        return self.gain

    def set_gain(self, gain):
        """Store ``gain`` and apply it to the multiply-const block."""
        self.gain = gain
        self.blocks_multiply_const_vxx_0.set_k((self.gain, ))

    def get_freq(self):
        """Return the centre frequency configured on the source."""
        return self.freq

    def set_freq(self, freq):
        """Store ``freq`` and retune the osmosdr source to it."""
        self.freq = freq
        self.osmosdr_source_0.set_center_freq(self.freq, 0)
if __name__ == '__main__':
    # No options are defined and the parsed result is not used afterwards;
    # the command line is still run through the parser.
    parser = OptionParser(usage="%prog: [options]", option_class=eng_option)
    options, args = parser.parse_args()

    # Build the flowgraph and run it.
    tb = top_block()
    tb.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment