Skip to content

Instantly share code, notes, and snippets.

@Staars
Last active January 8, 2023 17:05
Show Gist options
  • Save Staars/003c790e4b1cba9eeaf734c421131666 to your computer and use it in GitHub Desktop.
Save Staars/003c790e4b1cba9eeaf734c421131666 to your computer and use it in GitHub Desktop.
LD2410 BLE driver
#-
LD2410.be - HLK-LD2410 24GHz smart wave motion sensor support for Tasmota via BLE and Berry
port of xsns_102_ld2410.ino from Theo Arends
SPDX-FileCopyrightText: 2022 Christian Baars
SPDX-License-Identifier: GPL-3.0-only
-#
# Tasmota driver class talking to a HLK-LD2410 sensor through the Tasmota BLE module.
class LD2410 : Driver
# shared BLE transfer buffer: byte 0 holds the payload length, the payload starts at byte 1
var buf
# tiny "promise" state: current_func is invoked by every_second(), next_func replaces it
# once cb() or handleCFG() reports that the pending step has completed
var current_func, next_func
static MAX_GATES = 8 # gates are numbered 0 to 8 (= 9 gates) - DO NOT CHANGE
# command words; sendCMD() writes them into byte 6 of an outgoing configuration frame
static CMND_START_CONFIGURATION = 0xFF
static CMND_END_CONFIGURATION = 0xFE
static CMND_SET_DISTANCE = 0x60
static CMND_READ_PARAMETERS = 0x61
static CMND_START_ENGINEERING = 0x62
static CMND_END_ENGINEERING = 0x63
static CMND_SET_SENSITIVITY = 0x64
static CMND_GET_FIRMWARE = 0xA0
static CMND_SET_BAUDRATE = 0xA1
static CMND_FACTORY_RESET = 0xA2
static CMND_REBOOT = 0xA3
static CMND_BLE_NOTIFY = 0xA8
# delimiters of configuration frames (used by sendCMD) ...
static config_header = bytes("FDFCFBFA")
static config_footer = bytes("04030201")
# ... and of target report frames (see the example frames in handleTRG)
static target_header = bytes("F4F3F2F1")
static target_footer = bytes("F8F7F6F5")
# last target report decoded by handleTRG(); nil until the first report with a target arrived
# (web_sensor() displays the distances as cm)
var moving_distance
var moving_energy
var static_distance
var static_energy
var detect_distance
# per-gate sensitivity tables (MAX_GATES + 1 bytes each, allocated in init()),
# filled by cmndMovingSens()/cmndStaticSens() and transmitted by serviceSensitivities()
var moving_sensitivity
var static_sensitivity
# gate index that serviceSensitivities() sends next; counts down from MAX_GATES to -1
var sensitivity_counter
def sendBLE(packet)
  # Hand `packet` (bytes) to the BLE module via the shared buffer and start
  # BLE operation 2 on characteristic FFF2 (presumably a write - confirm in the BLE docs).
  import BLE
  var shared = self.buf
  shared[0] = size(packet)      # byte 0 announces the payload length
  shared.setbytes(1, packet)    # payload follows from byte 1
  BLE.set_chr("FFF2")
  BLE.run(2, false)
  print("LD2: send",packet)
  # fallback: idle after this step unless the caller installs its own follow-up via then()
  self.then(/-> self.wait())
end
def sendCMD(cmd,val)
  # Build a configuration frame and send it with sendBLE().
  #   cmd: command word (one of the CMND_* constants)
  #   val: optional bytes payload, nil for commands without a value
  # Layout: header(4) | length(2) | command(2) | value(n) | footer(4)
  var payload_len = (val != nil) ? size(val) : 0
  var frame = bytes(-(4 + 2 + 2 + payload_len + 4))  # fixed size, zero filled
  frame.setbytes(0, self.config_header)
  frame[4] = payload_len + 2    # length field counts command word + value
  frame[6] = cmd
  if val != nil
    frame.setbytes(8, val)
  end
  frame.setbytes(payload_len + 8, self.config_footer)
  self.sendBLE(frame)
end
def init(MAC)
  # Constructor: allocate buffers, register the BLE callback and start BLE
  # operation 3 on service FFF0 / characteristic FFF1.
  #   MAC: hex string with the sensor's BLE address
  import BLE
  var gate_count = self.MAX_GATES + 1
  self.moving_sensitivity = bytes(-gate_count)
  self.static_sensitivity = bytes(-gate_count)
  self.buf = bytes(-48)
  # function pointer through which the BLE module calls back into cb()
  var cbp = tasmota.gen_cb(/e,o,u -> self.cb(e,o,u))
  BLE.conn_cb(cbp, self.buf)
  BLE.set_MAC(bytes(MAC), 0)    # addrType: 0
  BLE.set_svc("FFF0")
  BLE.set_chr("FFF1")
  BLE.run(3, true)
  # continue with initNoti() once cb() reports the operation as done
  self.then(/-> self.initNoti())
end
def initNoti()
  # Send the CMND_BLE_NOTIFY command, then chain the firmware request.
  var hilink = bytes("48694c696e6b")   # "HiLink" in HEX
  self.sendCMD(self.CMND_BLE_NOTIFY, hilink)
  print("LD2: Init notifications")
  var next_step = /-> self.getFW()
  self.then(next_step)
end
def getFW()
  # Request the firmware version; the answer is decoded in handleCFG().
  var request = self.CMND_GET_FIRMWARE
  self.sendCMD(request)
  print("LD2: Request FW")
  var idle = /-> self.wait()
  self.then(idle)
end
def factoryReset()
  # Send the factory reset command and leave configuration mode afterwards.
  var request = self.CMND_FACTORY_RESET
  self.sendCMD(request)
  print("LD2: Factory reset")
  var leave_config = /-> self.setCfgMode(false)
  self.then(leave_config)
end
def setCfgMode(on)
  # Enter (on == true) or leave (anything else) the sensor's configuration mode.
  var command, value, message
  if on == true
    command = self.CMND_START_CONFIGURATION
    value = bytes("0100")
    message = "LD2: Start config mode"
  else
    command = self.CMND_END_CONFIGURATION
    value = nil                # end command carries no value
    message = "LD2: Stop config mode"
  end
  self.sendCMD(command, value)
  print(message)
end
def setEngMode(on)
  # Switch engineering mode on (on == true) or off, then leave configuration mode.
  var command, value, message
  if on == true
    command = self.CMND_START_ENGINEERING
    value = bytes("0100")
    message = "LD2: Start engineering mode"
  else
    command = self.CMND_END_ENGINEERING
    value = nil                # end command carries no value
    message = "LD2: Stop engineering mode"
  end
  self.sendCMD(command, value)
  print(message)
  var leave_config = /-> self.setCfgMode(false)
  self.then(leave_config)
end
def setMaxDistAndNoOneDur(max_mov_dist_range, max_stat_dist_range, no_one_duration)
  # Send CMND_SET_DISTANCE, then leave configuration mode.
  #   max_mov_dist_range:  farthest gate used for moving targets
  #   max_stat_dist_range: farthest gate used for static targets
  #   no_one_duration:     "no one" duration (presumably seconds - confirm in the protocol doc)
  # The value consists of three (2 byte parameter word + 4 byte value) pairs.
  var val = bytes(-18)
  val[2] = max_mov_dist_range   # parameter word 0 (bytes 0..1 stay zero)
  val[6] = 1                    # parameter word 1
  val[8] = max_stat_dist_range
  val[12] = 2                   # parameter word 2
  # Multi-byte fields of this protocol are little-endian (compare the distance reads in
  # handleTRG and the "05 00" duration in the handleCFG example frame); writing the
  # duration big-endian made the sensor see e.g. 1280 instead of 5.
  val.set(14, no_one_duration, 2) #little-endian
  self.sendCMD(self.CMND_SET_DISTANCE,val)
  self.then(/->self.setCfgMode(false))
end
def settGateSensitivity(gate, moving_sensitivity, static_sensitivity)
  # Send CMND_SET_SENSITIVITY for a single gate.
  # The value consists of three (2 byte parameter word + 4 byte value) pairs.
  var params = bytes(-18)
  # parameter words (word 0 stays zero)
  params[6] = 1
  params[12] = 2
  # parameter values
  params[2] = gate
  params[8] = moving_sensitivity
  params[14] = static_sensitivity
  self.sendCMD(self.CMND_SET_SENSITIVITY, params)
end
def settAllSensitivity(sensitivity)
  # Send CMND_SET_SENSITIVITY with gate value 0xFFFF and the same value for the
  # moving and the static sensitivity (presumably "all gates" - confirm in the protocol doc).
  var params = bytes(-18)
  params.set(2, 0xFFFF, 2)   # fills bytes 2 and 3 with 0xFF
  params[6] = 1
  params[12] = 2
  params[8] = sensitivity
  params[14] = sensitivity
  self.sendCMD(self.CMND_SET_SENSITIVITY, params)
end
# our little promise implementation
def wait()
# Idle step of the promise chain: then() installs it as current_func so that
# every_second() has something harmless to call while a step is pending.
# do nothing
end
def then(func)
  # Register `func` (typically a closure) as the follow-up step.
  # Until cb()/handleCFG() promote it to current_func, every_second() only runs wait().
  self.current_func = self.wait
  self.next_func = func
