Created
August 27, 2021 09:29
-
-
Save tai/f25c0d0568f603cf5c8f00878006832b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import os | |
| import re | |
| import time | |
| import pigpio | |
| import logging | |
| from collections import OrderedDict | |
| from contextlib import contextmanager | |
| from argparse import ArgumentParser | |
| log = logging.getLogger(__name__) | |
| def help(): | |
| p = os.path.basename(sys.argv[0]) | |
| return """ | |
| {p} - Control nRF24L01(P) over pigpio | |
| Usage: {p} [opts] <cmd|reg> [args...] | |
| Options: | |
| -s <sel>: select SPI chip | |
| -c <pin>: GPIO pin for CE | |
| -m <mode>: Set either of (CW|COMPAT|SB|ESB|NONE) | |
| -r <kbps>: Set datarate (250|1000|2000) | |
| -f <MHz>: Set center freq | |
| -p <pow>: Set power level (0:-18, 1:-12, 2:-6, 3:0dBm) | |
| Example: | |
| $ eval $({p} completion) | |
| $ {p} dump | |
| $ {p} ce 0 | |
| $ {p} ce 1 | |
| $ {p} STATUS 00 | |
| $ {p} R_REGISTER STATUS | |
| $ {p} W_REGISTER STATUS 00 | |
| NOTE: | |
| - Use PIGPIO_ADDR/PIGPIO_PORT to use remote pigpiod | |
| """.format(**locals()).lstrip() | |
| def usage(): | |
| sys.stderr.write(help()) | |
| sys.exit(0) | |
| def gencomp(): | |
| """Generate bash completion""" | |
| p = os.path.basename(sys.argv[0]) | |
| s = """ | |
| _PROG_helper() { | |
| local cur="${COMP_WORDS[COMP_CWORD]}"; | |
| if [ ${COMP_CWORD} -eq 1 ] ; then | |
| COMPREPLY=($(compgen -W "dump rw ce init rx tx CMDS REGS" "$cur")); | |
| elif [ ${COMP_CWORD} -eq 2 ] ; then | |
| COMPREPLY=($(compgen -W "dump REGS" "$cur")); | |
| fi | |
| }; | |
| complete -F _PROG_helper CMD | |
| """ | |
| s = s.replace("REGS", " ".join(REGS.keys())) | |
| s = s.replace("CMDS", " ".join(CMDS)) | |
| s = s.replace("PROG", p) | |
| return s | |
| ###################################################################### | |
| # nRF24L01 PS - 9.1 Register map table | |
| REGS_DEF = """ | |
| # NAME [ADDR [SIZE]] | |
| CONFIG 0x00 | |
| EN_AA | |
| EN_RXADDR | |
| SETUP_AW | |
| SETUP_RETR | |
| RF_CH | |
| RF_SETUP | |
| STATUS | |
| OBSERVE_TX | |
| CD 0x09 | |
| RPD 0x09 | |
| RX_ADDR_P0 0x0A 5 | |
| RX_ADDR_P1 0x0B 5 | |
| RX_ADDR_P2 | |
| RX_ADDR_P3 | |
| RX_ADDR_P4 | |
| RX_ADDR_P5 | |
| TX_ADDR 0x10 5 | |
| RX_PW_P0 | |
| RX_PW_P1 | |
| RX_PW_P2 | |
| RX_PW_P3 | |
| RX_PW_P4 | |
| RX_PW_P5 | |
| FIFO_STATUS | |
| DYNPD 0x1C | |
| FEATURE 0x1D | |
| """ | |
| def genregs(): | |
| addr = 0 | |
| size = 1 | |
| regs = OrderedDict() | |
| for i in REGS_DEF.splitlines(): | |
| if i.startswith("#"): continue | |
| entry = i.split() | |
| if len(entry) == 0: | |
| continue | |
| if len(entry) == 1: | |
| name, addr, size = entry[0], addr, 1 | |
| elif len(entry) == 2: | |
| name, addr, size = entry[0], int(entry[1], 16), 1 | |
| elif len(entry) == 3: | |
| name, addr, size = entry[0], int(entry[1], 16), int(entry[2]) | |
| reg = lambda:0 | |
| reg.name = name | |
| reg.addr = addr | |
| reg.size = size | |
| regs[name] = reg | |
| addr += 1 | |
| return regs | |
| REGS = genregs() | |
| ###################################################################### | |
| class NRF24(object): | |
| def __init__(self, ss, ce): | |
| self.ss = ss | |
| self.ce_pin = ce | |
| self.pi = pigpio.pi() | |
| @contextmanager | |
| def open(self): | |
| try: | |
| self.sh = self.pi.spi_open(self.ss, 1000000, 0) | |
| yield self | |
| finally: | |
| self.pi.spi_close(self.sh) | |
| def xfer(self, buf): | |
| return self.pi.spi_xfer(self.sh, buf) | |
| def ce(self, v): | |
| self.pi.write(self.ce_pin, v) | |
| def arg2buf(arg): | |
| if type(arg) is bytes: | |
| val = arg | |
| elif type(arg) is list: | |
| val = bytes(arg) | |
| elif type(arg) is int: | |
| nr = 1 if arg == 0 else int((arg.bit_length() + 7) / 8) | |
| val = arg.to_bytes(nr, 'big') | |
| elif arg.startswith('s:'): # handle ASCII string | |
| val = bytes(arg[2:], 'utf-8') | |
| elif arg.startswith('b:'): # handle MSByte-first syntax | |
| val = bytearray.fromhex(arg[2:]) | |
| val.reverse() | |
| elif arg.startswith('l:'): # handle LSByte-first syntax | |
| val = bytes.fromhex(arg[2:]) | |
| else: | |
| val = bytes.fromhex(arg) | |
| return bytes(val) | |
| ###################################################################### | |
| _CMDS_START = dir() | |
| def R_REGISTER(dev, regname): | |
| reg = REGS.get(regname) | |
| buf = [reg.addr] + [0] * reg.size | |
| return dev.xfer(buf) | |
| def W_REGISTER(dev, regname, data): | |
| reg = REGS.get(regname) | |
| val = arg2buf(data) | |
| vallen = len(val) | |
| if vallen != reg.size: | |
| log.warning("data size ({vallen}) != reg.size ({reg.size}): {regname}".format(**locals())) | |
| return None | |
| buf = [0x20 | reg.addr] + list(val) | |
| return dev.xfer(buf) | |
| def R_RX_PAYLOAD(dev, nr): | |
| buf = [0x61] + [0] * int(nr) | |
| return dev.xfer(buf) | |
| def W_TX_PAYLOAD(dev, data): | |
| buf = [0xA0] + list(arg2buf(data)) | |
| return dev.xfer(buf) | |
| def FLUSH_TX(dev): | |
| return dev.xfer([0xE1]) | |
| def FLUSH_RX(dev): | |
| return dev.xfer([0xE2]) | |
| def REUSE_TX_PL(dev): | |
| return dev.xfer([0xE3]) | |
| def ACTIVATE(dev): | |
| return dev.xfer([0x50, 0x73]) | |
| def R_RX_PL_WID(dev): | |
| return dev.xfer([0x60, 0x00]) | |
| def W_ACK_PAYLOAD(dev, pipe, data): | |
| buf = [0xA8 | (int(pipe) & 7)] + list(arg2buf(data)) | |
| return dev.xfer(buf) | |
| def W_TX_PAYLOAD_NOACK(dev, data): | |
| buf = [0xB0] + list(arg2buf(data)) | |
| return dev.xfer(buf) | |
| def NOP(dev): | |
| return dev.xfer([0xFF]) | |
| _CMDS_END = filter(lambda s: not s.startswith('_'), dir()) | |
| # list of SPI commands | |
| CMDS = set(_CMDS_END) - set(_CMDS_START) | |
| ###################################################################### | |
| def rwreg(dev, regname, val=None): | |
| if val: | |
| W_REGISTER(dev, regname, val) | |
| rwreg(dev, regname) | |
| else: | |
| c, raw = R_REGISTER(dev, regname) | |
| print( | |
| "{regname:>12s}".format(**locals()), | |
| raw[1:].hex(" "), "/", | |
| " ".join(["{:0>8b}".format(i) for i in raw[1:]])) | |
| def hwinit(dev, opt): | |
| # DS 8.3.1 SPI Commands | |
| # W_REGISTER - Executable in power down or standby mode only | |
| dev.ce(0) | |
| # set freq | |
| W_REGISTER(dev, "RF_CH", 0x7F & (opt.freq - 2400)) | |
| # NOTE: | |
| # - 1Mbps is expected for COMPAT/SB mode | |
| rate = 0x20 if opt.rate == 250 else 0x08 if opt.rate == 2000 else 0x00 | |
| power = (0x03 & opt.power) << 1 | |
| W_REGISTER(dev, "RF_SETUP", rate|power|0x01) | |
| # DS 9.1 Register map table, footnote (a) | |
| # set timeout long enough for any payload length at any rate | |
| W_REGISTER(dev, "SETUP_RETR", 0xF3) | |
| # DS Appendix C - Carrier wave output power | |
| if opt.mode == 'CW': | |
| W_REGISTER(dev, "CONFIG", 0x02) # PWR_UP=1, PRIM_RX=0, EN_CRC=0 | |
| time.sleep(0.001) | |
| W_REGISTER(dev, "EN_AA", 0x00) | |
| W_REGISTER(dev, "SETUP_RETR", 0x00) | |
| W_REGISTER(dev, "RF_SETUP", rate|power|0x91) # CONT_WAVE=1, PLL_LOCK=1 | |
| W_REGISTER(dev, "TX_ADDR", [0xFF, 0xFF, 0xFF, 0xFF, 0xFF]) | |
| W_TX_PAYLOAD(dev, "FF" * 32) | |
| dev.ce(1) | |
| time.sleep(0.001) | |
| REUSE_TX_PL(dev) | |
| # DS Appendix B - Configuration for compatibility with nRF24XX | |
| elif opt.mode == 'COMPAT': | |
| #ACTIVATE(dev) | |
| W_REGISTER(dev, "CONFIG", 0x0A) | |
| W_REGISTER(dev, "EN_AA", 0x00) | |
| W_REGISTER(dev, "FEATURE", 0x00) | |
| W_REGISTER(dev, "DYNPD", 0x00) | |
| W_REGISTER(dev, "SETUP_RETR", 0x00) | |
| # DS 7.10 Compatibility with ShockBurst | |
| elif opt.mode == 'SB': | |
| #ACTIVATE(dev) | |
| W_REGISTER(dev, "CONFIG", 0x0A) | |
| W_REGISTER(dev, "EN_AA", 0x00) | |
| W_REGISTER(dev, "FEATURE", 0x00) | |
| W_REGISTER(dev, "DYNPD", 0x00) | |
| W_REGISTER(dev, "SETUP_RETR", 0xF3) | |
| # DS 7. Enchanced ShockBurst | |
| # DS Appendix A - Configuration and Communication Example | |
| elif opt.mode == 'ESB': | |
| ACTIVATE(dev) | |
| W_REGISTER(dev, "CONFIG", 0x0A) | |
| W_REGISTER(dev, "EN_AA", 0x3F) | |
| W_REGISTER(dev, "FEATURE", 0x07) | |
| W_REGISTER(dev, "DYNPD", 0x3F) | |
| W_REGISTER(dev, "SETUP_RETR", 0xF3) | |
| # clear flags | |
| FLUSH_TX(dev) | |
| FLUSH_RX(dev) | |
| W_REGISTER(dev, "STATUS", 0xFF) | |
| # enable | |
| dev.ce(1) | |
| def rxdata(dev, nr=0): | |
| # PRIM_RX=1 | |
| c, raw = R_REGISTER(dev, "CONFIG") | |
| cfg = raw[1] | 0x01 | |
| W_REGISTER(dev, "CONFIG", cfg) | |
| nr = int(nr) | |
| if nr > 0: | |
| # Needed by pre-ESB protocols (SB, COMPAT) | |
| W_REGISTER(dev, "RX_PW_P0", nr) | |
| W_REGISTER(dev, "RX_PW_P1", nr) | |
| W_REGISTER(dev, "RX_PW_P2", nr) | |
| W_REGISTER(dev, "RX_PW_P3", nr) | |
| W_REGISTER(dev, "RX_PW_P4", nr) | |
| W_REGISTER(dev, "RX_PW_P5", nr) | |
| c, raw = R_RX_PAYLOAD(dev, nr) | |
| print(raw[1:], "/", raw[1:].hex(" ")) | |
| def txdata(dev, data): | |
| # PRIM_RX=0 | |
| c, raw = R_REGISTER(dev, "CONFIG") | |
| cfg = raw[1] & 0xFE | |
| W_REGISTER(dev, "CONFIG", cfg) | |
| W_TX_PAYLOAD(dev, data) | |
| def main(opt): | |
| args = opt.args | |
| name = opt.args[0] | |
| # generate bash completion | |
| if name.startswith("comp"): | |
| print(gencomp()) | |
| sys.exit(0) | |
| with NRF24(ss=opt.ss, ce=opt.ce).open() as dev: | |
| if name == "init": | |
| hwinit(dev, opt) | |
| elif name == "dump": | |
| print("# NOTE: Endian is LSByte-first on SPI") | |
| for i in REGS.keys(): | |
| rwreg(dev, i) | |
| elif name == "rx": | |
| rxdata(dev, *args[1:]) | |
| elif name == "tx": | |
| txdata(dev, *args[1:]) | |
| elif name == "rw": | |
| print("# NOTE: Endian is LSByte-first on SPI") | |
| rwreg(dev, *args[1:]) | |
| elif name == "ce": | |
| dev.ce(int(args[1])) | |
| elif name in REGS: | |
| print("# NOTE: Endian is LSByte-first on SPI") | |
| rwreg(dev, *args[0:]) | |
| elif name in CMDS: | |
| cmd = globals().get(name) | |
| c, raw = cmd(dev, *args[1:]) | |
| print(c, raw.hex(" "), "/", raw) | |
| if __name__ == '__main__': | |
| ap = ArgumentParser() | |
| ap.print_help = usage | |
| ap.add_argument('-D', '--debug', nargs='?', default='INFO') | |
| ap.add_argument('-s', '--ss', type=int, default=0) | |
| ap.add_argument('-c', '--ce', type=int, default=25) | |
| ap.add_argument('-m', '--mode', default='NONE') | |
| ap.add_argument('-r', '--rate', type=int, default=2000) | |
| ap.add_argument('-f', '--freq', type=int, default=2402) | |
| ap.add_argument('-p', '--power', type=int, default=0) | |
| ap.add_argument('args', nargs='*') | |
| opt = ap.parse_args() | |
| logging.basicConfig(level=eval('logging.' + opt.debug)) | |
| if len(opt.args) == 0: | |
| usage() | |
| main(opt) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment