Home Assistant Git Exporter

This commit is contained in:
root
2024-05-31 09:39:52 +02:00
parent cd6fa93633
commit d5ccfbb540
1353 changed files with 43876 additions and 0 deletions

View File

@@ -0,0 +1,290 @@
"""History access and caching. This module runs asynchronously collecting
and caching history data"""
from datetime import datetime, timedelta
from typing import Callable, OrderedDict, Any
from homeassistant.core import HomeAssistant, State, CALLBACK_TYPE
from homeassistant.util import dt
from homeassistant.components.recorder.const import DATA_INSTANCE as RECORDER_INSTANCE
from homeassistant.components.recorder import get_instance
from homeassistant.helpers.event import (
async_track_point_in_utc_time,
)
from homeassistant.components.recorder import history
from homeassistant.const import STATE_ON
from .const import (
ATTR_CURRENT_ADJUSTMENT,
ATTR_CURRENT_NAME,
CONF_ENABLED,
CONF_HISTORY,
CONF_HISTORY_REFRESH,
CONF_HISTORY_SPAN,
CONF_REFRESH_INTERVAL,
CONF_SPAN,
TIMELINE_ADJUSTMENT,
TIMELINE_SCHEDULE_NAME,
TIMELINE_START,
TIMELINE_END,
DOMAIN,
BINARY_SENSOR,
)
TIMELINE = "timeline"
TODAY_ON = "today_on"
def midnight(utc: datetime) -> datetime:
"""Accept a UTC time and return midnight for that day"""
return dt.as_utc(
dt.as_local(utc).replace(hour=0, minute=0, second=0, microsecond=0)
)
def round_seconds_dt(atime: datetime) -> datetime:
"""Round the time to the nearest second"""
return (atime + timedelta(seconds=0.5)).replace(microsecond=0)
def round_seconds_td(duration: timedelta) -> timedelta:
"""Round the timedelta to the nearest second"""
return timedelta(seconds=int(duration.total_seconds() + 0.5))
class IUHistory:
"""History access and caching"""
# pylint: disable=too-many-instance-attributes
def __init__(self, hass: HomeAssistant, callback: Callable[[set[str]], None]):
self._hass = hass
self._callback = callback
# Configuration variables
self._history_span = timedelta(days=7)
self._refresh_interval = timedelta(seconds=120)
self._enabled = True
# Private variables
self._history_last: datetime = None
self._cache: dict[str, Any] = {}
self._entity_ids: list[str] = []
self._refresh_remove: CALLBACK_TYPE = None
self._stime: datetime = None
self._initialised = False
self._fixed_clock = True
def __del__(self):
self._remove_refresh()
def _remove_refresh(self) -> None:
"""Remove the scheduled refresh"""
if self._refresh_remove is not None:
self._refresh_remove()
self._refresh_remove = None
def _get_next_refresh_event(self, utc_time: datetime, force: bool) -> datetime:
"""Calculate the next event time."""
if self._history_last is None or force or not self._fixed_clock:
return utc_time
return utc_time + self._refresh_interval
def _schedule_refresh(self, force: bool) -> None:
"""Set up a listener for the next history refresh."""
self._remove_refresh()
self._history_last = self._get_next_refresh_event(dt.utcnow(), force)
self._refresh_remove = async_track_point_in_utc_time(
self._hass,
self._async_handle_refresh_event,
self._history_last,
)
async def _async_handle_refresh_event(self, utc_time: datetime) -> None:
"""Handle history event."""
# pylint: disable=unused-argument
self._refresh_remove = None
if self._fixed_clock:
self._schedule_refresh(False)
await self._async_update_history(self._stime)
def _initialise(self) -> bool:
"""Initialise this unit"""
if self._initialised:
return False
self._remove_refresh()
self._history_last = None
self._stime = None
self._clear_cache()
self._entity_ids.clear()
for entity_id in self._hass.states.async_entity_ids():
if entity_id.startswith(f"{BINARY_SENSOR}.{DOMAIN}_"):
self._entity_ids.append(entity_id)
self._initialised = True
return True
def finalise(self):
"""Finalise this unit"""
self._remove_refresh()
def _clear_cache(self) -> None:
self._cache = {}
def _today_duration(self, stime: datetime, data: list[State]) -> timedelta:
"""Return the total on time"""
# pylint: disable=no-self-use
elapsed = timedelta(0)
front_marker: State = None
start = midnight(stime)
for item in data:
# Filter data
if item.last_changed < start:
continue
if item.last_changed > stime:
break
# Look for an on state
if front_marker is None:
if item.state == STATE_ON:
front_marker = item
continue
# Now look for an off state
if item.state != STATE_ON:
elapsed += item.last_changed - front_marker.last_changed
front_marker = None
if front_marker is not None:
elapsed += stime - front_marker.last_changed
return timedelta(seconds=round(elapsed.total_seconds()))
def _run_history(self, stime: datetime, data: list[State]) -> list:
"""Return the on/off series"""
# pylint: disable=no-self-use
def create_record(item: State, end: datetime) -> dict:
result = OrderedDict()
result[TIMELINE_START] = round_seconds_dt(item.last_changed)
result[TIMELINE_END] = round_seconds_dt(end)
result[TIMELINE_SCHEDULE_NAME] = item.attributes.get(ATTR_CURRENT_NAME)
result[TIMELINE_ADJUSTMENT] = item.attributes.get(
ATTR_CURRENT_ADJUSTMENT, ""
)
return result
run_history = []
front_marker: State = None
for item in data:
# Look for an on state
if front_marker is None:
if item.state == STATE_ON:
front_marker = item
continue
# Now look for an off state
if item.state != STATE_ON:
run_history.append(create_record(front_marker, item.last_changed))
front_marker = None
if front_marker is not None:
run_history.append(create_record(front_marker, stime))
return run_history
async def _async_update_history(self, stime: datetime) -> None:
if len(self._entity_ids) == 0:
return
start = self._stime - self._history_span
if RECORDER_INSTANCE in self._hass.data:
data = await get_instance(self._hass).async_add_executor_job(
history.get_significant_states,
self._hass,
start,
stime,
self._entity_ids,
None,
True,
False,
)
else:
data = {}
if data is None or len(data) == 0:
return
entity_ids: set[str] = set()
for entity_id in data:
new_run_history = self._run_history(stime, data[entity_id])
new_today_on = self._today_duration(stime, data[entity_id])
if entity_id not in self._cache:
self._cache[entity_id] = {}
elif (
new_today_on == self._cache[entity_id][TODAY_ON]
and new_run_history == self._cache[entity_id][TIMELINE]
):
continue
self._cache[entity_id][TIMELINE] = new_run_history
self._cache[entity_id][TODAY_ON] = new_today_on
entity_ids.add(entity_id)
if len(entity_ids) > 0:
self._callback(entity_ids)
def load(self, config: OrderedDict, fixed_clock: bool) -> "IUHistory":
"""Load config data"""
if config is None:
config = {}
self._fixed_clock = fixed_clock
span_days: int = None
refresh_seconds: int = None
# deprecated
span_days = config.get(CONF_HISTORY_SPAN)
refresh_seconds = config.get(CONF_HISTORY_REFRESH)
if CONF_HISTORY in config:
hist_conf: dict = config[CONF_HISTORY]
self._enabled = hist_conf.get(CONF_ENABLED, self._enabled)
span_days = hist_conf.get(CONF_SPAN, span_days)
refresh_seconds = hist_conf.get(CONF_REFRESH_INTERVAL, refresh_seconds)
if span_days is not None:
self._history_span = timedelta(days=span_days)
if refresh_seconds is not None:
self._refresh_interval = timedelta(seconds=refresh_seconds)
self._initialised = False
return self
def muster(self, stime: datetime, force: bool) -> None:
"""Check and update history if required"""
if force:
self._initialised = False
if not self._initialised:
self._initialise()
if self._enabled and (
force
or self._stime is None
or dt.as_local(self._stime).toordinal() != dt.as_local(stime).toordinal()
or not self._fixed_clock
):
self._schedule_refresh(True)
self._stime = stime
def today_total(self, entity_id: str) -> timedelta:
"""Return the total on time for today"""
if entity_id in self._cache:
return self._cache[entity_id][TODAY_ON]
return timedelta(0)
def timeline(self, entity_id: str) -> list[dict]:
"""Return the timeline history"""
if entity_id in self._cache:
return self._cache[entity_id][TIMELINE].copy()
return []