All tests ok for central_feature_power_manager

This commit is contained in:
Jean-Marc Collin
2025-01-03 10:24:09 +00:00
parent 9f3199a053
commit 03fbc5362a
5 changed files with 239 additions and 97 deletions

View File

@@ -191,7 +191,6 @@ class BaseThermostat(ClimateEntity, RestoreEntity, Generic[T]):
self._ema_temp = None
self._ema_algo = None
self._now = None
self._attr_fan_mode = None
@@ -1582,15 +1581,6 @@ class BaseThermostat(ClimateEntity, RestoreEntity, Generic[T]):
await self.async_set_hvac_mode(HVACMode.OFF)
return
def _set_now(self, now: datetime):
"""Set the now timestamp. This is only for tests purpose"""
self._now = now
@property
def now(self) -> datetime:
"""Get now. The local datetime or the overloaded _set_now date"""
return self._now if self._now is not None else NowClass.get_now(self._hass)
@property
def is_initialized(self) -> bool:
"""Check if all underlyings are initialized
@@ -1965,3 +1955,17 @@ class BaseThermostat(ClimateEntity, RestoreEntity, Generic[T]):
def is_preset_configured(self, preset) -> bool:
"""Returns True if the preset in argument is configured"""
return self._presets.get(preset, None) is not None
# For testing purpose
@DeprecationWarning
def _set_now(self, now: datetime):
"""Set the now timestamp. This is only for tests purpose
This method should be replaced by the vthermAPI equivalent"""
VersatileThermostatAPI.get_vtherm_api(self._hass).set_now(now)
@property
@DeprecationWarning
def now(self) -> datetime:
"""Get now. The local datetime or the overloaded _set_now date
This method should be replaced by the vthermAPI equivalent"""
return VersatileThermostatAPI.get_vtherm_api(self._hass).now

View File

@@ -27,6 +27,8 @@ from .base_manager import BaseFeatureManager
# circular dependency
# from .base_thermostat import BaseThermostat
MIN_DTEMP_SECS = 60
_LOGGER = logging.getLogger(__name__)
@@ -44,6 +46,7 @@ class CentralFeaturePowerManager(BaseFeatureManager):
self._current_power: float = None
self._current_max_power: float = None
self._power_temp: float = None
self._last_shedding_date = None
def post_init(self, entry_infos: ConfigData):
"""Gets the configuration parameters"""
@@ -103,65 +106,26 @@ class CentralFeaturePowerManager(BaseFeatureManager):
"""Handle power changes."""
_LOGGER.debug("Thermostat %s - Receive new Power event", self)
_LOGGER.debug(event)
new_state = event.data.get("new_state")
old_state = event.data.get("old_state")
if (
new_state is None
or new_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN)
or (old_state is not None and new_state.state == old_state.state)
):
return
try:
current_power = float(new_state.state)
if math.isnan(current_power) or math.isinf(current_power):
raise ValueError(f"Sensor has illegal state {new_state.state}")
self._current_power = current_power
if self._vtherm.preset_mode == PRESET_POWER:
await self._vtherm.async_control_heating()
except ValueError as ex:
_LOGGER.error("Unable to update current_power from sensor: %s", ex)
self.refresh_state()
@callback
async def _max_power_sensor_changed(self, event: Event[EventStateChangedData]):
"""Handle power max changes."""
_LOGGER.debug("Thermostat %s - Receive new Power Max event", self.name)
_LOGGER.debug(event)
new_state = event.data.get("new_state")
old_state = event.data.get("old_state")
if (
new_state is None
or new_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN)
or (old_state is not None and new_state.state == old_state.state)
):
return
try:
current_power_max = float(new_state.state)
if math.isnan(current_power_max) or math.isinf(current_power_max):
raise ValueError(f"Sensor has illegal state {new_state.state}")
self._current_max_power = current_power_max
if self._vtherm.preset_mode == PRESET_POWER:
await self._vtherm.async_control_heating()
except ValueError as ex:
_LOGGER.error("Unable to update current_power from sensor: %s", ex)
self.refresh_state()
@overrides
async def refresh_state(self) -> bool:
def refresh_state(self) -> bool:
"""Tries to get the last state from sensor
Returns True if a change has been made"""
ret = False
if self._is_configured:
# try to acquire current power and power max
current_power_state = self.hass.states.get(self._power_sensor_entity_id)
if current_power_state and current_power_state.state not in (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
):
self._current_power = float(current_power_state.state)
if (
new_state := get_safe_float(self._hass, self._power_sensor_entity_id)
) is not None:
self._current_power = new_state
_LOGGER.debug(
"%s - Current power have been retrieved: %.3f",
self,
@@ -170,14 +134,12 @@ class CentralFeaturePowerManager(BaseFeatureManager):
ret = True
# Try to acquire power max
current_power_max_state = self.hass.states.get(
self._max_power_sensor_entity_id
)
if current_power_max_state and current_power_max_state.state not in (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
):
self._current_max_power = float(current_power_max_state.state)
if (
new_state := get_safe_float(
self._hass, self._max_power_sensor_entity_id
)
) is not None:
self._current_max_power = new_state
_LOGGER.debug(
"%s - Current power max have been retrieved: %.3f",
self,
@@ -185,6 +147,18 @@ class CentralFeaturePowerManager(BaseFeatureManager):
)
ret = True
# check if we need to re-calculate shedding
if ret:
now = self._vtherm_api.now
dtimestamp = (
(now - self._last_shedding_date).seconds
if self._last_shedding_date
else 999
)
if dtimestamp >= MIN_DTEMP_SECS:
self.calculate_shedding()
self._last_shedding_date = now
return ret
async def calculate_shedding(self):

View File

@@ -503,6 +503,8 @@ def get_safe_float(hass, entity_id: str):
if (
entity_id is None
or not (state := hass.states.get(entity_id))
or state.state is None
or state.state == "None"
or state.state == "unknown"
or state.state == "unavailable"
):

View File

@@ -1,6 +1,7 @@
""" The API of Versatile Thermostat"""
import logging
from datetime import datetime
from homeassistant.core import HomeAssistant
from homeassistant.config_entries import ConfigEntry
@@ -16,6 +17,7 @@ from .const import (
CONF_THERMOSTAT_TYPE,
CONF_THERMOSTAT_CENTRAL_CONFIG,
CONF_MAX_ON_PERCENT,
NowClass,
)
from .central_feature_power_manager import CentralFeaturePowerManager
@@ -68,6 +70,9 @@ class VersatileThermostatAPI(dict):
VersatileThermostatAPI._hass, self
)
# the current time (for testing purpose)
self._now = None
def find_central_configuration(self):
"""Search for a central configuration"""
if not self._central_configuration:
@@ -303,3 +308,13 @@ class VersatileThermostatAPI(dict):
def central_power_manager(self) -> any:
"""Returns the central power manager"""
return self._central_power_manager
# For testing purpose
def _set_now(self, now: datetime):
"""Set the now timestamp. This is only for tests purpose"""
self._now = now
@property
def now(self) -> datetime:
"""Get now. The local datetime or the overloaded _set_now date"""
return self._now if self._now is not None else NowClass.get_now(self._hass)

View File

@@ -1,19 +1,15 @@
# pylint: disable=protected-access, unused-argument, line-too-long
""" Test the Central Power management """
from unittest.mock import patch, call, AsyncMock, MagicMock, PropertyMock
from unittest.mock import patch, AsyncMock, MagicMock, PropertyMock
from datetime import datetime, timedelta
import logging
from custom_components.versatile_thermostat.thermostat_switch import (
ThermostatOverSwitch,
)
from custom_components.versatile_thermostat.feature_power_manager import (
FeaturePowerManager,
)
from custom_components.versatile_thermostat.central_feature_power_manager import (
CentralFeaturePowerManager,
)
from custom_components.versatile_thermostat.prop_algorithm import PropAlgorithm
from .commons import * # pylint: disable=wildcard-import, unused-wildcard-import
logging.getLogger().setLevel(logging.DEBUG)
@@ -194,17 +190,11 @@ async def test_central_power_manageer_find_vtherms(
vtherms = []
for vtherm_config in vtherm_configs:
vtherm = MagicMock(spec=BaseThermostat)
type(vtherm).name = PropertyMock(return_value=vtherm_config.get("name"))
type(vtherm).is_on = PropertyMock(return_value=vtherm_config.get("is_on"))
type(vtherm).current_temperature = PropertyMock(
return_value=vtherm_config.get("current_temperature")
)
type(vtherm).target_temperature = PropertyMock(
return_value=vtherm_config.get("target_temperature")
)
type(vtherm.power_manager).is_configured = PropertyMock(
return_value=vtherm_config.get("is_configured")
)
vtherm.name = vtherm_config.get("name")
vtherm.is_on = vtherm_config.get("is_on")
vtherm.current_temperature = vtherm_config.get("current_temperature")
vtherm.target_temperature = vtherm_config.get("target_temperature")
vtherm.power_manager.is_configured = vtherm_config.get("is_configured")
vtherms.append(vtherm)
with patch(
@@ -371,6 +361,7 @@ async def test_central_power_manageer_find_vtherms(
),
],
)
# @pytest.mark.skip
async def test_central_power_manageer_calculate_shedding(
hass: HomeAssistant,
current_power,
@@ -414,16 +405,18 @@ async def test_central_power_manageer_calculate_shedding(
vtherms.append(vtherm)
type(central_power_manager).current_max_power = PropertyMock(
return_value=current_max_power
)
type(central_power_manager).current_power = PropertyMock(return_value=current_power)
type(central_power_manager).is_configured = PropertyMock(return_value=True)
# type(central_power_manager).current_max_power = PropertyMock(
# return_value=current_max_power
# )
# type(central_power_manager).current_power = PropertyMock(return_value=current_power)
# type(central_power_manager).is_configured = PropertyMock(return_value=True)
with patch(
"custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.find_all_vtherm_with_power_management_sorted_by_dtemp",
return_value=vtherms,
):
# fmt:off
with patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.find_all_vtherm_with_power_management_sorted_by_dtemp", return_value=vtherms), \
patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.current_max_power", new_callable=PropertyMock, return_value=current_max_power), \
patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.current_power", new_callable=PropertyMock, return_value=current_power), \
patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.is_configured", new_callable=PropertyMock, return_value=True):
# fmt:on
await central_power_manager.calculate_shedding()
@@ -431,7 +424,20 @@ async def test_central_power_manageer_calculate_shedding(
assert registered_calls == expected_results
async def test_central_power_manager_power_event(hass: HomeAssistant):
@pytest.mark.parametrize(
"dsecs, power, nb_call",
[
(0, 1000, 1),
(61, 1000, 1),
(59, 1000, 1),
(0, None, 0),
(0, STATE_UNAVAILABLE, 0),
(0, STATE_UNKNOWN, 0),
],
)
async def test_central_power_manager_power_event(
hass: HomeAssistant, dsecs, power, nb_call
):
"""Tests the Power sensor event"""
vtherm_api: VersatileThermostatAPI = MagicMock(spec=VersatileThermostatAPI)
central_power_manager = CentralFeaturePowerManager(hass, vtherm_api)
@@ -443,21 +449,162 @@ async def test_central_power_manager_power_event(hass: HomeAssistant):
# 2. post_init
central_power_manager.post_init(
{
CONF_POWER_SENSOR: power_entity_id,
CONF_MAX_POWER_SENSOR: max_power_entity_id,
CONF_USE_POWER_FEATURE: use_power_feature,
CONF_POWER_SENSOR: "sensor.power_entity_id",
CONF_MAX_POWER_SENSOR: "sensor.max_power_entity_id",
CONF_USE_POWER_FEATURE: True,
CONF_PRESET_POWER: 13,
}
)
assert central_power_manager.is_configured == True
assert central_power_manager.is_configured is True
assert central_power_manager.current_max_power is None
assert central_power_manager.current_power is None
assert central_power_manager.power_temperature == 13
# 3. start listening
# 3. start listening (not really useful but don't eat bread)
central_power_manager.start_listening()
assert len(central_power_manager._active_listener) == 2
tz = get_tz(hass) # pylint: disable=invalid-name
now: datetime = datetime.now(tz=tz)
now: datetime = NowClass.get_now(hass)
# vtherm_api._set_now(now) vtherm_api is a MagicMock
vtherm_api.now = now
# 4. Call the _power_sensor_changed
side_effects = SideEffects(
{
"sensor.power_entity_id": State("sensor.power_entity_id", power),
"sensor.max_power_entity_id": State("sensor.max_power_entity_id", power),
},
State("unknown.entity_id", "unknown"),
)
# fmt:off
with patch("homeassistant.core.StateMachine.get", side_effect=side_effects.get_side_effects()), \
patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.calculate_shedding", new_callable=AsyncMock) as mock_calculate_shedding:
# fmt:on
# set a default value to see if it has been replaced
central_power_manager._current_power = -999
await central_power_manager._power_sensor_changed(event=Event(
event_type=EVENT_STATE_CHANGED,
data={
"entity_id": "sensor.power_entity_id",
"new_state": State("sensor.power_entity_id", power),
"old_state": State("sensor.power_entity_id", STATE_UNAVAILABLE),
}))
expected_power = power if isinstance(power, (int, float)) else -999
assert central_power_manager.current_power == expected_power
assert mock_calculate_shedding.call_count == nb_call
# Do another call x seconds later
now = now + timedelta(seconds=dsecs)
vtherm_api.now = now
# fmt:off
with patch("homeassistant.core.StateMachine.get", side_effect=side_effects.get_side_effects()), \
patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.calculate_shedding", new_callable=AsyncMock) as mock_calculate_shedding:
# fmt:on
central_power_manager._current_power = -999
await central_power_manager._power_sensor_changed(event=Event(
event_type=EVENT_STATE_CHANGED,
data={
"entity_id": "sensor.power_entity_id",
"new_state": State("sensor.power_entity_id", power),
"old_state": State("sensor.power_entity_id", STATE_UNAVAILABLE),
}))
assert central_power_manager.current_power == expected_power
assert mock_calculate_shedding.call_count == (nb_call if dsecs >= 60 else 0)
@pytest.mark.parametrize(
"dsecs, max_power, nb_call",
[
(0, 1000, 1),
(61, 1000, 1),
(59, 1000, 1),
(0, None, 0),
(0, STATE_UNAVAILABLE, 0),
(0, STATE_UNKNOWN, 0),
],
)
async def test_central_power_manager_max_power_event(
hass: HomeAssistant, dsecs, max_power, nb_call
):
"""Tests the Power sensor event"""
vtherm_api: VersatileThermostatAPI = MagicMock(spec=VersatileThermostatAPI)
central_power_manager = CentralFeaturePowerManager(hass, vtherm_api)
assert central_power_manager.current_power is None
assert central_power_manager.power_temperature is None
assert central_power_manager.name == "centralPowerManager"
# 2. post_init
central_power_manager.post_init(
{
CONF_POWER_SENSOR: "sensor.power_entity_id",
CONF_MAX_POWER_SENSOR: "sensor.max_power_entity_id",
CONF_USE_POWER_FEATURE: True,
CONF_PRESET_POWER: 13,
}
)
assert central_power_manager.is_configured is True
assert central_power_manager.current_max_power is None
assert central_power_manager.current_power is None
assert central_power_manager.power_temperature == 13
# 3. start listening (not really useful but don't eat bread)
central_power_manager.start_listening()
assert len(central_power_manager._active_listener) == 2
now: datetime = NowClass.get_now(hass)
# vtherm_api._set_now(now) vtherm_api is a MagicMock
vtherm_api.now = now
# 4. Call the _power_sensor_changed
side_effects = SideEffects(
{
"sensor.power_entity_id": State("sensor.power_entity_id", max_power),
"sensor.max_power_entity_id": State(
"sensor.max_power_entity_id", max_power
),
},
State("unknown.entity_id", "unknown"),
)
# fmt:off
with patch("homeassistant.core.StateMachine.get", side_effect=side_effects.get_side_effects()), \
patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.calculate_shedding", new_callable=AsyncMock) as mock_calculate_shedding:
# fmt:on
# set a default value to see if it has been replaced
central_power_manager._current_max_power = -999
await central_power_manager._power_sensor_changed(event=Event(
event_type=EVENT_STATE_CHANGED,
data={
"entity_id": "sensor.max_power_entity_id",
"new_state": State("sensor.max_power_entity_id", max_power),
"old_state": State("sensor.max_power_entity_id", STATE_UNAVAILABLE),
}))
expected_power = max_power if isinstance(max_power, (int, float)) else -999
assert central_power_manager.current_max_power == expected_power
assert mock_calculate_shedding.call_count == nb_call
# Do another call x seconds later
now = now + timedelta(seconds=dsecs)
vtherm_api.now = now
# fmt:off
with patch("homeassistant.core.StateMachine.get", side_effect=side_effects.get_side_effects()), \
patch("custom_components.versatile_thermostat.central_feature_power_manager.CentralFeaturePowerManager.calculate_shedding", new_callable=AsyncMock) as mock_calculate_shedding:
# fmt:on
central_power_manager._current_max_power = -999
await central_power_manager._power_sensor_changed(event=Event(
event_type=EVENT_STATE_CHANGED,
data={
"entity_id": "sensor.max_power_entity_id",
"new_state": State("sensor.max_power_entity_id", max_power),
"old_state": State("sensor.max_power_entity_id", STATE_UNAVAILABLE),
}))
assert central_power_manager.current_max_power == expected_power
assert mock_calculate_shedding.call_count == (nb_call if dsecs >= 60 else 0)