Compare commits

...

1 Commits

4 changed files with 154 additions and 2 deletions

View File

@@ -109,10 +109,13 @@ from .const import (
PRESET_AC_SUFFIX,
)
from .commons import get_tz
from .underlyings import UnderlyingEntity
from .prop_algorithm import PropAlgorithm
from .open_window_algorithm import WindowOpenDetectionAlgorithm
from .ema import EstimatedMobileAverage
_LOGGER = logging.getLogger(__name__)
@@ -246,6 +249,8 @@ class BaseThermostat(ClimateEntity, RestoreEntity):
self._underlyings = []
self._smooth_temp = None
self._ema_algo = None
self.post_init(entry_infos)
def post_init(self, entry_infos):
@@ -450,6 +455,13 @@ class BaseThermostat(ClimateEntity, RestoreEntity):
self._total_energy = 0
self._ema_algo = EstimatedMobileAverage(
self.name,
self._cycle_min * 60,
# Needed for time calculation
get_tz(self._hass),
)
_LOGGER.debug(
"%s - Creation of a new VersatileThermostat entity: unique_id=%s",
self,
@@ -1476,6 +1488,11 @@ class BaseThermostat(ClimateEntity, RestoreEntity):
self._last_temperature_mesure = self.get_state_date_or_now(state)
# calculate the smooth_temperature with EMA calculation
self._ema_temp = self._ema_algo.calculate_ema(
self._cur_temp, self._last_temperature_mesure
)
_LOGGER.debug(
"%s - After setting _last_temperature_mesure %s , state.last_changed.replace=%s",
self,
@@ -1679,7 +1696,7 @@ class BaseThermostat(ClimateEntity, RestoreEntity):
return
slope = self._window_auto_algo.add_temp_measurement(
temperature=self._cur_temp, datetime_measure=self._last_temperature_mesure
temperature=self._ema_temp, datetime_measure=self._last_temperature_mesure
)
_LOGGER.debug(
"%s - Window auto is on, check the alert. last slope is %.3f",
@@ -2155,6 +2172,7 @@ class BaseThermostat(ClimateEntity, RestoreEntity):
"max_power_sensor_entity_id": self._max_power_sensor_entity_id,
"temperature_unit": self.temperature_unit,
"is_device_active": self.is_device_active,
"ema_temp": self._ema_temp,
}
@callback

View File

@@ -0,0 +1,80 @@
# pylint: disable=line-too-long
"""The Estimated Mobile Average calculation used for temperature slope
and maybe some others feature"""
import logging
import math
from datetime import datetime, tzinfo
_LOGGER = logging.getLogger(__name__)
MIN_TIME_DECAY_SEC = 5
# As for the EMA calculation of irregular time series, I've seen that it might be useful to
# have an upper limit for alpha in case the last measurement was too long ago.
# For example when using a half life of 10 minutes a measurement that is 60 minutes ago
# (if there's nothing inbetween) would contribute to the smoothed value with 1,5%,
# giving the current measurement 98,5% relevance. It could be wise to limit the alpha to e.g. 4x the half life (=0.9375).
MAX_ALPHA = 0.9375
class EstimatedMobileAverage:
"""A class that will do the Estimated Mobile Average calculation"""
def __init__(self, vterm_name: str, halflife: float, timezone: tzinfo):
"""The halflife is the duration in secondes of a normal cycle"""
self._halflife: float = halflife
self._timezone = timezone
self._current_ema: float = None
self._last_timestamp: datetime = datetime.now(self._timezone)
self._name = vterm_name
def __str__(self) -> str:
return f"EMA-{self._name}"
def calculate_ema(self, measurement: float, timestamp: datetime) -> float | None:
"""Calculate the new EMA from a new measurement measured at timestamp
Return the EMA or None if all parameters are not initialized now
"""
if measurement is None or timestamp is None:
_LOGGER.warning(
"%s - Cannot calculate EMA: measurement and timestamp are mandatory. This message can be normal at startup but should not persist",
self,
)
return measurement
if self._current_ema is None:
_LOGGER.debug(
"%s - First init of the EMA",
self,
)
self._current_ema = measurement
self._last_timestamp = timestamp
return self._current_ema
time_decay = (timestamp - self._last_timestamp).total_seconds()
if time_decay < MIN_TIME_DECAY_SEC:
_LOGGER.debug(
"%s - time_decay %s is too small (< %s). Forget the measurement",
self,
time_decay,
MIN_TIME_DECAY_SEC,
)
return self._current_ema
alpha = 1 - math.exp(math.log(0.5) * time_decay / self._halflife)
# capping alpha to avoid gap if last measurement was long time ago
alpha = min(alpha, 0.9375)
new_ema = round(alpha * measurement + (1 - alpha) * self._current_ema, 1)
self._last_timestamp = timestamp
self._current_ema = new_ema
_LOGGER.debug(
"%s - alpha=%.2f new_ema=%.2f last_timestamp=%s",
self,
alpha,
self._current_ema,
self._last_timestamp,
)
return self._current_ema

View File

@@ -49,7 +49,8 @@ class PITemperatureRegulator:
self.target_temp = target_temp
# Do not reset the accumulated error
# Discussion #191. After a target change we should reset the accumulated error which is certainly wrong now.
self.accumulated_error = 0
if self.accumulated_error < 0:
self.accumulated_error = 0
def calculate_regulated_temperature(
self, internal_temp: float, external_temp: float

53
tests/test_ema.py Normal file
View File

@@ -0,0 +1,53 @@
# pylint: disable=line-too-long
""" Tests de EMA calculation"""
from datetime import datetime, timedelta
from homeassistant.core import HomeAssistant
from custom_components.versatile_thermostat.ema import EstimatedMobileAverage
from .commons import get_tz
def test_ema_basics(hass: HomeAssistant):
"""Test the EMA calculation with basic features"""
tz = get_tz(hass) # pylint: disable=invalid-name
now: datetime = datetime.now(tz=tz)
the_ema = EstimatedMobileAverage(
"test",
# 5 minutes
300,
# Needed for time calculation
get_tz(hass),
)
assert the_ema
current_timestamp = now
# First initialization
assert the_ema.calculate_ema(20, current_timestamp) == 20
current_timestamp = current_timestamp + timedelta(minutes=1)
# One minute later, same temperature. EMA temperature should not have change
assert the_ema.calculate_ema(20, current_timestamp) == 20
# Too short measurement should be ignored
assert the_ema.calculate_ema(2000, current_timestamp) == 20
current_timestamp = current_timestamp + timedelta(seconds=4)
assert the_ema.calculate_ema(20, current_timestamp) == 20
# a new normal measurement 5 minutes later
current_timestamp = current_timestamp + timedelta(minutes=5)
ema = the_ema.calculate_ema(25, current_timestamp)
assert ema > 20
assert ema == 22.5
# a big change in a short time does have a limited effect
current_timestamp = current_timestamp + timedelta(seconds=5)
ema = the_ema.calculate_ema(30, current_timestamp)
assert ema > 22.5
assert ema < 23
assert ema == 22.6