Skip to content

Instantly share code, notes, and snippets.

@bjornfor
Created September 28, 2025 07:07
Show Gist options
  • Save bjornfor/af993eff78606621101d57ec51790e08 to your computer and use it in GitHub Desktop.
Save bjornfor/af993eff78606621101d57ec51790e08 to your computer and use it in GitHub Desktop.
Declarative zigbee groups management for zigbee2mqtt >= 2.0.0
#!/usr/bin/env python3
"""
Declarative group membership configuration for zigbee2mqtt.
Needed since they removed support for configuring group membership in configuration.yaml
in version 2.0, see
https://github.com/Koenkk/zigbee2mqtt/discussions/24198 ("Zigbee2MQTT 2.0.0 breaking changes")
and
https://github.com/Koenkk/zigbee2mqtt/issues/27619 ("[Feature request]: Bring back declarative group config (in configuration.yaml)").
TLDR: z2m >= 2.0 supports declaring groups and their options, but not group membership.
Beware of https://github.com/Koenkk/zigbee2mqtt/issues/28706 ("Device group membership not cleared upon re-pairing").
Author: Bjørn Forsman
"""
# standard library
from pathlib import Path
from typing import Any, Optional
import argparse
import json
import logging
import os
import shutil
import sys
import threading
import time

# 3rd party
import paho.mqtt.client
import paho.mqtt.enums
# Program name; also used as the name of the logger below.
APP_NAME = "z2m-groups"

# Maps the lower-case level names offered by --log-level to the matching
# constants of the standard logging module.
LOG_LEVELS = {
    name: getattr(logging, name.upper())
    for name in ("debug", "info", "warning", "error")
}

logger = logging.getLogger(APP_NAME)
def get_password(password_file: Optional[Path]) -> str:
    """Return the MQTT password, with surrounding whitespace stripped.

    The password is read from ``password_file`` when one is given. Otherwise
    it is read from ``$CREDENTIALS_DIRECTORY/password_file``, which is where
    the systemd LoadCredential= setting makes it available.

    Raises:
        RuntimeError: no file was given and $CREDENTIALS_DIRECTORY is not set.
    """
    source = password_file
    if source is None:
        # No explicit file; fall back to the systemd credentials directory.
        creds_dir = os.environ.get("CREDENTIALS_DIRECTORY")
        if creds_dir is None:
            raise RuntimeError(
                "Password file not given as argument, nor available at $CREDENTIALS_DIRECTORY/password_file (systemd service)"
            )
        source = Path(creds_dir) / "password_file"
    return source.read_text().strip()
def connect_to_server(
    server: str, username: str, password_file: Optional[Path]
) -> paho.mqtt.client.Client:
    """Connect to the MQTT server and wait until zigbee2mqtt reports "online".

    Args:
        server: hostname/IP address of the MQTT server.
        username: user name for the MQTT server.
        password_file: file holding the password; see get_password() for the
            fallback used when this is None.

    Returns:
        A connected client whose network loop is running in a separate
        thread. The caller is responsible for disconnecting it.

    Raises:
        RuntimeError: z2m did not report "online" within 30 attempts. Errors
            from get_password() and get_retained_message() propagate as well.
    """
    password = get_password(password_file)
    client = paho.mqtt.client.Client(paho.mqtt.client.CallbackAPIVersion.VERSION2)
    client.username_pw_set(username, password)
    logger.info(f"connecting to MQTT server '{server}' as user '{username}'")
    client.connect(server)
    # Start networking loop (runs in a separate thread).
    client.loop_start()
    try:
        logger.info("connected to server")
        logger.info("waiting for z2m to be online")
        for _ in range(30):
            res = get_retained_message(client, "zigbee2mqtt/bridge/state")
            if res["state"] == "online":
                logger.info("z2m is online")
                return client
            time.sleep(0.1)
        raise RuntimeError("timeout waiting for z2m to be online")
    except BaseException:
        # The caller never gets the client when we fail, so it cannot clean
        # up; close the connection and stop the loop thread here.
        client.disconnect()
        client.loop_stop()
        raise
def get_retained_message(client: paho.mqtt.client.Client, topic: str) -> Any:
    """Return the JSON-decoded payload of the first message seen on ``topic``.

    Subscribes to ``topic``, waits up to 3 seconds for a message and then
    unsubscribes again (also on timeout). Intended for topics that hold a
    retained message. Note that this replaces ``client.on_message``.

    Args:
        client: connected client with a running network loop.
        topic: topic (subscription filter) to read from.

    Returns:
        The decoded JSON value; this may be None for a JSON ``null`` payload.

    Raises:
        RuntimeError: subscribing or unsubscribing failed, the payload was
            not valid JSON, or no message arrived in time.
    """
    received = threading.Event()
    payload = None
    decode_error: Optional[Exception] = None

    def on_message(client, userdata, msg):
        nonlocal payload, decode_error
        # Only the first message for *this* subscription counts. A late
        # message from an earlier subscription must not be mistaken for it.
        if received.is_set() or not paho.mqtt.client.topic_matches_sub(
            topic, msg.topic
        ):
            return
        try:
            payload = json.loads(msg.payload.decode())
        except ValueError as exc:
            # paho invokes this callback from its network loop thread, so an
            # exception raised here would never reach the waiting caller.
            # Hand it over instead.
            decode_error = exc
        else:
            logger.debug(
                f"got message ({len(msg.payload)} bytes) for topic '{topic}': {payload}"
            )
        received.set()

    client.on_message = on_message
    err, _mid = client.subscribe(topic)
    if err != paho.mqtt.enums.MQTTErrorCode.MQTT_ERR_SUCCESS:
        raise RuntimeError(f"MQTTErrorCode: {err}")
    try:
        got_message = received.wait(timeout=3.0)
    finally:
        # Unsubscribe from the calling thread so that a failure is reported
        # to the caller and the subscription is dropped on timeout as well.
        err, _mid = client.unsubscribe(topic)
    if err != paho.mqtt.enums.MQTTErrorCode.MQTT_ERR_SUCCESS:
        raise RuntimeError(f"MQTTErrorCode: {err}")
    if not got_message:
        raise RuntimeError(f"timeout waiting for message on topic '{topic}'")
    if decode_error is not None:
        raise RuntimeError(
            f"invalid JSON payload on topic '{topic}'"
        ) from decode_error
    return payload
def get_devices_and_groups_from_server(
    client: paho.mqtt.client.Client,
) -> tuple[list[dict], list[dict]]:
    """Fetch the device and group lists published by zigbee2mqtt.

    Returns:
        A ``(raw_devices, raw_groups)`` tuple holding the decoded payloads of
        the "zigbee2mqtt/bridge/devices" and "zigbee2mqtt/bridge/groups"
        topics, in that order.

    Raises:
        RuntimeError: propagated from get_retained_message().
    """
    raw_devices = get_retained_message(client, "zigbee2mqtt/bridge/devices")
    raw_groups = get_retained_message(client, "zigbee2mqtt/bridge/groups")
    return (raw_devices, raw_groups)
def get_device_group_info_from_raw_data(
    raw_devices: list[dict], raw_groups: list[dict]
) -> dict:
    """Return the current group membership of every device known to the server.

    The result maps each device friendly name to
    ``{"groups": {group_id: group_friendly_name, ...}}``. Devices that are
    in no group get an empty "groups" dict. Group members whose IEEE address
    matches no entry of ``raw_devices`` are reported with a warning and left
    out.
    """
    name_by_ieee = {}
    membership = {}
    for device in raw_devices:
        logger.debug(f"{device=}")
        friendly_name = device["friendly_name"]
        name_by_ieee[device["ieee_address"]] = friendly_name
        membership[friendly_name] = {"groups": {}}

    for group in raw_groups:
        logger.debug(f"{group=}")
        members = sorted(group["members"], key=lambda m: m["ieee_address"])
        for member in members:
            logger.debug(f"{member=}")
            ieee = member["ieee_address"]
            if ieee not in name_by_ieee:
                logger.warning(
                    f"group {group['id']} ('{group['friendly_name']}') has un-removable ghost member {member['ieee_address']} -- see https://github.com/Koenkk/zigbee2mqtt/issues/28706 (\"Device group membership not cleared upon re-pairing\")"
                )
                # z2m refuses to remove such ghost devices from a group (it
                # says the device doesn't exist), so all that can be done for
                # now is to skip them.
                continue
            owner = membership[name_by_ieee[ieee]]
            owner["groups"][group["id"]] = group["friendly_name"]
    return membership
def get_device_group_info_from_config(groups: dict) -> dict:
    """Build the desired (target) group state from the ``groups`` config.

    ``groups`` maps a group id (a string holding an integer) to a dict with
    "devices" and "friendly_name" entries. The result maps each device name
    to ``{"groups": {group_id: group_friendly_name, ...}}`` with integer
    group ids.
    """
    devices = {}
    # Walk the groups in numerical id order.
    for k in sorted(groups, key=int):
        v = groups[k]
        logger.debug(f" {k=} {v=}")
        group_id = int(k)
        for dev in sorted(v["devices"]):
            logger.debug(f"{dev=}")
            entry = devices.setdefault(dev, {"groups": {}})
            entry["groups"][group_id] = v["friendly_name"]
    return devices
def mqtt_publish(
    client: paho.mqtt.client.Client, topic: str, payload: dict, dry_run: bool
) -> None:
    """Publish ``payload`` as JSON on ``topic`` and wait until it is sent.

    Args:
        client: connected client with a running network loop.
        topic: topic to publish to.
        payload: JSON-serializable message body.
        dry_run: when True, only log what would have been published.

    Raises:
        RuntimeError: the publish failed or was not completed within 5 s.
    """
    msg = f"publish @ {topic}: {payload}"
    if dry_run:
        logger.info("DRY_RUN: " + msg)
        return
    logger.info(msg)
    res = client.publish(topic, json.dumps(payload))
    res.wait_for_publish(timeout=5)
    if res.rc != paho.mqtt.enums.MQTTErrorCode.MQTT_ERR_SUCCESS:
        raise RuntimeError(f"publish error: {res.rc}")
    # wait_for_publish() just returns when its timeout expires and rc only
    # tells whether the message was queued, so check explicitly that the
    # message actually went out.
    if not res.is_published():
        raise RuntimeError(f"timeout waiting for publish on topic '{topic}'")
def _group_members_request(
    action: str, device: str, group_id: Optional[int] = None
) -> dict:
    """Return one plan entry for a z2m "group/members/<action>" request."""
    payload: dict = {}
    if group_id is not None:
        payload["group"] = group_id
    payload["device"] = device
    return {
        "topic": f"zigbee2mqtt/bridge/request/group/members/{action}",
        "payload": payload,
    }


def create_update_plan(
    raw_devices: list[dict],
    raw_groups: list[dict],
    groups_config: dict,
    ignore_server_groups: bool,
) -> list[dict]:
    """Return list of MQTT publish dicts, with 'topic' and 'payload' keys.

    The plan turns the current group membership (from ``raw_devices`` and
    ``raw_groups``) into the one described by ``groups_config``.

    Args:
        raw_devices: device list as published by the server.
        raw_groups: group list as published by the server.
        groups_config: desired groups; see get_device_group_info_from_config().
        ignore_server_groups: when True, don't diff against the current
            state; instead remove every device from all groups and add it to
            its configured groups again.

    Returns:
        The requests to publish, in order. When diffing, a device that is
        named in ``groups_config`` but unknown to the server is skipped with
        a warning.
    """
    current_state = get_device_group_info_from_raw_data(raw_devices, raw_groups)
    for k in sorted(current_state):
        logger.info(f"current state: {k}: {current_state[k]}")
    target_state = get_device_group_info_from_config(groups_config)
    # Set devices not listed in target_state to an empty group set, to make
    # current_state and target_state more easily diffable.
    for k in sorted(current_state):
        target_state.setdefault(k, {"groups": {}})
    for k in sorted(target_state):
        logger.info(f"target state: {k}: {target_state[k]}")

    plan = []
    for dev_name in target_state:
        if dev_name == "Coordinator":
            # Skip the special Coordinator device, since it isn't a user device
            # and isn't shown in the web UI. It'd be weird to try managing it
            # as any other device then. Skipping it also ensures that this
            # program reports the same number of devices as the web UI.
            continue
        target_groups = target_state[dev_name]["groups"]
        if ignore_server_groups:
            plan.append(_group_members_request("remove_all", dev_name))
            plan.extend(
                _group_members_request("add", dev_name, group_id)
                for group_id in target_groups
            )
            continue
        if dev_name not in current_state:
            # Only devices from the config can end up here. Without an entry
            # in the server's device list there is nothing to diff against.
            logger.warning(
                f"device '{dev_name}' is listed in the groups config but is not known to the server; skipping it"
            )
            continue
        if current_state[dev_name] == target_state[dev_name]:
            continue
        logger.debug(
            f"{dev_name=} {current_state[dev_name]=} {target_state[dev_name]=}"
        )
        current_groups = current_state[dev_name]["groups"]
        # Removals first, then additions.
        plan.extend(
            _group_members_request("remove", dev_name, group_id)
            for group_id in current_groups
            if group_id not in target_groups
        )
        plan.extend(
            _group_members_request("add", dev_name, group_id)
            for group_id in target_groups
            if group_id not in current_groups
        )
    return plan
def update_groups(
    server: str,
    username: str,
    password_file: Optional[Path],
    groups_file: Path,
    ignore_server_groups: bool,
    dry_run: bool,
) -> None:
    """Bring the server's group membership in line with ``groups_file``.

    Args:
        server: hostname/IP address of the MQTT server.
        username: user name for the MQTT server.
        password_file: file holding the password, or None; see get_password().
        groups_file: JSON file with the desired groups.
        ignore_server_groups: passed on to create_update_plan().
        dry_run: when True, log the planned requests without publishing them.
    """
    # JSON text is UTF-8; don't depend on the locale's default encoding.
    with open(groups_file, "r", encoding="utf-8") as fobj:
        groups_config = json.load(fobj)
    client = connect_to_server(server, username, password_file)
    try:
        raw_devices, raw_groups = get_devices_and_groups_from_server(client)
        plan = create_update_plan(
            raw_devices, raw_groups, groups_config, ignore_server_groups
        )
        logger.info("applying changes")
        changed_devices = set()
        for args in plan:
            mqtt_publish(client, args["topic"], args["payload"], dry_run)
            changed_devices.add(args["payload"]["device"])
        logger.info(f"num changed devices: {len(changed_devices)}")
    finally:
        # Always close the connection and stop the network loop thread, also
        # when planning or publishing fails.
        client.disconnect()
        client.loop_stop()
def get_args() -> argparse.Namespace:
    """Parse the command line and return the options as a Namespace object."""
    # One (flag, add_argument() keyword arguments) pair per option, in the
    # order in which the options are registered.
    option_specs = [
        (
            "--server",
            {
                "required": True,
                "type": str,
                "help": "hostname/IP address of the MQTT broker/server to connect to",
            },
        ),
        (
            "--username",
            {
                "required": True,
                "type": str,
                "help": "user name for the MQTT server",
            },
        ),
        (
            "--password-file",
            {
                "metavar": "PATH",
                "type": Path,
                "help": "file containing user password; will use $CREDENTIALS_DIRECTORY/password_file (systemd service) if not set",
            },
        ),
        (
            "--groups-file",
            {
                "metavar": "PATH",
                "required": True,
                "type": Path,
                "help": 'path to a JSON file containing the \'groups\' object from z2m 1.x configuration.yaml; e.g. {"1": {"devices": ["bulb1", "bulb2"], "friendly_name": "mygroup1"}, "2": ...}',
            },
        ),
        (
            "--ignore-server-groups",
            {
                "action": "store_true",
                "help": "ignore the current group info from the server; assume unknown group membership",
            },
        ),
        (
            "--log-level",
            {
                "choices": LOG_LEVELS.keys(),
                "default": "info",
                "help": "log level; default is 'info'",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "do everything but *writing* to the MQTT server",
            },
        ),
    ]
    # The module docstring doubles as the program description.
    parser = argparse.ArgumentParser(description=__doc__)
    for flag, options in option_specs:
        parser.add_argument(flag, **options)
    return parser.parse_args()
def main():
    """Program entry point: parse arguments, set up logging, update groups."""
    args = get_args()
    level = LOG_LEVELS[args.log_level]
    logging.basicConfig(format="%(levelname)s: %(message)s", level=level)
    update_groups(
        server=args.server,
        username=args.username,
        password_file=args.password_file,
        groups_file=args.groups_file,
        ignore_server_groups=args.ignore_server_groups,
        dry_run=args.dry_run,
    )
# Run the command line program only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment