Skip to content

Instantly share code, notes, and snippets.

@persquare
Last active November 26, 2017 19:56
Show Gist options
  • Save persquare/0c721cd928108b3c71023f1ffc9b12b8 to your computer and use it in GitHub Desktop.
Save persquare/0c721cd928108b3c71023f1ffc9b12b8 to your computer and use it in GitHub Desktop.
A simple RPi controller for Nexa switches based on pigpio. Needs 433MHz transmitter.
#!/usr/bin/env python
# To install pigpio see http://abyz.me.uk/rpi/pigpio/index.html
import pigpio
class Waveform(object):
    """Build the pigpio pulse train for one Nexa 433MHz command.

    The command is a 32-bit word laid out, most significant bit first, as
    26 bits of transmitter id, 1 group bit, 1 state bit, 2 channel bits
    and 2 unit bits.  Each message bit is sent as two high/low pulse pairs
    (see ``encode_one`` and ``encode_zero``); the whole message is framed
    by ``sync`` in front and ``pause`` behind.
    """

    # Times in microseconds
    T1 = 250
    T5 = 5*T1
    T10 = 10*T1
    T40 = 40*T1

    def __init__(self, pin, tx_id, state, group=1, chan=3, unit=3):
        """Pack the command fields into ``self.msg``.

        pin   -- GPIO number of the transmitter, stored as a bit mask.
        tx_id -- transmitter id; only the low 26 bits are kept.
        state -- on/off flag; only bit 0 of ``int(state)`` is kept.
        group -- group flag; only bit 0 is kept.
        chan  -- channel; only the low 2 bits are kept.
        unit  -- unit; only the low 2 bits are kept.
        """
        self.bit = 1 << pin
        # Start with an empty pulse list so that high(), low() and the
        # encoders work even when they are called before nexa().
        self.wf = []
        self.msg = (tx_id & 0x03FFFFFF) << 6
        self.msg |= ((group & 0x1) << 5)
        self.msg |= ((int(state) & 0x1) << 4)
        self.msg |= ((chan & 0x3) << 2)
        self.msg |= (unit & 0x3)

    def nexa(self):
        """Return a fresh pulse list: sync, the 32 message bits, pause."""
        self.wf = []
        self.sync()
        self.message()
        self.pause()
        return self.wf

    def high(self, t):
        """Append a pulse that sets the pin for ``t`` microseconds."""
        self.wf.append(pigpio.pulse(self.bit, 0, t))

    def low(self, t):
        """Append a pulse that clears the pin for ``t`` microseconds."""
        self.wf.append(pigpio.pulse(0, self.bit, t))

    def encode_one(self):
        """Append a '1' bit: a short-gap pair followed by a long-gap pair."""
        self.high(self.T1)
        self.low(self.T1)
        self.high(self.T1)
        self.low(self.T5)

    def encode_zero(self):
        """Append a '0' bit: a long-gap pair followed by a short-gap pair."""
        self.high(self.T1)
        self.low(self.T5)
        self.high(self.T1)
        self.low(self.T1)

    def sync(self):
        """Append the leading sync: T1 high, then T10 low."""
        self.high(self.T1)
        self.low(self.T10)

    def pause(self):
        """Append the trailing pause: T1 high, then T40 low."""
        self.high(self.T1)
        self.low(self.T40)

    def message(self):
        """Append the encoding of ``self.msg``, most significant bit first."""
        # Zero-padded to 32 digits so that leading zero bits are sent too.
        binary_number_string = format(self.msg, '032b')
        for digit in binary_number_string:
            if digit == '1':
                self.encode_one()
            else:
                self.encode_zero()
class NexaSwitcher(object):
    """Send on/off commands to a Nexa switch through a pigpio GPIO pin.

    The on and off waveforms are created once, in the constructor, and
    replayed by ``switch``.
    """

    def __init__(self, pin, tx_id):
        """Connect to pigpio and prepare both waveforms.

        pin   -- GPIO number controlling the transmitter.
        tx_id -- transmitter id placed in the Nexa message.

        Raises RuntimeError when the pigpio daemon cannot be reached.
        Note that ``wave_clear`` is called, so waveforms previously
        created on the daemon are discarded.
        """
        self.io = pigpio.pi()
        if not self.io.connected:
            # Fail early with a clear message instead of an obscure error
            # from the first GPIO call.
            raise RuntimeError('Could not connect to the pigpio daemon')
        # Make sure pin is output and held low
        self.io.write(pin, 0)
        self.pin = pin
        self.tx_id = tx_id
        wf_on = Waveform(pin, tx_id, 1)
        wf_off = Waveform(pin, tx_id, 0)
        self.io.wave_clear()
        self.io.wave_add_generic(wf_on.nexa())
        self.on_seq = self.io.wave_create()
        self.io.wave_add_generic(wf_off.nexa())
        self.off_seq = self.io.wave_create()

    def switch(self, on_off, repeat=3):
        """Transmit the on or off sequence ``repeat`` times.

        on_off -- truthy to switch on, falsy to switch off.
        repeat -- number of transmissions, 0..65535 (default 3).

        Raises ValueError when ``repeat`` does not fit in the two bytes
        that the chain's loop command provides.
        """
        if not 0 <= repeat <= 0xFFFF:
            raise ValueError('repeat must be in range [0, 65535]')
        # The nexa negates the value, so we do too...
        seq = self.off_seq if on_off else self.on_seq
        self.io.wave_chain([
            255, 0,                                # loop start
            seq,                                   # waveform to transmit
            255, 2, 0x88, 0x13,                    # delay 0x1388 = 5000 us
            255, 1, repeat & 0xFF, repeat >> 8,    # loop count, low byte first
        ])

    @property
    def busy(self):
        """Whatever pigpio's ``wave_tx_busy()`` reports for the transmitter."""
        return self.io.wave_tx_busy()
def rswitch(pin, unit, state, repeat=3):
    """Create a NexaSwitcher on ``pin`` and send one switch command.

    ``unit`` is passed to NexaSwitcher as its transmitter id; ``state``
    and ``repeat`` are forwarded unchanged to ``NexaSwitcher.switch``.
    """
    NexaSwitcher(pin, unit).switch(state, repeat)
if __name__ == '__main__':
    # Command-line entry point: read the options, then transmit the
    # requested state.
    import argparse

    cli = argparse.ArgumentParser(description='Control Nexa switches.')
    cli.add_argument(
        '-p', '--pin',
        type=int,
        default=18,
        help='GPIO pin controlling transmitter (default is 18).'
    )
    cli.add_argument(
        '-r', '--repeat',
        type=int,
        default=3,
        help='Number of times to repeat the sequence (default is 3).'
    )
    cli.add_argument(
        'tx_id',
        type=int,
        help='Switch ID number in range [1, 2^26-1]'
    )
    cli.add_argument(
        'state',
        type=int,
        choices=[0, 1],
        help='State to set on switch'
    )
    options = cli.parse_args()

    switcher = NexaSwitcher(options.pin, options.tx_id)
    switcher.switch(options.state, options.repeat)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment