Skip to content

Instantly share code, notes, and snippets.

@tai
Created August 27, 2021 09:29
Show Gist options
  • Select an option

  • Save tai/f25c0d0568f603cf5c8f00878006832b to your computer and use it in GitHub Desktop.

Select an option

Save tai/f25c0d0568f603cf5c8f00878006832b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import sys
import os
import re
import time
import pigpio
import logging
from collections import OrderedDict
from contextlib import contextmanager
from argparse import ArgumentParser
log = logging.getLogger(__name__)
def help():
    """Return the help text with the running program's basename filled in."""
    prog = os.path.basename(sys.argv[0])
    template = """
{p} - Control nRF24L01(P) over pigpio
Usage: {p} [opts] <cmd|reg> [args...]
Options:
-s <sel>: select SPI chip
-c <pin>: GPIO pin for CE
-m <mode>: Set either of (CW|COMPAT|SB|ESB|NONE)
-r <kbps>: Set datarate (250|1000|2000)
-f <MHz>: Set center freq
-p <pow>: Set power level (0:-18, 1:-12, 2:-6, 3:0dBm)
Example:
$ eval $({p} completion)
$ {p} dump
$ {p} ce 0
$ {p} ce 1
$ {p} STATUS 00
$ {p} R_REGISTER STATUS
$ {p} W_REGISTER STATUS 00
NOTE:
- Use PIGPIO_ADDR/PIGPIO_PORT to use remote pigpiod
"""
    # Only the leading newline of the template is removed; the trailing one stays.
    return template.format(p=prog).lstrip()
def usage():
    """Write the help text to stderr, then terminate with exit status 0."""
    text = help()
    sys.stderr.write(text)
    raise SystemExit(0)
def gencomp():
    """Generate bash completion.

    Returns a bash snippet that defines a ``_<prog>_helper`` function and
    registers it with ``complete`` for this program's basename. The first
    word completes to the built-in sub-commands, the SPI command names
    (CMDS) and the register names (REGS); the second word completes to
    ``dump`` and the register names.
    """
    prog = os.path.basename(sys.argv[0])
    script = """
_PROG_helper() {
local cur="${COMP_WORDS[COMP_CWORD]}";
if [ ${COMP_CWORD} -eq 1 ] ; then
COMPREPLY=($(compgen -W "dump rw ce init rx tx CMDS REGS" "$cur"));
elif [ ${COMP_CWORD} -eq 2 ] ; then
COMPREPLY=($(compgen -W "dump REGS" "$cur"));
fi
};
complete -F _PROG_helper PROG
"""
    # Placeholder order matters: REGS and CMDS are expanded before PROG so
    # that the program name is never re-scanned for the other placeholders.
    script = script.replace("REGS", " ".join(REGS.keys()))
    # CMDS is a set; sort it so the generated script is stable between runs.
    script = script.replace("CMDS", " ".join(sorted(CMDS)))
    # The final "complete" line used to end in a literal "CMD", which bound
    # the helper to a command named "CMD" instead of this program.
    script = script.replace("PROG", prog)
    return script
######################################################################
# nRF24L01 PS - 9.1 Register map table
#
# One register per line, parsed by genregs():
#   NAME            - address is the previous entry's address + 1, size 1
#   NAME ADDR       - explicit hexadecimal address, size 1
#   NAME ADDR SIZE  - explicit hexadecimal address and size (number of data
#                     bytes transferred for the register)
# Blank lines and lines starting with "#" are skipped by the parser.
# CD and RPD are both listed at address 0x09.
REGS_DEF = """
# NAME [ADDR [SIZE]]
CONFIG 0x00
EN_AA
EN_RXADDR
SETUP_AW
SETUP_RETR
RF_CH
RF_SETUP
STATUS
OBSERVE_TX
CD 0x09
RPD 0x09
RX_ADDR_P0 0x0A 5
RX_ADDR_P1 0x0B 5
RX_ADDR_P2
RX_ADDR_P3
RX_ADDR_P4
RX_ADDR_P5
TX_ADDR 0x10 5
RX_PW_P0
RX_PW_P1
RX_PW_P2
RX_PW_P3
RX_PW_P4
RX_PW_P5
FIFO_STATUS
DYNPD 0x1C
FEATURE 0x1D
"""
def genregs(regs_def=None):
    """Parse a register definition table into an ordered register map.

    Each non-blank, non-comment line has the form ``NAME [ADDR [SIZE]]``:
    ADDR is hexadecimal and defaults to the previous entry's address + 1
    (0 for the first entry); SIZE is decimal and defaults to 1.

    Args:
        regs_def: Definition text to parse. Defaults to the module-level
            REGS_DEF table.

    Returns:
        OrderedDict mapping register name to an object exposing ``name``,
        ``addr`` and ``size`` attributes, in definition order.

    Raises:
        ValueError: If a line has more than three fields or a field is not
            a valid number. Previously an over-long line silently
            re-registered the preceding name at a shifted address.
    """
    from types import SimpleNamespace

    if regs_def is None:
        regs_def = REGS_DEF

    regs = OrderedDict()
    next_addr = 0
    for lineno, raw_line in enumerate(regs_def.splitlines(), 1):
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        fields = line.split()
        if len(fields) > 3:
            raise ValueError(
                "register definition line {}: expected NAME [ADDR [SIZE]], "
                "got {!r}".format(lineno, raw_line))
        name = fields[0]
        addr = int(fields[1], 16) if len(fields) >= 2 else next_addr
        size = int(fields[2]) if len(fields) == 3 else 1
        regs[name] = SimpleNamespace(name=name, addr=addr, size=size)
        # An entry without an explicit address follows this one.
        next_addr = addr + 1
    return regs
# Register map built once at import time: register name -> object carrying
# .name, .addr and .size, in the order the registers are listed in REGS_DEF.
REGS = genregs()
######################################################################
class NRF24(object):
    """Access to an nRF24L01(P) through pigpio: one SPI channel plus a CE GPIO.

    Args:
        ss: SPI channel selector passed to pigpio's ``spi_open``.
        ce: GPIO number written by :meth:`ce`.
    """

    def __init__(self, ss, ce):
        self.ss = ss
        self.ce_pin = ce
        self.pi = pigpio.pi()

    @contextmanager
    def open(self):
        """Open the SPI channel (baud 1000000, flags 0) and yield ``self``.

        The SPI handle is closed again when the ``with`` block exits,
        whether normally or through an exception.
        """
        # Open before entering the try block: if spi_open() fails there is
        # no handle to close, and the real error must not be masked by an
        # AttributeError on the still-unset self.sh in the cleanup path.
        self.sh = self.pi.spi_open(self.ss, 1000000, 0)
        try:
            yield self
        finally:
            self.pi.spi_close(self.sh)

    def xfer(self, buf):
        """Transfer ``buf`` over the open SPI handle and return pigpio's result.

        Callers in this file unpack the result as ``(count, data)``.
        """
        return self.pi.spi_xfer(self.sh, buf)

    def ce(self, v):
        """Write level ``v`` to the CE GPIO."""
        self.pi.write(self.ce_pin, v)
def arg2buf(arg):
    """Convert a command argument into the bytes to put on the SPI bus.

    Accepted forms:
        bytes / bytearray  - used as-is
        list / tuple       - sequence of byte values
        int                - minimal-length encoding, most significant byte
                             first (0 becomes a single zero byte)
        "s:<text>"         - the text encoded as UTF-8
        "b:<hex>"          - hex written MSByte-first; byte order is reversed
        "l:<hex>"          - hex written LSByte-first; kept in the given order
        "<hex>"            - same as "l:<hex>"

    Returns:
        An immutable ``bytes`` object.

    Raises:
        ValueError: On malformed hex text or list items outside 0..255.
        OverflowError: On a negative int.
    """
    # isinstance() rather than exact type checks: bytearray (the type this
    # file slices out of transfer results) and tuples used to fall through
    # to the string branches and crash on .startswith().
    if isinstance(arg, (bytes, bytearray, list, tuple)):
        return bytes(arg)
    if isinstance(arg, int):
        nbytes = max(1, (arg.bit_length() + 7) // 8)
        return arg.to_bytes(nbytes, 'big')
    if arg.startswith('s:'):  # handle ASCII string
        return arg[2:].encode('utf-8')
    if arg.startswith('b:'):  # handle MSByte-first syntax
        return bytes.fromhex(arg[2:])[::-1]
    if arg.startswith('l:'):  # handle LSByte-first syntax
        return bytes.fromhex(arg[2:])
    return bytes.fromhex(arg)
######################################################################
# Snapshot of the module namespace taken before the SPI command functions
# are defined. It is subtracted from a later dir() snapshot to build CMDS, so
# every public name defined between the two snapshots becomes a command.
_CMDS_START = dir()
def R_REGISTER(dev, regname):
    """Read the register named ``regname`` and return the transfer result.

    The frame sent is the register address followed by ``reg.size`` zero
    bytes.

    Raises:
        ValueError: If ``regname`` is not in REGS (previously this surfaced
            as an AttributeError on None).
    """
    reg = REGS.get(regname)
    if reg is None:
        raise ValueError("unknown register: {!r}".format(regname))
    frame = [reg.addr] + [0] * reg.size
    return dev.xfer(frame)
def W_REGISTER(dev, regname, data):
    """Write ``data`` to the register named ``regname``.

    ``data`` is converted with arg2buf(). The frame sent is
    ``0x20 | address`` followed by the data bytes.

    Returns:
        The transfer result, or None (after logging a warning) when the
        data length does not match the register size; nothing is sent in
        that case.

    Raises:
        ValueError: If ``regname`` is not in REGS (previously this surfaced
            as an AttributeError on None).
    """
    reg = REGS.get(regname)
    if reg is None:
        raise ValueError("unknown register: {!r}".format(regname))
    val = arg2buf(data)
    if len(val) != reg.size:
        log.warning("data size (%d) != reg.size (%d): %s",
                    len(val), reg.size, regname)
        return None
    frame = [0x20 | reg.addr] + list(val)
    return dev.xfer(frame)
def R_RX_PAYLOAD(dev, nr):
    """Send opcode 0x61 followed by ``int(nr)`` zero bytes; return the result."""
    count = int(nr)
    frame = [0x61]
    frame.extend(0 for _ in range(count))
    return dev.xfer(frame)
def W_TX_PAYLOAD(dev, data):
    """Send opcode 0xA0 followed by ``data`` (converted by arg2buf)."""
    payload = arg2buf(data)
    frame = [0xA0, *payload]
    return dev.xfer(frame)
def FLUSH_TX(dev):
    """Send the single-byte FLUSH_TX command (opcode 0xE1)."""
    frame = [0xE1]
    return dev.xfer(frame)
def FLUSH_RX(dev):
    """Send the single-byte FLUSH_RX command (opcode 0xE2)."""
    frame = [0xE2]
    return dev.xfer(frame)
def REUSE_TX_PL(dev):
    """Send the single-byte REUSE_TX_PL command (opcode 0xE3)."""
    frame = [0xE3]
    return dev.xfer(frame)
def ACTIVATE(dev):
    """Send the ACTIVATE command: opcode 0x50 followed by the byte 0x73."""
    opcode, key = 0x50, 0x73
    return dev.xfer([opcode, key])
def R_RX_PL_WID(dev):
    """Send opcode 0x60 plus one zero byte and return the transfer result."""
    opcode, filler = 0x60, 0x00
    return dev.xfer([opcode, filler])
def W_ACK_PAYLOAD(dev, pipe, data):
    """Send opcode 0xA8 OR-ed with the low 3 bits of ``pipe``, then ``data``."""
    pipe_bits = int(pipe) & 7
    payload = arg2buf(data)
    frame = [0xA8 | pipe_bits, *payload]
    return dev.xfer(frame)
def W_TX_PAYLOAD_NOACK(dev, data):
    """Send opcode 0xB0 followed by ``data`` (converted by arg2buf)."""
    payload = arg2buf(data)
    frame = [0xB0, *payload]
    return dev.xfer(frame)
def NOP(dev):
    """Send the single-byte NOP command (opcode 0xFF)."""
    frame = [0xFF]
    return dev.xfer(frame)
# Second namespace snapshot, with "_"-prefixed names filtered out. It must
# stay directly after the last SPI command function: any public name defined
# between _CMDS_START and this line ends up in CMDS.
_CMDS_END = filter(lambda s: not s.startswith('_'), dir())
# list of SPI commands
# (set of names present in the second snapshot but not in the first)
CMDS = set(_CMDS_END) - set(_CMDS_START)
######################################################################
def rwreg(dev, regname, val=None):
    """Optionally write a register, then read it back and print it.

    Args:
        dev: Device object passed through to W_REGISTER / R_REGISTER.
        regname: Register name.
        val: Value to write first. ``None`` or an empty string means
            "read only". Integer 0 is a real value; the previous
            truthiness test turned a request to write zero into a read.

    Prints the register name right-aligned in 12 columns, then the data
    bytes (the reply without its first byte) in hex and in binary.
    """
    if val is not None and val != "":
        W_REGISTER(dev, regname, val)
    c, raw = R_REGISTER(dev, regname)
    data = raw[1:]
    print(
        "{:>12s}".format(regname),
        data.hex(" "), "/",
        " ".join("{:0>8b}".format(byte) for byte in data))
# Configure the radio from the parsed options.
#
# Reads opt.freq, opt.rate, opt.power and opt.mode. CE is driven low first,
# RF_CH / RF_SETUP / SETUP_RETR are written, then mode-specific registers
# for opt.mode in ('CW', 'COMPAT', 'SB', 'ESB'); any other mode value writes
# no mode-specific registers.
#
# NOTE(review): indentation was lost in this copy of the file. The trailing
# "clear flags" / "enable" statements presumably run for every mode rather
# than only inside the ESB branch - confirm against the original.
def hwinit(dev, opt):
# DS 8.3.1 SPI Commands
# W_REGISTER - Executable in power down or standby mode only
dev.ce(0)
# set freq
# (channel = MHz offset from 2400, masked to 7 bits)
W_REGISTER(dev, "RF_CH", 0x7F & (opt.freq - 2400))
# NOTE:
# - 1Mbps is expected for COMPAT/SB mode
# rate bits: 0x20 for 250, 0x08 for 2000, 0x00 for anything else
rate = 0x20 if opt.rate == 250 else 0x08 if opt.rate == 2000 else 0x00
# power: low two bits of opt.power, shifted left by one
power = (0x03 & opt.power) << 1
W_REGISTER(dev, "RF_SETUP", rate|power|0x01)
# DS 9.1 Register map table, footnote (a)
# set timeout long enough for any payload length at any rate
W_REGISTER(dev, "SETUP_RETR", 0xF3)
# DS Appendix C - Carrier wave output power
if opt.mode == 'CW':
W_REGISTER(dev, "CONFIG", 0x02) # PWR_UP=1, PRIM_RX=0, EN_CRC=0
time.sleep(0.001)
W_REGISTER(dev, "EN_AA", 0x00)
W_REGISTER(dev, "SETUP_RETR", 0x00)
W_REGISTER(dev, "RF_SETUP", rate|power|0x91) # CONT_WAVE=1, PLL_LOCK=1
W_REGISTER(dev, "TX_ADDR", [0xFF, 0xFF, 0xFF, 0xFF, 0xFF])
# 32 bytes of 0xFF as payload
W_TX_PAYLOAD(dev, "FF" * 32)
dev.ce(1)
time.sleep(0.001)
REUSE_TX_PL(dev)
# DS Appendix B - Configuration for compatibility with nRF24XX
elif opt.mode == 'COMPAT':
#ACTIVATE(dev)
W_REGISTER(dev, "CONFIG", 0x0A)
W_REGISTER(dev, "EN_AA", 0x00)
W_REGISTER(dev, "FEATURE", 0x00)
W_REGISTER(dev, "DYNPD", 0x00)
W_REGISTER(dev, "SETUP_RETR", 0x00)
# DS 7.10 Compatibility with ShockBurst
elif opt.mode == 'SB':
#ACTIVATE(dev)
W_REGISTER(dev, "CONFIG", 0x0A)
W_REGISTER(dev, "EN_AA", 0x00)
W_REGISTER(dev, "FEATURE", 0x00)
W_REGISTER(dev, "DYNPD", 0x00)
W_REGISTER(dev, "SETUP_RETR", 0xF3)
# DS 7. Enhanced ShockBurst
# DS Appendix A - Configuration and Communication Example
elif opt.mode == 'ESB':
ACTIVATE(dev)
W_REGISTER(dev, "CONFIG", 0x0A)
W_REGISTER(dev, "EN_AA", 0x3F)
W_REGISTER(dev, "FEATURE", 0x07)
W_REGISTER(dev, "DYNPD", 0x3F)
W_REGISTER(dev, "SETUP_RETR", 0xF3)
# clear flags
FLUSH_TX(dev)
FLUSH_RX(dev)
W_REGISTER(dev, "STATUS", 0xFF)
# enable
dev.ce(1)
# Set PRIM_RX in CONFIG, optionally program a fixed payload width on all six
# pipes, then read a payload of nr bytes and print it.
#
# nr may be an int or a numeric string; it is converted with int().
#
# NOTE(review): indentation was lost in this copy of the file. The final
# R_RX_PAYLOAD / print presumably run regardless of nr rather than only
# inside the "nr > 0" branch - confirm against the original.
def rxdata(dev, nr=0):
# PRIM_RX=1
# read-modify-write: set bit 0 of the current CONFIG value
c, raw = R_REGISTER(dev, "CONFIG")
cfg = raw[1] | 0x01
W_REGISTER(dev, "CONFIG", cfg)
nr = int(nr)
if nr > 0:
# Needed by pre-ESB protocols (SB, COMPAT)
W_REGISTER(dev, "RX_PW_P0", nr)
W_REGISTER(dev, "RX_PW_P1", nr)
W_REGISTER(dev, "RX_PW_P2", nr)
W_REGISTER(dev, "RX_PW_P3", nr)
W_REGISTER(dev, "RX_PW_P4", nr)
W_REGISTER(dev, "RX_PW_P5", nr)
c, raw = R_RX_PAYLOAD(dev, nr)
# the first reply byte is skipped; the rest is printed raw and as hex
print(raw[1:], "/", raw[1:].hex(" "))
def txdata(dev, data):
    """Clear bit 0 of CONFIG, then hand ``data`` to W_TX_PAYLOAD."""
    _, reply = R_REGISTER(dev, "CONFIG")
    # PRIM_RX=0
    W_REGISTER(dev, "CONFIG", reply[1] & 0xFE)
    W_TX_PAYLOAD(dev, data)
def main(opt):
    """Run the sub-command named by ``opt.args[0]``.

    Args:
        opt: Parsed options; reads ``args`` (non-empty list), ``ss`` and
            ``ce``, and hands the whole object to hwinit() for ``init``.

    A name starting with "comp" prints the bash completion script and
    exits without touching the hardware. Every other name opens the device
    first. Recognised names are init, dump, rx, tx, rw, ce, any register
    name (read, or write when a value follows) and any SPI command name.

    Exits with a non-zero status when the name is not recognised
    (previously ignored silently) or when an SPI command returns no result
    (previously a TypeError while unpacking None).
    """
    args = opt.args
    name = args[0]

    # generate bash completion
    if name.startswith("comp"):
        print(gencomp())
        sys.exit(0)

    with NRF24(ss=opt.ss, ce=opt.ce).open() as dev:
        if name == "init":
            hwinit(dev, opt)
        elif name == "dump":
            print("# NOTE: Endian is LSByte-first on SPI")
            for regname in REGS:
                rwreg(dev, regname)
        elif name == "rx":
            rxdata(dev, *args[1:])
        elif name == "tx":
            txdata(dev, *args[1:])
        elif name == "rw":
            print("# NOTE: Endian is LSByte-first on SPI")
            rwreg(dev, *args[1:])
        elif name == "ce":
            dev.ce(int(args[1]))
        elif name in REGS:
            print("# NOTE: Endian is LSByte-first on SPI")
            rwreg(dev, *args)
        elif name in CMDS:
            cmd = globals().get(name)
            result = cmd(dev, *args[1:])
            if result is None:
                # W_REGISTER returns None after logging a size mismatch.
                sys.exit(1)
            c, raw = result
            print(c, raw.hex(" "), "/", raw)
        else:
            sys.exit("unknown command or register: {}".format(name))
def _log_level(name):
    """Translate a level name ('debug', 'INFO', ...) or number into a level.

    Looks the upper-cased name up on the logging module instead of passing
    command-line text to eval().

    Raises:
        ValueError: If the name is neither a number nor a logging level.
    """
    text = str(name).strip()
    if text.isdigit():
        return int(text)
    level = getattr(logging, text.upper(), None)
    if not isinstance(level, int):
        raise ValueError("unknown log level: {!r}".format(name))
    return level


if __name__ == '__main__':
    ap = ArgumentParser()
    # Route -h/--help through the hand-written help text.
    ap.print_help = usage
    # const: a bare "-D" (no value) selects DEBUG instead of yielding None.
    ap.add_argument('-D', '--debug', nargs='?', const='DEBUG', default='INFO')
    ap.add_argument('-s', '--ss', type=int, default=0)
    ap.add_argument('-c', '--ce', type=int, default=25)
    ap.add_argument('-m', '--mode', default='NONE')
    ap.add_argument('-r', '--rate', type=int, default=2000)
    ap.add_argument('-f', '--freq', type=int, default=2402)
    ap.add_argument('-p', '--power', type=int, default=0)
    ap.add_argument('args', nargs='*')
    opt = ap.parse_args()
    try:
        level = _log_level(opt.debug)
    except ValueError as err:
        ap.error(str(err))
    logging.basicConfig(level=level)
    if not opt.args:
        usage()
    main(opt)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment