Created
August 25, 2021 16:29
-
-
Save tai/f9bb25a0d8d283410251b129887db001 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import sys | |
| import os | |
| import re | |
| import pigpio | |
| from collections import OrderedDict | |
| from contextlib import contextmanager | |
| def help(): | |
| p = os.path.basename(sys.argv[0]) | |
| return """ | |
| {p} - Control nRF24L01 over pigpio | |
| Usage: {p} <cmd|reg> [args...] | |
| Example: | |
| $ eval $({p} completion) | |
| $ {p} dump | |
| $ {p} STATUS | |
| $ {p} STATUS 00 | |
| $ {p} R_REGISTER STATUS | |
| $ {p} W_REGISTER STATUS 00 | |
| NOTE: | |
| - Use PIGPIO_ADDR/PIGPIO_PORT to use remote pigpiod | |
| """.format(**locals()).lstrip() | |
| def usage(): | |
| sys.stderr.write(help()) | |
| sys.exit(0) | |
| def gencomp(): | |
| """Generate bash completion""" | |
| p = os.path.basename(sys.argv[0]) | |
| s = """ | |
| _PROG_helper() { | |
| local cur="${COMP_WORDS[COMP_CWORD]}"; | |
| if [ ${COMP_CWORD} -eq 1 ] ; then | |
| COMPREPLY=($(compgen -W "dump CMDS REGS" "$cur")); | |
| elif [ ${COMP_CWORD} -eq 2 ] ; then | |
| COMPREPLY=($(compgen -W "dump REGS" "$cur")); | |
| fi | |
| }; | |
| complete -F _PROG_helper CMD | |
| """ | |
| s = s.replace("REGS", " ".join(REGS.keys())) | |
| s = s.replace("CMDS", " ".join(CMDS)) | |
| s = s.replace("PROG", p) | |
| return s | |
| ###################################################################### | |
| # nRF24L01 PS - 9.1 Register map table | |
| REGS_DEF = """ | |
| # NAME [ADDR [SIZE]] | |
| CONFIG 0x00 | |
| EN_AA | |
| EN_RXADDR | |
| SETUP_AW | |
| SETUP_RETR | |
| RF_CH | |
| RF_SETUP | |
| STATUS | |
| OBSERVE_TX | |
| CD | |
| RX_ADDR_P0 0x0A 5 | |
| RX_ADDR_P1 0x0B 5 | |
| RX_ADDR_P2 | |
| RX_ADDR_P3 | |
| RX_ADDR_P4 | |
| RX_ADDR_P5 | |
| TX_ADDR 0x10 5 | |
| RX_PW_P0 | |
| RX_PW_P1 | |
| RX_PW_P2 | |
| RX_PW_P3 | |
| RX_PW_P4 | |
| RX_PW_P5 | |
| FIFO_STATUS | |
| DYNPD 0x1C | |
| FEATURE 0x1D | |
| """ | |
| def genregs(): | |
| addr = 0 | |
| size = 1 | |
| regs = OrderedDict() | |
| for i in REGS_DEF.splitlines(): | |
| if i.startswith("#"): continue | |
| entry = i.split() | |
| if len(entry) == 0: | |
| continue | |
| if len(entry) == 1: | |
| name, addr, size = entry[0], addr, 1 | |
| elif len(entry) == 2: | |
| name, addr, size = entry[0], int(entry[1], 16), 1 | |
| elif len(entry) == 3: | |
| name, addr, size = entry[0], int(entry[1], 16), int(entry[2]) | |
| reg = lambda:0 | |
| reg.name = name | |
| reg.addr = addr | |
| reg.size = size | |
| regs[name] = reg | |
| addr += 1 | |
| return regs | |
| REGS = genregs() | |
| ###################################################################### | |
| class SPI(object): | |
| @contextmanager | |
| def open(self): | |
| try: | |
| self.pi = pigpio.pi() | |
| self.sh = self.pi.spi_open(0, 1000000, 0) | |
| yield self | |
| finally: | |
| self.pi.spi_close(self.sh) | |
| def xfer(self, buf): | |
| return self.pi.spi_xfer(self.sh, buf) | |
| ###################################################################### | |
| _CMDS_START = dir() | |
| def R_REGISTER(spi, regname): | |
| reg = REGS.get(regname) | |
| buf = [reg.addr] + [0] * reg.size | |
| return spi.xfer(buf) | |
| def W_REGISTER(spi, regname, hexstr): | |
| reg = REGS.get(regname) | |
| val = bytes.fromhex(hexstr) | |
| if len(val) != reg.size: | |
| return -1 | |
| buf = [0x20 | reg.addr] + list(val) | |
| return spi.xfer(buf) | |
| def R_RX_PAYLOAD(spi, nr): | |
| buf = [0x61] + [0] * nr | |
| return spi.xfer(buf) | |
| def W_TX_PAYLOAD(spi, hexstr): | |
| buf = [0xA0] + list(bytes.fromhex(hexstr)) | |
| return spi.xfer(buf) | |
| def FLUSH_TX(spi): | |
| return spi.xfer([0xE1]) | |
| def FLUSH_RX(spi): | |
| return spi.xfer([0xE2]) | |
| def REUSE_TX_PL(spi): | |
| return spi.xfer([0xE3]) | |
| def ACTIVATE(spi): | |
| return spi.xfer([0x50, 0x73]) | |
| def R_RX_PL_WID(spi): | |
| return spi.xfer([0x60]) | |
| def W_ACK_PAYLOAD(spi, pipe, hexstr): | |
| buf = [0xA8 | pipe] + list(bytes.fromhex(hexstr)) | |
| return spi.xfer(buf) | |
| def W_TX_PAYLOAD_NOACK(spi, hexstr): | |
| buf = [0xB0] + list(bytes.fromhex(hexstr)) | |
| return spi.xfer(buf) | |
| def NOP(spi): | |
| return spi.xfer([0xFF]) | |
| _CMDS_END = filter(lambda s: not s.startswith('_'), dir()) | |
| # list of SPI commands | |
| CMDS = set(_CMDS_END) - set(_CMDS_START) | |
| ###################################################################### | |
| def rwreg(spi, regname, val=None): | |
| if val: | |
| W_REGISTER(spi, regname, val) | |
| rwreg(spi, regname) | |
| else: | |
| c, raw = R_REGISTER(spi, regname) | |
| print( | |
| "{regname:>12s} {c}".format(**locals()), | |
| raw.hex(" "), "/", | |
| " ".join(["{:0>8b}".format(i) for i in raw])) | |
| def main(args): | |
| name = args[1] | |
| # generate bash completion | |
| if name.startswith("comp"): | |
| print(gencomp()) | |
| sys.exit(0) | |
| with SPI().open() as spi: | |
| if name == "dump": | |
| for i in REGS.keys(): | |
| rwreg(spi, i) | |
| elif name == "rw": | |
| rwreg(spi, *args[2:]) | |
| elif name in REGS: | |
| rwreg(spi, *args[1:]) | |
| elif name in CMDS: | |
| cmd = globals().get(name) | |
| ret = cmd(spi, *args[2:]) | |
| print(ret) | |
| if __name__ == '__main__': | |
| if len(sys.argv) == 1: | |
| usage() | |
| main(sys.argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment