Created
September 28, 2025 07:07
-
-
Save bjornfor/af993eff78606621101d57ec51790e08 to your computer and use it in GitHub Desktop.
Declarative zigbee groups management for zigbee2mqtt >= 2.0.0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Declarative group membership configuration for zigbee2mqtt. | |
Needed since they removed support for configuring group membership in configuration.yaml | |
in version 2.0, see | |
https://github.com/Koenkk/zigbee2mqtt/discussions/24198 ("Zigbee2MQTT 2.0.0 breaking changes") | |
and | |
https://github.com/Koenkk/zigbee2mqtt/issues/27619 ("[Feature request]: Bring back declarative group config (in configuration.yaml)"). | |
TLDR: z2m >= 2.0 supports declaring groups and their options, but not group membership. | |
Beware of https://github.com/Koenkk/zigbee2mqtt/issues/28706 ("Device group membership not cleared upon re-pairing"). | |
Author: Bjørn Forsman | |
""" | |
# standard library | |
from pathlib import Path | |
from typing import Any, Optional | |
import argparse | |
import json | |
import logging | |
import os | |
import shutil | |
import sys | |
import time | |
# 3rd party | |
import paho.mqtt.client | |
# Name given to this program's logger.
APP_NAME = "z2m-groups"

# Maps each accepted --log-level choice to the matching `logging` constant.
LOG_LEVELS = {
    level_name: getattr(logging, level_name.upper())
    for level_name in ("debug", "info", "warning", "error")
}

# Logger shared by every function in this file.
logger = logging.getLogger(APP_NAME)
def get_password(password_file: Optional[Path]) -> str:
    """Return the MQTT password with surrounding whitespace stripped.

    The password is read from ``password_file`` when one is given. Otherwise
    it is read from ``$CREDENTIALS_DIRECTORY/password_file``, the location
    populated by the systemd ``LoadCredential=`` setting.

    Raises:
        RuntimeError: if no password file was given and the
            ``CREDENTIALS_DIRECTORY`` environment variable is not set.
    """
    source = password_file
    if source is None:
        # Try password file from systemd LoadCredential= setting.
        creds_dir = os.environ.get("CREDENTIALS_DIRECTORY")
        if creds_dir is None:
            raise RuntimeError(
                "Password file not given as argument, nor available at $CREDENTIALS_DIRECTORY/password_file (systemd service)"
            )
        source = Path(creds_dir) / "password_file"
    return source.read_text().strip()
def connect_to_server(
    server: str, username: str, password_file: Optional[Path]
) -> paho.mqtt.client.Client:
    """Connect to the MQTT server and wait until zigbee2mqtt reports "online".

    Args:
        server: hostname/IP address of the MQTT broker.
        username: user name for the MQTT server.
        password_file: file holding the password, or None to fall back to
            the lookup performed by get_password().

    Returns:
        A client whose network loop is running in a separate thread.

    Raises:
        RuntimeError: if no password is available, if the bridge state cannot
            be read, or if z2m does not report "online" within the retry
            budget.
    """
    password = get_password(password_file)
    client = paho.mqtt.client.Client(paho.mqtt.client.CallbackAPIVersion.VERSION2)
    client.username_pw_set(username, password)
    logger.info(f"connecting to MQTT server '{server}' as user '{username}'")
    client.connect(server)
    # Start networking loop (runs in a separate thread).
    client.loop_start()
    try:
        logger.info("connected to server")
        logger.info("waiting for z2m to be online")
        for _ in range(30):
            res = get_retained_message(client, "zigbee2mqtt/bridge/state")
            if res["state"] == "online":
                logger.info("z2m is online")
                return client
            time.sleep(0.1)
        raise RuntimeError("timeout waiting for z2m to be online")
    except BaseException:
        # On failure the caller never receives the client, so nobody else can
        # shut it down: close the connection and stop the network thread here
        # before propagating the error.
        client.disconnect()
        client.loop_stop()
        raise
def get_retained_message(client: paho.mqtt.client.Client, topic: str) -> Any:
    """Subscribe to ``topic`` and return the JSON-decoded message received on it.

    The function installs its own ``on_message`` handler on ``client``,
    subscribes, and polls for up to about 3 seconds (30 polls, 0.1 s apart)
    for a message to arrive. It always unsubscribes again before returning
    or raising. As the name suggests it is meant for retained topics, where
    the broker delivers the stored message right after subscribing.

    Args:
        client: connected client whose network loop is already running.
        topic: topic (subscription filter) to read from.

    Returns:
        The decoded JSON payload of the most recent matching message.

    Raises:
        RuntimeError: if subscribing or unsubscribing fails, or if no message
            arrives in time.
    """
    payload = None

    def on_message(client, userdata, msg):
        nonlocal payload
        if not paho.mqtt.client.topic_matches_sub(topic, msg.topic):
            # Not ours, e.g. a late delivery for a topic that an earlier call
            # subscribed to. Returning it would hand the caller wrong data.
            return
        payload = json.loads(msg.payload.decode())
        logger.debug(
            f"got message ({len(msg.payload)} bytes) for topic '{topic}': {payload}"
        )

    client.on_message = on_message
    err, _mid = client.subscribe(topic)
    if err != paho.mqtt.enums.MQTTErrorCode.MQTT_ERR_SUCCESS:
        raise RuntimeError(f"MQTTErrorCode: {err}")
    for _ in range(30):
        if payload is not None:
            break
        time.sleep(0.1)
    # Unsubscribe from the calling thread rather than from inside the
    # callback: an error raised in the callback would surface in the network
    # thread where the caller cannot see it, and doing it here also removes
    # the subscription when no message arrived at all.
    err, _mid = client.unsubscribe(topic)
    if payload is None:
        raise RuntimeError(f"timeout waiting for message on topic '{topic}'")
    if err != paho.mqtt.enums.MQTTErrorCode.MQTT_ERR_SUCCESS:
        raise RuntimeError(f"MQTTErrorCode: {err}")
    return payload
def get_devices_and_groups_from_server(
    client: paho.mqtt.client.Client,
) -> tuple[list[dict], list[dict]]:
    """Fetch the device list and the group list published by zigbee2mqtt.

    Args:
        client: connected client whose network loop is already running.

    Returns:
        A ``(raw_devices, raw_groups)`` pair holding the decoded payloads of
        the "zigbee2mqtt/bridge/devices" and "zigbee2mqtt/bridge/groups"
        topics, in that order.

    Raises:
        RuntimeError: propagated from get_retained_message() when either
            topic cannot be read.
    """
    raw_devices = get_retained_message(client, "zigbee2mqtt/bridge/devices")
    raw_groups = get_retained_message(client, "zigbee2mqtt/bridge/groups")
    return raw_devices, raw_groups
def get_device_group_info_from_raw_data(
    raw_devices: list[dict], raw_groups: list[dict]
) -> dict:
    """Return the current group membership of every device in ``raw_devices``.

    The result maps each device friendly name to
    ``{"groups": {group_id: group_friendly_name, ...}}``. A group member whose
    IEEE address matches no entry of ``raw_devices`` is reported with a
    warning and left out of the result.
    """
    membership = {}
    name_by_ieee = {}
    for device in raw_devices:
        logger.debug(f"{device=}")
        friendly_name = device["friendly_name"]
        name_by_ieee[device["ieee_address"]] = friendly_name
        membership[friendly_name] = {"groups": {}}

    for group in raw_groups:
        logger.debug(f"{group=}")
        for member in sorted(group["members"], key=lambda m: m["ieee_address"]):
            logger.debug(f"{member=}")
            ieee_address = member["ieee_address"]
            if ieee_address not in name_by_ieee:
                logger.warning(
                    f"group {group['id']} ('{group['friendly_name']}') has un-removable ghost member {member['ieee_address']} -- see https://github.com/Koenkk/zigbee2mqtt/issues/28706 (\"Device group membership not cleared upon re-pairing\")"
                )
                # Related to the above issue, z2m doesn't support removing
                # ghost devices from groups; it says the device doesn't exist.
                # So the only thing we can do is skip that device (for now).
                continue
            owner = membership[name_by_ieee[ieee_address]]
            owner["groups"][group["id"]] = group["friendly_name"]
    return membership
def get_device_group_info_from_config(groups: dict) -> dict:
    """Return the target/desired group state, based on the input config (groups).

    ``groups`` maps a group id (a string convertible to int) to a dict with
    "devices" and "friendly_name" entries. The result maps each device name
    to ``{"groups": {group_id: group_friendly_name, ...}}`` with integer
    group ids, filled in ascending group-id order.
    """
    devices = {}
    for k in sorted(groups, key=int):
        v = groups[k]
        group_id = int(k)
        logger.debug(f" {k=} {v=}")
        for dev in sorted(v["devices"]):
            logger.debug(f"{dev=}")
            # Create the device entry on first sight, then record the group.
            entry = devices.setdefault(dev, {"groups": {}})
            entry["groups"][group_id] = v["friendly_name"]
    return devices
def mqtt_publish(
    client: paho.mqtt.client.Client, topic: str, payload: dict, dry_run: bool
) -> None:
    """Publish ``payload`` as JSON on ``topic``, or only log it in dry-run mode.

    Args:
        client: connected client whose network loop is already running.
        topic: topic to publish on.
        payload: JSON-serializable message body.
        dry_run: when true, log the message with a "DRY_RUN: " prefix and
            send nothing.

    Raises:
        RuntimeError: if the publish is rejected or is still unsent after
            the 5 second wait.
    """
    msg = f"publish @ {topic}: {payload}"
    if dry_run:
        logger.info("DRY_RUN: " + msg)
        return
    logger.info(msg)
    res = client.publish(topic, json.dumps(payload))
    res.wait_for_publish(timeout=5)
    if res.rc != paho.mqtt.enums.MQTTErrorCode.MQTT_ERR_SUCCESS:
        raise RuntimeError(f"publish error: {res.rc}")
    # NOTE(review): per the paho documentation wait_for_publish() simply
    # returns once the timeout expires, so an unsent message has to be
    # detected explicitly instead of being counted as a success.
    if not res.is_published():
        raise RuntimeError(f"timeout waiting for publish @ {topic}")
def _membership_request(
    action: str, device: str, group_id: Optional[int] = None
) -> dict:
    """Build one plan entry for a group-membership request ("add", "remove", "remove_all")."""
    # Key order ("group" before "device") is kept as before so the logged and
    # published payloads look the same.
    payload = {} if group_id is None else {"group": group_id}
    payload["device"] = device
    return {
        "topic": f"zigbee2mqtt/bridge/request/group/members/{action}",
        "payload": payload,
    }


def create_update_plan(
    raw_devices: list[dict],
    raw_groups: list[dict],
    groups_config: dict,
    ignore_server_groups: bool,
) -> list[dict]:
    """Return list of MQTT publish dicts, with 'topic' and 'payload' keys.

    Args:
        raw_devices: device list as reported by the server.
        raw_groups: group list (with members) as reported by the server.
        groups_config: desired groups, keyed by group id.
        ignore_server_groups: when true, do not diff against the server's
            group info; every device is first removed from all groups and
            then added to each of its target groups.

    Returns:
        The requests to publish, in the order they should be sent. For each
        device the removals come before the additions.
    """
    current_state = get_device_group_info_from_raw_data(raw_devices, raw_groups)
    for k in sorted(current_state):
        logger.info(f"current state: {k}: {current_state[k]}")

    target_state = get_device_group_info_from_config(groups_config)
    # Set devices not listed in target_state to an empty group set, to make
    # current_state and target_state more easily diffable.
    for k in sorted(current_state):
        if k not in target_state:
            target_state[k] = {"groups": {}}
    for k in sorted(target_state):
        logger.info(f"target state: {k}: {target_state[k]}")

    # The config may name a device the server did not report. Looking such a
    # device up in current_state used to abort with a bare KeyError; instead,
    # warn and treat it as not being a member of any group.
    for dev_name in target_state:
        if dev_name not in current_state:
            logger.warning(
                f"device '{dev_name}' is listed in the groups config but is not known to the server; assuming it is not in any group"
            )
            current_state[dev_name] = {"groups": {}}

    plan = []
    for dev_name in target_state:
        if dev_name == "Coordinator":
            # Skip the special Coordinator device, since it isn't a user device
            # and isn't shown in the web UI. It'd be weird to try managing it
            # as any other device then. Skipping it also ensures that this
            # program reports the same number of devices as the web UI.
            continue
        current_groups = current_state[dev_name]["groups"]
        target_groups = target_state[dev_name]["groups"]
        if ignore_server_groups:
            # Membership is assumed unknown: start from a clean slate.
            plan.append(_membership_request("remove_all", dev_name))
            plan.extend(
                _membership_request("add", dev_name, group_id)
                for group_id in target_groups
            )
        elif current_state[dev_name] != target_state[dev_name]:
            logger.debug(
                f"{dev_name=} {current_state[dev_name]=} {target_state[dev_name]=}"
            )
            plan.extend(
                _membership_request("remove", dev_name, group_id)
                for group_id in current_groups
                if group_id not in target_groups
            )
            plan.extend(
                _membership_request("add", dev_name, group_id)
                for group_id in target_groups
                if group_id not in current_groups
            )
    return plan
def update_groups(
    server: str,
    username: str,
    password_file: Optional[Path],
    groups_file: Path,
    ignore_server_groups: bool,
    dry_run: bool,
) -> None:
    """Bring the server's group membership in line with ``groups_file``.

    Args:
        server: hostname/IP address of the MQTT broker.
        username: user name for the MQTT server.
        password_file: file holding the password, or None to use the
            fallback lookup of get_password().
        groups_file: JSON file holding the desired groups.
        ignore_server_groups: passed on to create_update_plan().
        dry_run: when true, log the planned requests without publishing them.

    Raises:
        RuntimeError: propagated from connecting, reading the server state
            or publishing.
    """
    # JSON text is UTF-8; don't depend on the locale's default encoding.
    with open(groups_file, "r", encoding="utf-8") as fobj:
        groups_config = json.load(fobj)
    client = connect_to_server(server, username, password_file)
    try:
        raw_devices, raw_groups = get_devices_and_groups_from_server(client)
        plan = create_update_plan(
            raw_devices, raw_groups, groups_config, ignore_server_groups
        )
        logger.info("applying changes")
        changed_devices = set()
        for step in plan:
            mqtt_publish(client, step["topic"], step["payload"], dry_run)
            changed_devices.add(step["payload"]["device"])
        logger.info(f"num changed devices: {len(changed_devices)}")
    finally:
        # Always close the connection and stop the network thread started by
        # connect_to_server(), including when a step above raised.
        client.disconnect()
        client.loop_stop()
def get_args() -> argparse.Namespace:
    """Parse the command line and return the options as a Namespace object."""
    # One (flag, add_argument keyword arguments) pair per option, in the
    # order they are registered and shown in --help.
    option_specs = [
        (
            "--server",
            {
                "required": True,
                "type": str,
                "help": "hostname/IP address of the MQTT broker/server to connect to",
            },
        ),
        (
            "--username",
            {
                "required": True,
                "type": str,
                "help": "user name for the MQTT server",
            },
        ),
        (
            "--password-file",
            {
                "metavar": "PATH",
                "type": Path,
                "help": "file containing user password; will use $CREDENTIALS_DIRECTORY/password_file (systemd service) if not set",
            },
        ),
        (
            "--groups-file",
            {
                "metavar": "PATH",
                "required": True,
                "type": Path,
                "help": 'path to a JSON file containing the \'groups\' object from z2m 1.x configuration.yaml; e.g. {"1": {"devices": ["bulb1", "bulb2"], "friendly_name": "mygroup1"}, "2": ...}',
            },
        ),
        (
            "--ignore-server-groups",
            {
                "action": "store_true",
                "help": "ignore the current group info from the server; assume unknown group membership",
            },
        ),
        (
            "--log-level",
            {
                "choices": LOG_LEVELS.keys(),
                "default": "info",
                "help": "log level; default is 'info'",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "do everything but *writing* to the MQTT server",
            },
        ),
    ]
    parser = argparse.ArgumentParser(description=__doc__)
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)
    return parser.parse_args()
def main():
    """Program entry point: parse the CLI options, set up logging, sync groups."""
    options = get_args()
    logging.basicConfig(
        level=LOG_LEVELS[options.log_level],
        format="%(levelname)s: %(message)s",
    )
    update_groups(
        server=options.server,
        username=options.username,
        password_file=options.password_file,
        groups_file=options.groups_file,
        ignore_server_groups=options.ignore_server_groups,
        dry_run=options.dry_run,
    )


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment