Skip to content

Instantly share code, notes, and snippets.

@OttoWinter
Last active February 7, 2018 17:26
Show Gist options
  • Save OttoWinter/fd52d1d1ca30cdcb5a46fbdb5c0443f0 to your computer and use it in GitHub Desktop.
Save OttoWinter/fd52d1d1ca30cdcb5a46fbdb5c0443f0 to your computer and use it in GitHub Desktop.
Grouped Lights

Grouped Lights

Based off of the excellent work over at https://gist.github.com/jjensn/83dd60d96330bbf66e58dfae336187bf

This component allows lights to be grouped together and supports all of Home Assistant's features such as effects and color temperatures. The "child" lights can still be used as normal though as this component just forwards all calls to all of the children and uses the first active child light for the group state.

In order to use this, first copy the attached code to a file custom_components/light/grouped_light.py in your configuration folder. Then, specify all of your grouped lights like this:

light:
  - platform: grouped_light
    name: Cool Light Group
    entities:
      - light.amazing_light
      - light.foobar
      - light.sun
"""
This component allows several lights to be grouped into one light.
"""
import asyncio
import logging
import itertools
from typing import List, Tuple, Optional, TypeVar, Iterator
import voluptuous as vol
from homeassistant.core import State
from homeassistant.components import light
from homeassistant.const import (STATE_OFF, STATE_ON, SERVICE_TURN_ON,
SERVICE_TURN_OFF, ATTR_ENTITY_ID, CONF_NAME,
CONF_ENTITIES, STATE_UNAVAILABLE,
STATE_UNKNOWN)
from homeassistant.helpers.event import async_track_state_change
from homeassistant.helpers.typing import HomeAssistantType, ConfigType
from homeassistant.components.light import (SUPPORT_BRIGHTNESS,
SUPPORT_RGB_COLOR,
SUPPORT_COLOR_TEMP,
SUPPORT_TRANSITION, SUPPORT_EFFECT,
SUPPORT_FLASH, SUPPORT_XY_COLOR,
SUPPORT_WHITE_VALUE,
PLATFORM_SCHEMA)
import homeassistant.helpers.config_validation as cv
_LOGGER = logging.getLogger(__name__)
DOMAIN = 'grouped_light'
DEFAULT_NAME = 'Grouped Light'
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_ENTITIES): cv.entity_ids,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
})
SUPPORT_GROUP_LIGHT = (SUPPORT_BRIGHTNESS | SUPPORT_COLOR_TEMP |
SUPPORT_EFFECT | SUPPORT_FLASH | SUPPORT_RGB_COLOR |
SUPPORT_TRANSITION | SUPPORT_XY_COLOR |
SUPPORT_WHITE_VALUE)
@asyncio.coroutine
def async_setup_platform(hass: HomeAssistantType, config: ConfigType,
async_add_devices, discovery_info=None) -> None:
"""Initialize grouped_light platform."""
async_add_devices([GroupedLight(
hass,
config.get(CONF_NAME),
config.get(CONF_ENTITIES)
)])
T = TypeVar('T')
class GroupedLight(light.Light):
"""Representation of a Grouped Light."""
def __init__(self, hass: HomeAssistantType, name: str,
entity_ids: List[str]) -> None:
"""Initialize a Grouped Light."""
self.hass = hass # type: HomeAssistantType
self._name = name # type: str
self._entity_ids = entity_ids # type: List[str]
self._state = STATE_OFF # type: str
self._brightness = None # type: Optional[int]
self._xy_color = None # type: Optional[Tuple[float, float]]
self._rgb_color = None # type: Optional[Tuple[int, int, int]]
self._color_temp = None # type: Optional[int]
self._min_mireds = 154 # type: Optional[int]
self._max_mireds = 500 # type: Optional[int]
self._white_value = None # type: Optional[int]
self._effect_list = None # type: Optional[List[str]]
self._effect = None # type: Optional[str]
self._supported_features = 0 # type: int
@asyncio.coroutine
def async_added_to_hass(self) -> None:
"""Subscribe to light events."""
async_track_state_change(self.hass, self._entity_ids,
self._async_state_changed_listener)
@property
def name(self) -> str:
"""Return the name of the entity."""
return self._name
@property
def state(self) -> str:
"""Return the state."""
return self._state
@property
def is_on(self) -> bool:
"""Return True if entity is on."""
return self.state == STATE_ON
@property
def brightness(self) -> Optional[int]:
"""Return the brightness of this light between 0..255."""
return self._brightness
@property
def xy_color(self) -> Optional[Tuple[float, float]]:
"""Return the XY color value [float, float]."""
return self._xy_color
@property
def rgb_color(self) -> Optional[Tuple[int, int, int]]:
"""Return the RGB color value [int, int, int]."""
return self._rgb_color
@property
def color_temp(self) -> Optional[int]:
"""Return the CT color value in mireds."""
return self._color_temp
@property
def min_mireds(self) -> Optional[int]:
"""Return the coldest color_temp that this light supports."""
return self._min_mireds
@property
def max_mireds(self) -> Optional[int]:
"""Return the warmest color_temp that this light supports."""
return self._max_mireds
@property
def white_value(self) -> Optional[int]:
"""Return the white value of this light between 0..255."""
return self._white_value
@property
def effect_list(self) -> Optional[List[str]]:
"""Return the list of supported effects."""
return self._effect_list
@property
def effect(self) -> Optional[str]:
"""Return the current effect."""
return self._effect
@property
def supported_features(self) -> int:
"""Flag supported features."""
return self._supported_features
@property
def should_poll(self) -> bool:
"""No polling needed for a Grouped Light."""
return False
@asyncio.coroutine
def async_turn_on(self, **kwargs):
"""Forward the turn_on command to all lights in the group."""
for entity_id in self._entity_ids:
kwargs[ATTR_ENTITY_ID] = entity_id
yield from self.hass.services.async_call('light', SERVICE_TURN_ON,
kwargs, blocking=True)
@asyncio.coroutine
def async_turn_off(self, **kwargs):
"""Forward the turn_off command to all lights in the group."""
for entity_id in self._entity_ids:
kwargs[ATTR_ENTITY_ID] = entity_id
yield from self.hass.services.async_call('light', SERVICE_TURN_OFF,
kwargs, blocking=True)
@asyncio.coroutine
def async_update(self):
"""Query all members and determine the group state."""
states = self._child_states()
self._state = _determine_on_off_state(states)
self._brightness = _reduce_attribute(states, 'brightness')
self._xy_color = _reduce_attribute(states, 'xy_color')
self._rgb_color = _reduce_attribute(states, 'rgb_color')
self._color_temp = _reduce_attribute(states, 'color_temp')
self._min_mireds = _reduce_attribute(states, 'min_mireds', default=154,
force_on=False)
self._max_mireds = _reduce_attribute(states, 'max_mireds', default=500,
force_on=False)
self._white_value = _reduce_attribute(states, 'white_value')
all_effect_lists = list(_find_state_attributes(states, 'effect_list',
force_on=False))
self._effect_list = None
if all_effect_lists:
# Merge all effects from all effect_lists with a union merge.
self._effect_list = list(set().union(*all_effect_lists))
all_effects = list(_find_state_attributes(states, 'effect'))
self._effect = None
if all_effects:
flat_effects = list(itertools.chain(*all_effect_lists))
# Report the most common effect.
self._effect = max(set(flat_effects), key=flat_effects.count)
self._supported_features = 0
for support in _find_state_attributes(states, 'supported_features',
force_on=False):
# Merge supported features by emulating support for every feature
# we find.
self._supported_features |= support
# Bitwise-and the supported features with the GroupedLight's features
# so that we don't break in the future when a new feature is added.
self._supported_features &= SUPPORT_GROUP_LIGHT
@asyncio.coroutine
def _async_state_changed_listener(self, entity_id: str, old_state: State,
new_state: State):
"""Respond to a member state changing."""
yield from self._update_hass()
def _child_states(self) -> List[State]:
"""The states that the group is tracking."""
states = [self.hass.states.get(x) for x in self._entity_ids]
return list(filter(None, states))
@asyncio.coroutine
def _update_hass(self):
"""Request new status and push it to hass."""
yield from self.async_update()
yield from self.async_update_ha_state()
def _find_state_attributes(states: List[State], key: str,
force_on: bool = True) -> Iterator[T]:
"""Find attributes with matching key from states.
Only returns attributes of enabled lights when force_on is True.
"""
for state in states:
assume_on = (not force_on) or state.state == STATE_ON
if assume_on and key in state.attributes:
yield state.attributes.get(key)
def _reduce_attribute(states: List[State], key: str,
default: Optional[T] = None, force_on: bool = True) -> T:
"""Find the first attribute matching key from states.
If none are found, returns default.
"""
return next(_find_state_attributes(states, key, force_on), default)
def _determine_on_off_state(states: List[State]) -> str:
"""Helper method to determine the ON/OFF/... state of a light."""
s_states = [state.state for state in states]
if not s_states:
return STATE_UNAVAILABLE
elif any(state == STATE_ON for state in s_states):
return STATE_ON
elif all(state == STATE_UNAVAILABLE for state in s_states):
return STATE_UNAVAILABLE
elif all(state == STATE_UNKNOWN for state in s_states):
return STATE_UNKNOWN
return STATE_OFF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment