Created
June 13, 2023 23:06
-
-
Save lachesis/11d5d541d4a1fff2e68f3a185b00b533 to your computer and use it in GitHub Desktop.
Flipper IR explorations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Uses flipper as an IR blaster to drive my Bionaire window fan into the desired state | |
# Also uses my Kasa plug to reset the fan into a known starting state | |
# deps: pip install pyflipper | |
# the remote protocol does on-off keying (i.e. ASK) of an IR carrier | |
# the carrier is a 38 KHz square wave with a 33% duty cycle | |
# (the fan still seems to work with a 50% duty cycle as well) | |
# the signal has a base period time T that I have rounded to 400ms | |
# the following symbols are used: | |
# SHORT - 1T long | |
# LONG - 3T long | |
# BLANK - 15T long | |
# bits are represented by pairs of (On, Off) timings | |
# (LONG On, SHORT Off) -> 0 | |
# (SHORT On, LONG Off) -> 1 | |
# (arbitrarily assigned) | |
# messages are 12 bits long and must be repeated at least 2x to be acted upon | |
# the Off period of the last bit must be at least 15T (aka BLANK) µs | |
# this is probably all wrong | |
# don't expect to use this as is for any other device | |
# it has lots of janky random hard coded things | |
# ...but it seems to work | |
import socket | |
import time | |
import sys | |
import pyflipper.pyflipper | |
KASA_PLUG_IP = 'kasa-plug3' | |
FLIPPER_COMM_PORT = '/dev/ttyACM0' | |
PATH_TO_SAVED_REMOTE = '/ext/infrared/Winfan.ir' # file on the flipper containing the captures | |
# length in us of short samples vs long samples vs end of msg blank samples | |
SHORT = 400 | |
LONG = 3 * SHORT # 1200 | |
BLANK = 15 * SHORT # 6000 | |
# length in bits of a message; bold variable name | |
LEN = 12 | |
# data needed for modelling the state of the Fan | |
STATE_SET = ( | |
(['OFF', 'LOW', 'MED', 'HIGH'], 'Pwr'), | |
(['AUTO', 'ON'], 'Mode'), | |
(['IN', 'OUT', 'BOTH'], 'Airflow'), | |
) | |
START_STATE = ('OFF', 'AUTO', 'IN') | |
flipper = pyflipper.pyflipper.PyFlipper(com=FLIPPER_COMM_PORT) | |
# IR reverse engineering | |
def parse_samples(raw_samples): | |
return [int(x) for x in raw_samples.split(' ')] | |
def flatten(l): | |
return [item for sublist in l for item in sublist] | |
def split_pairs(samples): | |
return [(samples[i], samples[i+1]) for i in range(0, len(samples)//2*2, 2)] | |
def bits2num(bits): | |
return int(''.join(str(s) for s in bits), 2) | |
def num2bits(num): | |
return [int(x) for x in bin(num)[2:].rjust(LEN, '0')] | |
# stop after first BLANK period (aka get minimum message) | |
def minimize(samples): | |
# threshold the samples into the symbols SHORT LONG or BLANK | |
samples = [SHORT if x < SHORT*1.5 else LONG if x < LONG*1.5 else BLANK for x in samples] | |
for i, v in enumerate(samples): | |
if i % 2 == 0: continue | |
if v > LONG*1.5: # must be BLANK | |
i += 1 | |
break | |
else: | |
raise ValueError("Could not find long step") | |
return samples[:i] | |
BIT_MAP = { | |
# pairs of (on-time, off-time) | |
(LONG, SHORT) : 0, | |
(SHORT, LONG) : 1, | |
(SHORT, SHORT) : 'X', # invalid messages | |
(LONG, LONG) : 'X', | |
(LONG, BLANK) : 0, # special case the extra long blank at end | |
(SHORT, BLANK) : 1, | |
} | |
def decode(samples): | |
if isinstance(samples[0], str): | |
samples = parse_samples(samples) | |
samples = minimize(samples) | |
pairs = [(samples[i], samples[i+1]) for i in range(0, len(samples)//2*2, 2)] | |
bits = [BIT_MAP[p] for p in pairs] | |
return bits | |
def encode(bitlist): | |
pairs = [(SHORT, LONG) if b else (LONG, SHORT) for b in bitlist] | |
samples = flatten(pairs) | |
# adding BLANK to existing off would probably be better | |
# but then the rounded samples from | |
samples[-1] = BLANK # always end with a blank | |
return samples | |
# flipper IR blaster and storage stuff | |
def read_winfan(): | |
txt = flipper.storage.read(PATH_TO_SAVED_REMOTE) | |
samps = {} | |
name = None | |
data = None | |
# get the name of the button and its data | |
for line in txt.split('\n'): | |
if line.startswith('name:'): | |
name = line.split(': ')[1] | |
elif line.startswith('data:'): | |
data = line.split(': ', 1)[1] | |
samps[name] = data | |
return samps | |
def send(bits): | |
if isinstance(bits, int): | |
bits = num2bits(bits) | |
if isinstance(bits, list): | |
if len(bits) == LEN: | |
samples = encode(bits) | |
samples += samples | |
elif len(bits) > LEN: | |
samples = bits | |
else: | |
raise ValueError("HUH?") | |
else: | |
raise ValueError("HUH?") | |
# hard coded these based on reading the flipper remote file | |
flipper.ir.tx_raw(frequency=38000, duty_cycle=0.33, samples=samples) | |
def devel_main(): | |
# dumb test to make sure bit packing code works | |
for i in range(2**12): | |
bits = num2bits(i) | |
assert len(bits) == 12 | |
assert i == bits2num(bits) | |
# read file from flipper | |
samps = read_winfan() | |
# could also read a remote button press directly with | |
# print("press remote button now") | |
# txt = flipper.ir.rx(timeout=5) | |
# todo: some code that parses the returned serial text | |
# decode each command & print its bits & number | |
cmds = {} | |
for name, raw_samples in samps.items(): | |
samples = parse_samples(raw_samples) | |
samples = minimize(samples) | |
bits = decode(samples) | |
num = bits2num(bits) | |
cmds[name] = num | |
print(name.ljust(10, ' '), bits, hex(num)) | |
assert encode(bits) == samples | |
print(repr(cmds)) | |
# kasa switch stuff | |
def kasa_scramble(plaintext): | |
n = len(plaintext) | |
payload = [] | |
key = 0xAB | |
for i in range(n): | |
key = plaintext[i] ^ key | |
payload.append(key) | |
return bytes(payload) | |
def kasa_send(hostname, cmd): | |
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
data = kasa_scramble(cmd) | |
s.sendto(data, (hostname, 9999)) | |
def kasa_switch(hostname, state): | |
return kasa_send( hostname, b'{"system":{"set_relay_state":{"state":%d}}}' % (bool(state),) ) | |
# fan state management stuff | |
def validate_state(state): | |
if len(state) != len(STATE_SET): | |
return False | |
for (state_list, cmd), cur_val in zip(STATE_SET, state): | |
if cur_val not in state_list: | |
return False | |
return True | |
def steps_needed(state_list, cur_val, new_val): | |
cur_idx = state_list.index(cur_val) | |
new_idx = state_list.index(new_val) | |
if new_idx == cur_idx: | |
return 0 | |
elif new_idx > cur_idx: | |
return new_idx - cur_idx | |
else: | |
return len(state_list) + new_idx - cur_idx | |
def compute_todo(cur_state, new_state): | |
cmds = [] | |
for (state_list, cmd), cur_val, new_val in zip(STATE_SET, cur_state, new_state): | |
steps = steps_needed(state_list, cur_val, new_val) | |
cmds.extend( [cmd] * steps ) | |
return cmds | |
def main(): | |
# generated by devel_main() | |
commands = { | |
'Pwr' : 0x239, | |
'Tup' : 0x23c, | |
'Tdwn' : 0x25f, | |
'Mode' : 0x26f, | |
'Airflow': 0x277, | |
} | |
# desired default state for the fan with no inputs | |
new_state = ('HIGH', 'ON', 'IN') | |
if len(sys.argv) == 4: # literal state | |
new_state = [x.upper() for x in sys.argv[1:4]] | |
if not validate_state(new_state): | |
raise ValueError("Invalid requested state: %r" % new_state) | |
if len(sys.argv) == 2: # literal cmd | |
cmd = sys.argv[1] | |
send(commands[cmd]) | |
return | |
state = START_STATE | |
todo = compute_todo(state, new_state) | |
print("Going to state:", new_state) | |
print(todo) | |
# power cycle to get into known state | |
kasa_switch(KASA_PLUG_IP, False) | |
time.sleep(0.7) | |
kasa_switch(KASA_PLUG_IP, True) | |
time.sleep(0.7) | |
# send commands to get into desired state | |
for cmd in todo: | |
send(commands[cmd]) | |
time.sleep(1) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment