Change algo

This commit is contained in:
Jean-Marc Collin
2024-10-29 21:48:47 +00:00
parent 3219fd293e
commit 7854b44a2e
5 changed files with 647 additions and 409 deletions

View File

@@ -3,6 +3,8 @@
"""
import logging
from datetime import datetime, timedelta
from typing import Literal
from homeassistant.components.climate import HVACMode
@@ -12,57 +14,73 @@ from .const import (
AUTO_START_STOP_LEVEL_MEDIUM,
AUTO_START_STOP_LEVEL_SLOW,
CONF_AUTO_START_STOP_LEVELS,
TYPE_AUTO_START_STOP_LEVELS,
)
_LOGGER = logging.getLogger(__name__)
# attribute name should be equal to AUTO_START_STOP_LEVEL_xxx constants (in const.yaml)
DTEMP = {
AUTO_START_STOP_LEVEL_NONE: 99,
AUTO_START_STOP_LEVEL_SLOW: 3,
AUTO_START_STOP_LEVEL_MEDIUM: 2,
AUTO_START_STOP_LEVEL_FAST: 1,
}
# Some constant to make algorithm depending of level
DT_MIN = {
AUTO_START_STOP_LEVEL_NONE: 99,
AUTO_START_STOP_LEVEL_NONE: 0, # Not used
AUTO_START_STOP_LEVEL_SLOW: 30,
AUTO_START_STOP_LEVEL_MEDIUM: 15,
AUTO_START_STOP_LEVEL_FAST: 7,
}
# the measurement cycle (2 min)
CYCLE_SEC = 120
ERROR_THRESHOLD = {
AUTO_START_STOP_LEVEL_NONE: 0, # Not used
AUTO_START_STOP_LEVEL_SLOW: 10, # 10 cycle above 1° or 5 cycle above 2°, ...
AUTO_START_STOP_LEVEL_MEDIUM: 5, # 5 cycle above 1° or 3 cycle above 2°, ..., 1 cycle above 5°
AUTO_START_STOP_LEVEL_FAST: 2, # 2 cycle above 1° or 1 cycle above 2°
}
AUTO_START_STOP_ACTION_OFF = "turnOff"
AUTO_START_STOP_ACTION_ON = "turnOn"
AUTO_START_STOP_ACTION_NOTHING = "nothing"
AUTO_START_STOP_ACTIONS = [
AUTO_START_STOP_ACTIONS = Literal[ # pylint: disable=invalid-name
AUTO_START_STOP_ACTION_OFF,
AUTO_START_STOP_ACTION_ON,
AUTO_START_STOP_ACTION_NOTHING,
]
class AutoStartStopDetectionAlgorithm:
"""The class that implements the algorithm listed above"""
_dt: float
_dtemp: float
_level: str
_dt: float | None = None
_level: str = AUTO_START_STOP_LEVEL_NONE
_accumulated_error: float = 0
_error_threshold: float | None = None
_last_calculation_date: datetime | None = None
def __init__(self, level: CONF_AUTO_START_STOP_LEVELS, vtherm_name) -> None:
def __init__(self, level: TYPE_AUTO_START_STOP_LEVELS, vtherm_name) -> None:
"""Initalize a new algorithm with the right constants"""
self._level = level
self._dt = DT_MIN[level]
self._dtemp = DTEMP[level]
self._vtherm_name = vtherm_name
self._init_level(level)
def _init_level(self, level: TYPE_AUTO_START_STOP_LEVELS):
"""Initialize a new level"""
if level == self._level:
return
self._level = level
if self._level != AUTO_START_STOP_LEVEL_NONE:
self._dt = DT_MIN[level]
self._error_threshold = ERROR_THRESHOLD[level]
# reset accumulated error if we change the level
self._accumulated_error = 0
def calculate_action(
self,
hvac_mode: HVACMode | None,
saved_hvac_mode: HVACMode | None,
regulated_temp: float,
target_temp: float,
current_temp: float,
slope_min: float,
now: datetime,
) -> AUTO_START_STOP_ACTIONS:
"""Calculate an eventual action to do depending of the value in parameter"""
if self._level == AUTO_START_STOP_LEVEL_NONE:
@@ -74,7 +92,6 @@ class AutoStartStopDetectionAlgorithm:
if (
hvac_mode is None
or regulated_temp is None
or target_temp is None
or current_temp is None
or slope_min is None
@@ -86,18 +103,51 @@ class AutoStartStopDetectionAlgorithm:
return AUTO_START_STOP_ACTION_NOTHING
_LOGGER.debug(
"%s - calculate_action: hvac_mode=%s, saved_hvac_mode=%s, regulated_temp=%s, target_temp=%s, current_temp=%s, slope_min=%s",
"%s - calculate_action: hvac_mode=%s, saved_hvac_mode=%s, target_temp=%s, current_temp=%s, slope_min=%s at %s",
self,
hvac_mode,
saved_hvac_mode,
regulated_temp,
target_temp,
current_temp,
slope_min,
now,
)
# Calculate the error factor (P)
error = target_temp - current_temp
# reduce the error considering the dt between the last measurement
if self._last_calculation_date is not None:
dtmin = (now - self._last_calculation_date).total_seconds() / CYCLE_SEC
# ignore two calls too near (< 2,5 min)
if dtmin <= 0.5:
_LOGGER.debug(
"%s - new calculation of auto_start_stop (%s) is too near of the last one (%s). Forget it",
self,
now,
self._last_calculation_date,
)
return AUTO_START_STOP_ACTION_NOTHING
error = error * dtmin
# If the error have change its sign, reset smoothly the accumulated error
if error * self._accumulated_error < 0:
self._accumulated_error = self._accumulated_error / 2.0
self._accumulated_error += error
# Capping of the error
self._accumulated_error = min(
self._error_threshold,
max(-self._error_threshold, self._accumulated_error),
)
self._last_calculation_date = now
# Check to turn-off
# When we hit the threshold, that mean we can turn off
if hvac_mode == HVACMode.HEAT:
if regulated_temp + self._dtemp <= target_temp and slope_min >= 0:
if self._accumulated_error <= -self._error_threshold:
_LOGGER.info(
"%s - We need to stop, there is no need for heating for a long time.",
)
@@ -109,7 +159,7 @@ class AutoStartStopDetectionAlgorithm:
return AUTO_START_STOP_ACTION_NOTHING
if hvac_mode == HVACMode.COOL:
if regulated_temp - self._dtemp >= target_temp and slope_min <= 0:
if self._accumulated_error >= self._error_threshold:
_LOGGER.info(
"%s - We need to stop, there is no need for cooling for a long time.",
)
@@ -120,6 +170,7 @@ class AutoStartStopDetectionAlgorithm:
)
return AUTO_START_STOP_ACTION_NOTHING
# check to turn on
if hvac_mode == HVACMode.OFF and saved_hvac_mode == HVACMode.HEAT:
if current_temp + slope_min * self._dt <= target_temp:
_LOGGER.info(
@@ -149,5 +200,24 @@ class AutoStartStopDetectionAlgorithm:
)
return AUTO_START_STOP_ACTION_NOTHING
def set_level(self, level: TYPE_AUTO_START_STOP_LEVELS):
"""Set a new level"""
self._init_level(level)
@property
def dt_min(self) -> float:
"""Get the dt value"""
return self._dt
@property
def accumulated_error(self) -> float:
"""Get the accumulated error value"""
return self._accumulated_error
@property
def level(self) -> TYPE_AUTO_START_STOP_LEVELS:
"""Get the level value"""
return self._level
def __str__(self) -> str:
return f"AutoStartStopDetectionAlgorithm-{self._vtherm_name}"

View File

@@ -2,6 +2,7 @@
"""Constants for the Versatile Thermostat integration."""
import logging
from typing import Literal
from enum import Enum
from homeassistant.const import CONF_NAME, Platform
@@ -158,6 +159,14 @@ CONF_AUTO_START_STOP_LEVELS = [
AUTO_START_STOP_LEVEL_FAST,
]
# For explicit typing purpose only
TYPE_AUTO_START_STOP_LEVELS = Literal[ # pylint: disable=invalid-name
AUTO_START_STOP_LEVEL_FAST,
AUTO_START_STOP_LEVEL_MEDIUM,
AUTO_START_STOP_LEVEL_SLOW,
AUTO_START_STOP_LEVEL_NONE,
]
DEFAULT_SHORT_EMA_PARAMS = {
"max_alpha": 0.5,
# In sec
@@ -458,6 +467,7 @@ class EventType(Enum):
CENTRAL_BOILER_EVENT: str = "versatile_thermostat_central_boiler_event"
PRESET_EVENT: str = "versatile_thermostat_preset_event"
WINDOW_AUTO_EVENT: str = "versatile_thermostat_window_auto_event"
AUTO_START_STOP_EVENT: str = "versatile_thermostat_auto_start_stop_event"
def send_vtherm_event(hass, event_type: EventType, entity, data: dict):

View File

@@ -49,11 +49,20 @@ from .const import (
RegulationParamStrong,
AUTO_FAN_DTEMP_THRESHOLD,
AUTO_FAN_DEACTIVATED_MODES,
CONF_AUTO_START_STOP_LEVEL,
AUTO_START_STOP_LEVEL_NONE,
TYPE_AUTO_START_STOP_LEVELS,
UnknownEntity,
EventType,
)
from .vtherm_api import VersatileThermostatAPI
from .underlyings import UnderlyingClimate
from .auto_start_stop_algorithm import (
AutoStartStopDetectionAlgorithm,
AUTO_START_STOP_ACTION_OFF,
AUTO_START_STOP_ACTION_ON,
)
_LOGGER = logging.getLogger(__name__)
@@ -64,7 +73,6 @@ HVAC_ACTION_ON = [ # pylint: disable=invalid-name
HVACAction.HEATING,
]
class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
"""Representation of a base class for a Versatile Thermostat over a climate"""
@@ -81,6 +89,8 @@ class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
# The fan_mode name depending of the current_mode
_auto_activated_fan_mode: str | None = None
_auto_deactivated_fan_mode: str | None = None
_auto_start_stop_level: TYPE_AUTO_START_STOP_LEVELS = AUTO_START_STOP_LEVEL_NONE
_auto_start_stop_algo: AutoStartStopDetectionAlgorithm | None = None
_entity_component_unrecorded_attributes = (
BaseThermostat._entity_component_unrecorded_attributes.union(
@@ -99,6 +109,9 @@ class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
"auto_activated_fan_mode",
"auto_deactivated_fan_mode",
"auto_regulation_use_device_temp",
"auto_start_stop_level",
"auto_start_stop_dtemp",
"auto_start_stop_dtmin",
}
)
)
@@ -113,6 +126,61 @@ class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
self._regulated_target_temp = self.target_temperature
self._last_regulation_change = NowClass.get_now(hass)
@overrides
def post_init(self, config_entry: ConfigData):
"""Initialize the Thermostat"""
super().post_init(config_entry)
for climate in [
CONF_CLIMATE,
CONF_CLIMATE_2,
CONF_CLIMATE_3,
CONF_CLIMATE_4,
]:
if config_entry.get(climate):
self._underlyings.append(
UnderlyingClimate(
hass=self._hass,
thermostat=self,
climate_entity_id=config_entry.get(climate),
)
)
self.choose_auto_regulation_mode(
config_entry.get(CONF_AUTO_REGULATION_MODE)
if config_entry.get(CONF_AUTO_REGULATION_MODE) is not None
else CONF_AUTO_REGULATION_NONE
)
self._auto_regulation_dtemp = (
config_entry.get(CONF_AUTO_REGULATION_DTEMP)
if config_entry.get(CONF_AUTO_REGULATION_DTEMP) is not None
else 0.5
)
self._auto_regulation_period_min = (
config_entry.get(CONF_AUTO_REGULATION_PERIOD_MIN)
if config_entry.get(CONF_AUTO_REGULATION_PERIOD_MIN) is not None
else 5
)
self._auto_fan_mode = (
config_entry.get(CONF_AUTO_FAN_MODE)
if config_entry.get(CONF_AUTO_FAN_MODE) is not None
else CONF_AUTO_FAN_NONE
)
self._auto_regulation_use_device_temp = config_entry.get(
CONF_AUTO_REGULATION_USE_DEVICE_TEMP, False
)
self._auto_start_stop_level = config_entry.get(
CONF_AUTO_START_STOP_LEVEL, AUTO_START_STOP_LEVEL_NONE
)
# Instanciate the auto start stop algo
self._auto_start_stop_algo = AutoStartStopDetectionAlgorithm(
self._auto_start_stop_level, self.name
)
@property
def is_over_climate(self) -> bool:
"""True if the Thermostat is over_climate"""
@@ -292,53 +360,6 @@ class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
)
await self.async_set_fan_mode(self._auto_deactivated_fan_mode)
@overrides
def post_init(self, config_entry: ConfigData):
"""Initialize the Thermostat"""
super().post_init(config_entry)
for climate in [
CONF_CLIMATE,
CONF_CLIMATE_2,
CONF_CLIMATE_3,
CONF_CLIMATE_4,
]:
if config_entry.get(climate):
self._underlyings.append(
UnderlyingClimate(
hass=self._hass,
thermostat=self,
climate_entity_id=config_entry.get(climate),
)
)
self.choose_auto_regulation_mode(
config_entry.get(CONF_AUTO_REGULATION_MODE)
if config_entry.get(CONF_AUTO_REGULATION_MODE) is not None
else CONF_AUTO_REGULATION_NONE
)
self._auto_regulation_dtemp = (
config_entry.get(CONF_AUTO_REGULATION_DTEMP)
if config_entry.get(CONF_AUTO_REGULATION_DTEMP) is not None
else 0.5
)
self._auto_regulation_period_min = (
config_entry.get(CONF_AUTO_REGULATION_PERIOD_MIN)
if config_entry.get(CONF_AUTO_REGULATION_PERIOD_MIN) is not None
else 5
)
self._auto_fan_mode = (
config_entry.get(CONF_AUTO_FAN_MODE)
if config_entry.get(CONF_AUTO_FAN_MODE) is not None
else CONF_AUTO_FAN_NONE
)
self._auto_regulation_use_device_temp = config_entry.get(
CONF_AUTO_REGULATION_USE_DEVICE_TEMP, False
)
def choose_auto_regulation_mode(self, auto_regulation_mode: str):
"""Choose or change the regulation mode"""
self._auto_regulation_mode = auto_regulation_mode
@@ -551,6 +572,13 @@ class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
self.auto_regulation_use_device_temp
)
self._attr_extra_state_attributes["auto_start_stop_level"] = (
self._auto_start_stop_algo.level
)
self._attr_extra_state_attributes["auto_start_stop_dtmin"] = (
self._auto_start_stop_algo.dt_min
)
self.async_write_ha_state()
_LOGGER.debug(
"%s - Calling update_custom_attributes: %s",
@@ -869,6 +897,48 @@ class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
"""The main function used to run the calculation at each cycle"""
ret = await super().async_control_heating(force, _)
# Check if we need to auto start/stop the Vtherm
action = self._auto_start_stop_algo.calculate_action(
self.hvac_mode,
self._saved_hvac_mode,
self.target_temperature,
self.current_temperature,
self._window_auto_algo.last_slope,
self.now
)
_LOGGER.debug("%s - auto_start_stop action is %s", self, action)
if action == AUTO_START_STOP_ACTION_OFF:
_LOGGER.info(
"%s - Turning OFF the Vtherm due to auto-start-stop conditions", self
)
# Send an event
self.send_event(
EventType.AUTO_START_STOP_EVENT,
{
"type": "stop",
"cause": "Auto start conditions reached",
"hvac_mode": self.hvac_mode,
"saved_hvac_mode": self._saved_hvac_mode,
"regulated_target_temp": self.regulated_target_temp,
"target_temperature": self.target_temperature,
"current_temperature": self.current_temperature,
"temperature_slope": self._window_auto_algo.last_slope,
},
)
await self.async_turn_off()
# Stop here
return ret
elif action == AUTO_START_STOP_ACTION_ON:
_LOGGER.info(
"%s - Turning ON the Vtherm due to auto-start-stop conditions", self
)
await self.async_turn_on()
# Send an event
# Continue the normal async_control_heating
# Send the regulated temperature to the underlyings
await self._send_regulated_temperature()
if self._auto_fan_mode and self._auto_fan_mode != CONF_AUTO_FAN_NONE:
@@ -1022,6 +1092,11 @@ class ThermostatOverClimate(BaseThermostat[UnderlyingClimate]):
return False
return True
@property
def auto_start_stop_level(self) -> TYPE_AUTO_START_STOP_LEVELS:
"""Return the auto start/stop level."""
return self._auto_start_stop_level
@overrides
def init_underlyings(self):
"""Init the underlyings if not already done"""