First implem + tests (not finished)

This commit is contained in:
Jean-Marc Collin
2025-01-02 09:02:41 +00:00
parent e727c0628e
commit 6237273029
5 changed files with 619 additions and 2 deletions

View File

@@ -16,10 +16,10 @@ _LOGGER = logging.getLogger(__name__)
class BaseFeatureManager:
"""A base class for all feature"""
def __init__(self, vtherm: Any, hass: HomeAssistant):
def __init__(self, vtherm: Any, hass: HomeAssistant, name: str = None):
"""Init of a featureManager"""
self._vtherm = vtherm
self._name = vtherm.name
self._name = vtherm.name if vtherm else name
self._active_listener: list[CALLBACK_TYPE] = []
self._hass = hass

View File

@@ -0,0 +1,306 @@
""" Implements a central Power Feature Manager for Versatile Thermostat """
import logging
from typing import Any
from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant, Event, callback
from homeassistant.helpers.event import (
async_track_state_change_event,
EventStateChangedData,
)
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.components.climate import (
ClimateEntity,
DOMAIN as CLIMATE_DOMAIN,
)
from .const import * # pylint: disable=wildcard-import, unused-wildcard-import
from .commons import ConfigData
from .base_manager import BaseFeatureManager
# circular dependency
# from .base_thermostat import BaseThermostat
_LOGGER = logging.getLogger(__name__)
class CentralFeaturePowerManager(BaseFeatureManager):
"""A central Power feature manager"""
def __init__(self, hass: HomeAssistant, vtherm_api: Any):
"""Init of a featureManager"""
super().__init__(None, hass, "centralPowerManager")
self._hass: HomeAssistant = hass
self._vtherm_api = vtherm_api # no type due to circular reference
self._is_configured: bool = False
self._power_sensor_entity_id: str = None
self._max_power_sensor_entity_id: str = None
self._current_power: float = None
self._current_max_power: float = None
self._power_temp: float = None
def post_init(self, entry_infos: ConfigData):
"""Gets the configuration parameters"""
central_config = self._vtherm_api.find_central_configuration()
if not central_config:
_LOGGER.info(
"%s - No central configuration is found. Power management will be deactivated.",
self,
)
return
self._power_sensor_entity_id = entry_infos.get(CONF_POWER_SENSOR)
self._max_power_sensor_entity_id = entry_infos.get(CONF_MAX_POWER_SENSOR)
self._power_temp = entry_infos.get(CONF_PRESET_POWER)
self._is_configured = False
self._current_power = None
self._current_max_power = None
if (
entry_infos.get(CONF_USE_POWER_FEATURE, False)
and self._max_power_sensor_entity_id
and self._power_sensor_entity_id
and self._power_temp
):
self._is_configured = True
else:
_LOGGER.info(
"%s - Power management is not fully configured and will be deactivated",
self,
)
def start_listening(self):
"""Start listening the power sensor"""
if not self._is_configured:
return
self.stop_listening()
self.add_listener(
async_track_state_change_event(
self.hass,
[self._power_sensor_entity_id],
self._async_power_sensor_changed,
)
)
self.add_listener(
async_track_state_change_event(
self.hass,
[self._max_power_sensor_entity_id],
self._async_max_power_sensor_changed,
)
)
@callback
async def _async_power_sensor_changed(self, event: Event[EventStateChangedData]):
"""Handle power changes."""
_LOGGER.debug("Thermostat %s - Receive new Power event", self)
_LOGGER.debug(event)
new_state = event.data.get("new_state")
old_state = event.data.get("old_state")
if (
new_state is None
or new_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN)
or (old_state is not None and new_state.state == old_state.state)
):
return
try:
current_power = float(new_state.state)
if math.isnan(current_power) or math.isinf(current_power):
raise ValueError(f"Sensor has illegal state {new_state.state}")
self._current_power = current_power
if self._vtherm.preset_mode == PRESET_POWER:
await self._vtherm.async_control_heating()
except ValueError as ex:
_LOGGER.error("Unable to update current_power from sensor: %s", ex)
@callback
async def _async_max_power_sensor_changed(
self, event: Event[EventStateChangedData]
):
"""Handle power max changes."""
_LOGGER.debug("Thermostat %s - Receive new Power Max event", self.name)
_LOGGER.debug(event)
new_state = event.data.get("new_state")
old_state = event.data.get("old_state")
if (
new_state is None
or new_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN)
or (old_state is not None and new_state.state == old_state.state)
):
return
try:
current_power_max = float(new_state.state)
if math.isnan(current_power_max) or math.isinf(current_power_max):
raise ValueError(f"Sensor has illegal state {new_state.state}")
self._current_max_power = current_power_max
if self._vtherm.preset_mode == PRESET_POWER:
await self._vtherm.async_control_heating()
except ValueError as ex:
_LOGGER.error("Unable to update current_power from sensor: %s", ex)
@overrides
async def refresh_state(self) -> bool:
"""Tries to get the last state from sensor
Returns True if a change has been made"""
ret = False
if self._is_configured:
# try to acquire current power and power max
current_power_state = self.hass.states.get(self._power_sensor_entity_id)
if current_power_state and current_power_state.state not in (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
):
self._current_power = float(current_power_state.state)
_LOGGER.debug(
"%s - Current power have been retrieved: %.3f",
self,
self._current_power,
)
ret = True
# Try to acquire power max
current_power_max_state = self.hass.states.get(
self._max_power_sensor_entity_id
)
if current_power_max_state and current_power_max_state.state not in (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
):
self._current_max_power = float(current_power_max_state.state)
_LOGGER.debug(
"%s - Current power max have been retrieved: %.3f",
self,
self._current_max_power,
)
ret = True
return ret
async def calculate_shedding(self):
"""Do the shedding calculation and set/unset VTherm into overpowering state"""
if (
not self.is_configured
or not self.current_max_power
or not self.current_power
):
return
# Find all VTherms
vtherms_sorted = self.find_all_vtherm_with_power_management_sorted_by_dtemp()
available_power = self.current_max_power - self.current_power
total_affected_power = 0
for vtherm in vtherms_sorted:
device_power = vtherm.power_manager.device_power
if vtherm.is_device_active:
power_consumption_max = 0
else:
if vtherm.is_over_climate:
power_consumption_max = device_power
else:
power_consumption_max = max(
device_power / self._vtherm.nb_underlying_entities,
device_power * self._vtherm.proportional_algorithm.on_percent,
)
_LOGGER.debug(
"%s - vtherm %s power_consumption_max is %s (device_power=%s, overclimate=%s)",
self,
vtherm.name,
power_consumption_max,
vtherm.power_management.device_power,
vtherm.is_over_climate,
)
if total_affected_power + power_consumption_max >= available_power:
_LOGGER.debug(
"%s - vtherm %s should be in overpowering state", self, vtherm.name
)
await vtherm.power_manager.set_overpowering(True)
elif vtherm.power_manager.is_overpowering_detected:
total_affected_power += power_consumption_max
_LOGGER.debug(
"%s - vtherm %s should not be in overpowering state",
self,
vtherm.name,
)
await vtherm.power_manager.set_overpowering(False)
_LOGGER.debug(
"%s - after vtherm %s total_affected_power=%s, available_power=%s",
self,
vtherm.name,
total_affected_power,
available_power,
)
def find_all_vtherm_with_power_management_sorted_by_dtemp(
self,
) -> list:
"""Returns all the VTherms with power management activated"""
vtherms = []
component: EntityComponent[ClimateEntity] = self._hass.data.get(
CLIMATE_DOMAIN, None
)
if component:
for entity in component.entities:
# A little hack to test if the climate is a VTherm. Cannot use isinstance
# due to circular dependency of BaseThermostat
if (
entity.device_info
and entity.device_info.get("model", None) == DOMAIN
):
# The climate is a VTherm, we add it if it is active and power is configured
vtherm = entity
if vtherm.power_manager.is_configured and vtherm.is_on:
vtherms.append(vtherm)
# sort the result with the min temp difference first. A and B should be BaseThermostat class
def cmp_temps(a, b) -> int:
diff_a = float("inf")
diff_b = float("inf")
if a.current_temperature is not None and a.target_temperature is not None:
diff_a = a.target_temperature - a.current_temperature
if b.current_temperature is not None and b.target_temperature is not None:
diff_b = b.target_temperature - b.current_temperature
if diff_a == diff_b:
return 0
return 1 if diff_a > diff_b else -1
return vtherms.sort(key=cmp_temps)
@property
def is_configured(self) -> bool:
"""True if the FeatureManager is fully configured"""
return self._is_configured
@property
def current_power(self) -> float | None:
"""Return the current power from sensor"""
return self._current_power
@property
def current_max_power(self) -> float | None:
"""Return the current power from sensor"""
return self._current_max_power
@property
def power_temperature(self) -> float | None:
"""Return the power temperature"""
return self._power_temp
def __str__(self):
return "CentralPowerManager"

View File

@@ -28,6 +28,7 @@ from .thermostat_switch import ThermostatOverSwitch
from .thermostat_climate import ThermostatOverClimate
from .thermostat_valve import ThermostatOverValve
from .thermostat_climate_valve import ThermostatOverClimateValve
from .vtherm_api import VersatileThermostatAPI
_LOGGER = logging.getLogger(__name__)
@@ -51,6 +52,9 @@ async def async_setup_entry(
)
if vt_type == CONF_THERMOSTAT_CENTRAL_CONFIG:
# Initialize the central power manager
vtherm_api = VersatileThermostatAPI.get_vtherm_api(hass)
vtherm_api.central_power_manager.post_init(entry.data)
return
# Instantiate the right base class

View File

@@ -18,6 +18,8 @@ from .const import (
CONF_MAX_ON_PERCENT,
)
from .central_feature_power_manager import CentralFeaturePowerManager
VTHERM_API_NAME = "vtherm_api"
_LOGGER = logging.getLogger(__name__)
@@ -62,6 +64,9 @@ class VersatileThermostatAPI(dict):
# A dict that will store all Number entities which holds the temperature
self._number_temperatures = dict()
self._max_on_percent = None
self._central_power_manager = CentralFeaturePowerManager(
VersatileThermostatAPI._hass, self
)
def find_central_configuration(self):
"""Search for a central configuration"""
@@ -176,6 +181,10 @@ class VersatileThermostatAPI(dict):
if entry_id is None or entry_id == entity.unique_id:
await entity.async_startup(self.find_central_configuration())
# start listening for the central power manager if not only one vtherm reload
if not entry_id:
self.central_power_manager.start_listening()
async def init_vtherm_preset_with_central(self):
"""Init all VTherm presets when the VTherm uses central temperature"""
# Initialization of all preset for all VTherm
@@ -289,3 +298,8 @@ class VersatileThermostatAPI(dict):
def hass(self):
"""Get the HomeAssistant object"""
return VersatileThermostatAPI._hass
@property
def central_power_manager(self) -> any:
"""Returns the central power manager"""
return self._central_power_manager

View File

@@ -0,0 +1,293 @@
# pylint: disable=protected-access, unused-argument, line-too-long
""" Test the Central Power management """
from unittest.mock import patch, call, AsyncMock, MagicMock, PropertyMock
from datetime import datetime, timedelta
import logging
from custom_components.versatile_thermostat.thermostat_switch import (
ThermostatOverSwitch,
)
from custom_components.versatile_thermostat.feature_power_manager import (
FeaturePowerManager,
)
from custom_components.versatile_thermostat.central_feature_power_manager import (
CentralFeaturePowerManager,
)
from custom_components.versatile_thermostat.prop_algorithm import PropAlgorithm
from .commons import * # pylint: disable=wildcard-import, unused-wildcard-import
logging.getLogger().setLevel(logging.DEBUG)
@pytest.mark.parametrize(
"use_power_feature, power_entity_id, max_power_entity_id, power_temp, is_configured",
[
(True, "sensor.power_id", "sensor.max_power_id", 13, True),
(True, None, "sensor.max_power_id", 13, False),
(True, "sensor.power_id", None, 13, False),
(True, "sensor.power_id", "sensor.max_power_id", None, False),
(False, "sensor.power_id", "sensor.max_power_id", 13, False),
],
)
async def test_central_power_manager_init(
hass: HomeAssistant,
use_power_feature,
power_entity_id,
max_power_entity_id,
power_temp,
is_configured,
):
"""Test creation and post_init of the Central Power Manager"""
vtherm_api: VersatileThermostatAPI = MagicMock(spec=VersatileThermostatAPI)
central_power_manager = CentralFeaturePowerManager(hass, vtherm_api)
assert central_power_manager.is_configured is False
assert central_power_manager.current_max_power is None
assert central_power_manager.current_power is None
assert central_power_manager.power_temperature is None
assert central_power_manager.name == "centralPowerManager"
# 2. post_init
central_power_manager.post_init(
{
CONF_POWER_SENSOR: power_entity_id,
CONF_MAX_POWER_SENSOR: max_power_entity_id,
CONF_USE_POWER_FEATURE: use_power_feature,
CONF_PRESET_POWER: power_temp,
}
)
assert central_power_manager.is_configured == is_configured
assert central_power_manager.current_max_power is None
assert central_power_manager.current_power is None
assert central_power_manager.power_temperature == power_temp
# 3. start listening
central_power_manager.start_listening()
assert len(central_power_manager._active_listener) == (2 if is_configured else 0)
# 4. stop listening
central_power_manager.stop_listening()
assert len(central_power_manager._active_listener) == 0
@pytest.mark.parametrize(
"is_over_climate, is_device_active, power, max_power, current_overpowering_state, overpowering_state, nb_call, changed, check_overpowering_ret",
[
# don't switch to overpower (power is enough)
(False, False, 1000, 3000, STATE_OFF, STATE_OFF, 0, True, False),
# switch to overpower (power is not enough)
(False, False, 2000, 3000, STATE_OFF, STATE_ON, 1, True, True),
# don't switch to overpower (power is not enough but device is already on)
(False, True, 2000, 3000, STATE_OFF, STATE_OFF, 0, True, False),
# Same with a over_climate
# don't switch to overpower (power is enough)
(True, False, 1000, 3000, STATE_OFF, STATE_OFF, 0, True, False),
# switch to overpower (power is not enough)
(True, False, 2000, 3000, STATE_OFF, STATE_ON, 1, True, True),
# don't switch to overpower (power is not enough but device is already on)
(True, True, 2000, 3000, STATE_OFF, STATE_OFF, 0, True, False),
# Leave overpowering state
# switch to not overpower (power is enough)
(False, False, 1000, 3000, STATE_ON, STATE_OFF, 1, True, False),
# don't switch to overpower (power is still not enough)
(False, False, 2000, 3000, STATE_ON, STATE_ON, 0, True, True),
# keep overpower (power is not enough but device is already on)
(False, True, 3000, 3000, STATE_ON, STATE_ON, 0, True, True),
],
)
async def test_central_power_manager(
hass: HomeAssistant,
is_over_climate,
is_device_active,
power,
max_power,
current_overpowering_state,
overpowering_state,
nb_call,
changed,
check_overpowering_ret,
):
"""Test the FeaturePresenceManager class direclty"""
fake_vtherm = MagicMock(spec=BaseThermostat)
type(fake_vtherm).name = PropertyMock(return_value="the name")
# 1. creation
power_manager = FeaturePowerManager(fake_vtherm, hass)
assert power_manager is not None
assert power_manager.is_configured is False
assert power_manager.overpowering_state == STATE_UNAVAILABLE
assert power_manager.name == "the name"
assert len(power_manager._active_listener) == 0
custom_attributes = {}
power_manager.add_custom_attributes(custom_attributes)
assert custom_attributes["power_sensor_entity_id"] is None
assert custom_attributes["max_power_sensor_entity_id"] is None
assert custom_attributes["overpowering_state"] == STATE_UNAVAILABLE
assert custom_attributes["is_power_configured"] is False
assert custom_attributes["device_power"] is 0
assert custom_attributes["power_temp"] is None
assert custom_attributes["current_power"] is None
assert custom_attributes["current_max_power"] is None
# 2. post_init
power_manager.post_init(
{
CONF_POWER_SENSOR: "sensor.the_power_sensor",
CONF_MAX_POWER_SENSOR: "sensor.the_max_power_sensor",
CONF_USE_POWER_FEATURE: True,
CONF_PRESET_POWER: 10,
CONF_DEVICE_POWER: 1234,
}
)
assert power_manager.is_configured is True
assert power_manager.overpowering_state == STATE_UNKNOWN
custom_attributes = {}
power_manager.add_custom_attributes(custom_attributes)
assert custom_attributes["power_sensor_entity_id"] == "sensor.the_power_sensor"
assert (
custom_attributes["max_power_sensor_entity_id"] == "sensor.the_max_power_sensor"
)
assert custom_attributes["overpowering_state"] == STATE_UNKNOWN
assert custom_attributes["is_power_configured"] is True
assert custom_attributes["device_power"] == 1234
assert custom_attributes["power_temp"] == 10
assert custom_attributes["current_power"] is None
assert custom_attributes["current_max_power"] is None
# 3. start listening
power_manager.start_listening()
assert power_manager.is_configured is True
assert power_manager.overpowering_state == STATE_UNKNOWN
assert len(power_manager._active_listener) == 2
# 4. test refresh and check_overpowering with the parametrized
side_effects = SideEffects(
{
"sensor.the_power_sensor": State("sensor.the_power_sensor", power),
"sensor.the_max_power_sensor": State(
"sensor.the_max_power_sensor", max_power
),
},
State("unknown.entity_id", "unknown"),
)
# fmt:off
with patch("homeassistant.core.StateMachine.get", side_effect=side_effects.get_side_effects()) as mock_get_state:
# fmt:on
# Finish the mock configuration
tpi_algo = PropAlgorithm(PROPORTIONAL_FUNCTION_TPI, 0.6, 0.01, 5, 0, "climate.vtherm")
tpi_algo._on_percent = 1 # pylint: disable="protected-access"
type(fake_vtherm).hvac_mode = PropertyMock(return_value=HVACMode.HEAT)
type(fake_vtherm).is_device_active = PropertyMock(return_value=is_device_active)
type(fake_vtherm).is_over_climate = PropertyMock(return_value=is_over_climate)
type(fake_vtherm).proportional_algorithm = PropertyMock(return_value=tpi_algo)
type(fake_vtherm).nb_underlying_entities = PropertyMock(return_value=1)
type(fake_vtherm).preset_mode = PropertyMock(return_value=PRESET_COMFORT if current_overpowering_state == STATE_OFF else PRESET_POWER)
type(fake_vtherm)._saved_preset_mode = PropertyMock(return_value=PRESET_ECO)
fake_vtherm.save_hvac_mode = MagicMock()
fake_vtherm.restore_hvac_mode = AsyncMock()
fake_vtherm.save_preset_mode = MagicMock()
fake_vtherm.restore_preset_mode = AsyncMock()
fake_vtherm.async_underlying_entity_turn_off = AsyncMock()
fake_vtherm.async_set_preset_mode_internal = AsyncMock()
fake_vtherm.send_event = MagicMock()
fake_vtherm.update_custom_attributes = MagicMock()
ret = await power_manager.refresh_state()
assert ret == changed
assert power_manager.is_configured is True
assert power_manager.overpowering_state == STATE_UNKNOWN
assert power_manager.current_power == power
assert power_manager.current_max_power == max_power
# check overpowering
power_manager._overpowering_state = current_overpowering_state
ret2 = await power_manager.check_overpowering()
assert ret2 == check_overpowering_ret
assert power_manager.overpowering_state == overpowering_state
assert mock_get_state.call_count == 2
if power_manager.overpowering_state == STATE_OFF:
assert fake_vtherm.save_hvac_mode.call_count == 0
assert fake_vtherm.save_preset_mode.call_count == 0
assert fake_vtherm.async_underlying_entity_turn_off.call_count == 0
assert fake_vtherm.async_set_preset_mode_internal.call_count == 0
assert fake_vtherm.send_event.call_count == nb_call
if current_overpowering_state == STATE_ON:
assert fake_vtherm.update_custom_attributes.call_count == 1
assert fake_vtherm.restore_preset_mode.call_count == 1
if is_over_climate:
assert fake_vtherm.restore_hvac_mode.call_count == 1
else:
assert fake_vtherm.restore_hvac_mode.call_count == 0
else:
assert fake_vtherm.update_custom_attributes.call_count == 0
if nb_call == 1:
fake_vtherm.send_event.assert_has_calls(
[
call.fake_vtherm.send_event(
EventType.POWER_EVENT,
{'type': 'end', 'current_power': power, 'device_power': 1234, 'current_max_power': max_power}),
]
)
elif power_manager.overpowering_state == STATE_ON:
if is_over_climate:
assert fake_vtherm.save_hvac_mode.call_count == 1
else:
assert fake_vtherm.save_hvac_mode.call_count == 0
if current_overpowering_state == STATE_OFF:
assert fake_vtherm.save_preset_mode.call_count == 1
assert fake_vtherm.async_underlying_entity_turn_off.call_count == 1
assert fake_vtherm.async_set_preset_mode_internal.call_count == 1
assert fake_vtherm.send_event.call_count == 1
assert fake_vtherm.update_custom_attributes.call_count == 1
else:
assert fake_vtherm.save_preset_mode.call_count == 0
assert fake_vtherm.async_underlying_entity_turn_off.call_count == 0
assert fake_vtherm.async_set_preset_mode_internal.call_count == 0
assert fake_vtherm.send_event.call_count == 0
assert fake_vtherm.update_custom_attributes.call_count == 0
assert fake_vtherm.restore_hvac_mode.call_count == 0
assert fake_vtherm.restore_preset_mode.call_count == 0
if nb_call == 1:
fake_vtherm.send_event.assert_has_calls(
[
call.fake_vtherm.send_event(
EventType.POWER_EVENT,
{'type': 'start', 'current_power': power, 'device_power': 1234, 'current_max_power': max_power, 'current_power_consumption': 1234.0}),
]
)
fake_vtherm.reset_mock()
# 5. Check custom_attributes
custom_attributes = {}
power_manager.add_custom_attributes(custom_attributes)
assert custom_attributes["power_sensor_entity_id"] == "sensor.the_power_sensor"
assert (
custom_attributes["max_power_sensor_entity_id"] == "sensor.the_max_power_sensor"
)
assert custom_attributes["overpowering_state"] == overpowering_state
assert custom_attributes["is_power_configured"] is True
assert custom_attributes["device_power"] == 1234
assert custom_attributes["power_temp"] == 10
assert custom_attributes["current_power"] == power
assert custom_attributes["current_max_power"] == max_power
power_manager.stop_listening()
await hass.async_block_till_done()