end
def every_second()
  # Run the currently scheduled step of the promise chain.
  self.current_func()
  if self.moving_distance == nil
    return
  end
  # very unfinished: dumps the raw BLE buffer once a target report has been decoded
  print(self.buf)
end
def handleCFG()
  # Decode a configuration frame (ACK / response) found in self.buf.
  # BLE buffer shifted one byte to the right
  #  0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
  # len FD FC FB FA 1C 00 61 01 00 00 AA 08 08 08 32 32 28 1E 14 0F 0F 0F 0F 00 00 28 28 1E 1E 14 14 14 05 00 04 03 02 01 - Default
  var command = self.buf[7]
  if command == self.CMND_READ_PARAMETERS
    # the duration is stored little-endian ("05 00" in the frame above), so read it with size 2, not -2
    print("LD2: moving_distance_gate",self.buf[13],"static_distance_gate",self.buf[14],"no one duration",self.buf.get(33,2))
    for i:0..self.MAX_GATES
      # index the buffer with [] - a bytes object is not callable
      print("LD2: moving sens",i, self.buf[15+i], "static sens",i, self.buf[24+i])
    end
  end
  if command == self.CMND_START_CONFIGURATION
    self.current_func = self.next_func # could be a "promise" waiting
    print("LD2: Sensor ACK")
  end
  if command == self.CMND_GET_FIRMWARE
    #  0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22
    # len FD FC FB FA 0C 00 A0 01 00 00 00 01 07 01 16 15 09 22 04 03 02 01
    #     header      |len  |ty|hd|ack  |ftype|major|minor      |trailer
    #                 |   12|  | 1|    0|  256|  1.7|   22091516|
    import string
    # the four build bytes are meant to be read as hex digits (22 09 15 16 -> "22091516"),
    # hence %02X; %02d would print this frame as "34092122"
    var s = string.format("LD2: Firmware version V%d.%02d.%02X%02X%02X%02X",self.buf[14],self.buf[13],self.buf[18],self.buf[17],self.buf[16],self.buf[15])
    print(s)
  end
end
def handleTRG()
  # Decode a target report frame found in self.buf into the distance/energy fields.
  # BLE buffer shifted one byte to the right
  # 0  1  2  3  4  5  6  7  8  9  10 11 12 13 14 15 16 17 18 19 20 21 22 23
  # 17 F4 F3 F2 F1 0D 00 02 AA 00 00 00 00 00 00 37 00 00 55 00 F8 F7 F6 F5 - No target
  # 17 F4 F3 F2 F1 0D 00 02 AA 03 46 00 34 00 00 3C 00 00 55 00 F8 F7 F6 F5 - Movement and Stationary target
  # 17 F4 F3 F2 F1 0D 00 02 AA 02 54 00 00 00 00 64 00 00 55 00 F8 F7 F6 F5 - Stationary target
  #len header     |len  |dt|hd|st|movin|me|stati|se|detec|tr|ck|trailer
  var frame = self.buf
  if frame[9] == 0
    return            # target state 0: keep the previously stored values
  end
  if frame[7] != 2
    print("LD2: engineering mode")
    return
  end
  # data type 2: distances are 16 bit little-endian, energies single bytes
  self.moving_distance = frame.get(10, 2)
  self.moving_energy = frame[12]
  self.static_distance = frame.get(13, 2)
  self.static_energy = frame[15]
  self.detect_distance = frame.get(16, 2)
end
def handle_noti()
  # Dispatch a received notification on the first header byte of the frame.
  var lead = self.buf[1]
  if lead == 0xfd
    self.handleCFG()    # configuration frame (header FD FC FB FA)
  elif lead == 0xf4
    self.handleTRG()    # target report frame (header F4 F3 F2 F1)
  end
end
def cb(error,op,uuid)
  # Callback registered with the BLE module in init().
  #   error: 0 on success
  #   op:    code of the finished operation; 103 is treated as an incoming notification
  #   uuid:  not used here
  if error != 0
    print("BLE: Error:",error)
    return
  end
  if op == 103
    self.handle_noti()
    return
  end
  # any other successful operation completes the pending step of the promise chain
  self.current_func = self.next_func
  print("BLE: Op:",op)
end
def serviceSensitivities()
  # Send the stored per-gate sensitivities one gate per call, counting
  # sensitivity_counter down; re-schedules itself until all gates are done.
  var gate = self.sensitivity_counter
  if gate >= 0
    self.settGateSensitivity(gate, self.moving_sensitivity[gate], self.static_sensitivity[gate])
    self.sensitivity_counter = gate - 1
    self.then(/-> self.serviceSensitivities())
    print("LD2:",gate,"steps left")
    return
  end
  # all gates sent: leave configuration mode
  self.setCfgMode(false)
  print("LD2: Did set sensitivities")
  # NOTE(review): this schedules a second END_CONFIGURATION after the one just sent - looks redundant, confirm
  self.then(/-> self.setCfgMode(false))
end
#- console commands -#
def cmndDuration(cmd, idx, payload, payload_json)
  # Console command LD2410Duration: payload 0 triggers a factory reset, any other
  # number is sent as "no one" duration with both distance ranges set to gate 8.
  # NOTE(review): an empty or non-numeric payload presumably also evaluates to 0 - confirm
  var duration = int(payload)
  self.setCfgMode(true)
  var follow_up
  if duration != 0
    follow_up = /-> self.setMaxDistAndNoOneDur(8, 8, duration)
  else
    follow_up = /-> self.factoryReset()
  end
  self.then(follow_up)
  tasmota.resp_cmnd({"Duration":duration})
end
def cmndMovingSens(cmd, idx, payload, payload_json)
  # Console command LD2410MovingSens: payload is a comma separated list with one
  # moving sensitivity per gate, starting at gate 0. The values are stored and then
  # sent gate by gate through serviceSensitivities().
  import string
  var pl = string.split(payload,",")
  var i = 0
  for s:pl
    # the table only holds MAX_GATES + 1 entries; ignore surplus values instead of
    # aborting the command with an index error
    if i > self.MAX_GATES break end
    self.moving_sensitivity[i] = int(s) #TODO value range check
    i += 1
  end
  self.sensitivity_counter = self.MAX_GATES
  self.setCfgMode(true)
  self.then(/->self.serviceSensitivities())
  tasmota.resp_cmnd({"MovingSens":pl})
end
def cmndStaticSens(cmd, idx, payload, payload_json)
  # Console command LD2410StaticSens: payload is a comma separated list with one
  # static sensitivity per gate, starting at gate 0. The values are stored and then
  # sent gate by gate through serviceSensitivities().
  import string
  var pl = string.split(payload,",")
  var i = 0
  for s:pl
    # the table only holds MAX_GATES + 1 entries; ignore surplus values instead of
    # aborting the command with an index error
    if i > self.MAX_GATES break end
    self.static_sensitivity[i] = int(s) #TODO value range check
    i += 1
  end
  self.sensitivity_counter = self.MAX_GATES
  self.setCfgMode(true)
  self.then(/->self.serviceSensitivities())
  tasmota.resp_cmnd({"StaticSens":pl})
end
def cmndEngMode(cmd, idx, payload, payload_json)
  # Console command LD2410EngMode: a payload > 0 enables engineering mode,
  # anything else disables it.
  var enable = int(payload) > 0
  self.setCfgMode(true)
  var follow_up = /-> self.setEngMode(enable)
  self.then(follow_up)
  tasmota.resp_cmnd({"Engineering mode":payload})
end
#- display sensor value in the web UI -#
def web_sensor()
  # Driver hook: add the three distances to the web UI; nothing is shown
  # before the first target report has been decoded.
  if self.moving_distance != nil
    import string
    var rows = string.format(
      "{s}LD2410 moving distance{m}%u cm{e}"..
      "{s}LD2410 static distance{m}%u cm{e}"..
      "{s}LD2410 detect distance{m}%u cm{e}",
      self.moving_distance, self.static_distance, self.detect_distance)
    tasmota.web_send_decimal(rows)
  end
  return nil
end
#- add sensor value to teleperiod -#
def json_append()
  # Driver hook: append distances and energies to the teleperiod JSON; nothing is
  # appended before the first target report has been decoded.
  if self.moving_distance != nil
    import string
    var fragment = string.format(",\"LD2410\":{\"distance\":[%i,%i,%i],\"energy\":[%i,%i]}",
      self.moving_distance, self.static_distance, self.detect_distance,
      self.moving_energy, self.static_energy)
    tasmota.response_append(fragment)
  end
  return nil
end
end
# create the driver for one sensor and hook it into Tasmota
var ld2410 = LD2410("C145FFF43769") # MAC of the device
tasmota.add_driver(ld2410)
# console commands, each forwarding to the matching driver method
tasmota.add_cmd('LD2410Duration', def (c,i,p,j) return ld2410.cmndDuration(c,i,p,j) end)
tasmota.add_cmd('LD2410MovingSens', def (c,i,p,j) return ld2410.cmndMovingSens(c,i,p,j) end)
tasmota.add_cmd('LD2410StaticSens', def (c,i,p,j) return ld2410.cmndStaticSens(c,i,p,j) end)
tasmota.add_cmd('LD2410EngMode', def (c,i,p,j) return ld2410.cmndEngMode(c,i,p,j) end)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment