feat: merge coordinators to have less duplicate code

This commit is contained in:
Xavier Morel
2025-03-24 21:42:28 +01:00
parent 987d3ded47
commit 27c59cf736
4 changed files with 69 additions and 74 deletions

View File

@@ -18,8 +18,9 @@ from .const import (
ECOCITO_REFRESH_MIN_KEY,
)
from .coordinator import (
GarbageCollectionsDataUpdateCoordinator,
RecyclingCollectionsDataUpdateCoordinator,
CollectionDataUpdateCoordinator,
# GarbageCollectionsDataUpdateCoordinator,
# RecyclingCollectionsDataUpdateCoordinator,
WasteDepotVisitsDataUpdateCoordinator,
)
@@ -30,10 +31,12 @@ PLATFORMS: list[Platform] = [Platform.SENSOR]
class EcocitoData:
"""Ecocito data type."""
garbage_collections: GarbageCollectionsDataUpdateCoordinator
garbage_collections_previous: GarbageCollectionsDataUpdateCoordinator
recycling_collections: RecyclingCollectionsDataUpdateCoordinator
recycling_collections_previous: RecyclingCollectionsDataUpdateCoordinator
# TODO: Possibly at some point we can build dynamic sensors depending on user needs
garbage_collections: CollectionDataUpdateCoordinator
garbage_collections_previous: CollectionDataUpdateCoordinator
recycling_collections: CollectionDataUpdateCoordinator
recycling_collections_previous: CollectionDataUpdateCoordinator
waste_depot_visits: WasteDepotVisitsDataUpdateCoordinator
@@ -53,17 +56,21 @@ async def async_setup_entry(hass: HomeAssistant, entry: EcocitoConfigEntry) -> b
recycle_id = entry.data.get(ECOCITO_RECYCLE_TYPE, ECOCITO_RECYCLING_COLLECTION_TYPE)
refresh_time = entry.data.get(ECOCITO_REFRESH_MIN_KEY, ECOCITO_DEFAULT_REFRESH_MIN)
data = EcocitoData(
garbage_collections=GarbageCollectionsDataUpdateCoordinator(hass, client, 0, garbage_id, refresh_time),
garbage_collections_previous=GarbageCollectionsDataUpdateCoordinator(
garbage_collections=CollectionDataUpdateCoordinator(
hass, client, 0, garbage_id, refresh_time
),
garbage_collections_previous=CollectionDataUpdateCoordinator(
hass, client, -1, garbage_id, refresh_time
),
recycling_collections=RecyclingCollectionsDataUpdateCoordinator(
recycling_collections=CollectionDataUpdateCoordinator(
hass, client, 0, recycle_id, refresh_time
),
recycling_collections_previous=RecyclingCollectionsDataUpdateCoordinator(
recycling_collections_previous=CollectionDataUpdateCoordinator(
hass, client, -1, recycle_id, refresh_time
),
waste_depot_visits=WasteDepotVisitsDataUpdateCoordinator(hass, client, 0, refresh_time),
waste_depot_visits=WasteDepotVisitsDataUpdateCoordinator(
hass, client, 0, refresh_time
),
)
for field in fields(data):
coordinator = getattr(data, field.name)

View File

@@ -145,14 +145,6 @@ class EcocitoClient:
ECOCITO_ERROR_FETCHING.format(exc=e, type="collection events")
) from e
async def get_garbage_collections(self, year: int, type_id: int) -> list[CollectionEvent]:
"""Return the list of the garbage collections for a year."""
return await self.get_collection_events(str(type_id), year)
async def get_recycling_collections(self, year: int, type_id: int) -> list[CollectionEvent]:
"""Return the list of the recycling collections for a year."""
return await self.get_collection_events(str(type_id), year)
async def get_waste_depot_visits(self, year: int) -> list[WasteDepotVisit]:
"""Return the list of the waste depot visits for a year."""
async with aiohttp.ClientSession(cookie_jar=self._cookies) as session:

View File

@@ -54,46 +54,30 @@ class EcocitoDataUpdateCoordinator(DataUpdateCoordinator[T], Generic[T], ABC):
"""Fetch the actual data."""
raise NotImplementedError
# TODO Fuse both coordinators? Since there's no hardcoded ID anymore, the duplicate may
# not really be needed anymore.
# Also later we could build any number of sensors, not just 2 (possibly only 1 also).
class GarbageCollectionsDataUpdateCoordinator(
class CollectionDataUpdateCoordinator(
EcocitoDataUpdateCoordinator[list[CollectionEvent]]
):
"""Garbage collections list update from Ecocito."""
"""Collections list update from Ecocito."""
def __init__(
self, hass: HomeAssistant, client: EcocitoClient, year_offset: int, garbage_id: int, refresh_time: int
self,
hass: HomeAssistant,
client: EcocitoClient,
year_offset: int,
coll_type_id: int,
refresh_time: int
) -> None:
"""Initialize the coordinator."""
super().__init__(hass, client, refresh_time)
self._year_offset = year_offset
self._garbage_id = garbage_id
self._coll_type_id = coll_type_id
async def _fetch_data(self) -> list[CollectionEvent]:
"""Fetch the data."""
return await self.client.get_garbage_collections(
datetime.now(tz=self._time_zone).year + self._year_offset, self._garbage_id
)
class RecyclingCollectionsDataUpdateCoordinator(
EcocitoDataUpdateCoordinator[list[CollectionEvent]]
):
"""Recycling collections list update from Ecocito."""
def __init__(
self, hass: HomeAssistant, client: EcocitoClient, year_offset: int, recycle_id: int, refresh_time: int
) -> None:
"""Initialize the coordinator."""
super().__init__(hass, client, refresh_time)
self._year_offset = year_offset
self._recycle_id = recycle_id
async def _fetch_data(self) -> list[CollectionEvent]:
"""Fetch the data."""
return await self.client.get_recycling_collections(
datetime.now(tz=self._time_zone).year + self._year_offset, self._recycle_id
return await self.client.get_collection_events(
str(self._coll_type_id),
datetime.now(tz=self._time_zone).year + self._year_offset,
)
@@ -103,7 +87,11 @@ class WasteDepotVisitsDataUpdateCoordinator(
"""Waste depot visits list update from Ecocito."""
def __init__(
self, hass: HomeAssistant, client: EcocitoClient, year_offset: int, refresh_time: int
self,
hass: HomeAssistant,
client: EcocitoClient,
year_offset: int,
refresh_time: int
) -> None:
"""Initialize the coordinator."""
super().__init__(hass, client, refresh_time)

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant import ConfigFlowResult, config_entries
from homeassistant.const import CONF_DOMAIN, CONF_PASSWORD, CONF_USERNAME
from homeassistant.helpers import selector
@@ -25,21 +25,21 @@ def build_schema(type_mapping: dict[int, str], current: dict[str, Any]) -> vol.S
]
return vol.Schema(
{
vol.Optional(ECOCITO_GARBAGE_TYPE, default=current[ECOCITO_GARBAGE_TYPE]): selector.SelectSelector(
selector.SelectSelectorConfig(
options=types_options,
mode=selector.SelectSelectorMode.DROPDOWN,
multiple=False
)
),
vol.Optional(ECOCITO_RECYCLE_TYPE, default=current[ECOCITO_RECYCLE_TYPE]): selector.SelectSelector(
selector.SelectSelectorConfig(
options=types_options,
mode=selector.SelectSelectorMode.DROPDOWN,
multiple=False
)
),
vol.Required(ECOCITO_REFRESH_MIN_KEY, default=current[ECOCITO_REFRESH_MIN_KEY]): int
vol.Optional(ECOCITO_GARBAGE_TYPE, default=current[ECOCITO_GARBAGE_TYPE]):
selector.SelectSelector(selector.SelectSelectorConfig(
options=types_options,
mode=selector.SelectSelectorMode.DROPDOWN,
multiple=False
)),
vol.Optional(ECOCITO_RECYCLE_TYPE, default=current[ECOCITO_RECYCLE_TYPE]):
selector.SelectSelector(selector.SelectSelectorConfig(
options=types_options,
mode=selector.SelectSelectorMode.DROPDOWN,
multiple=False
)),
vol.Required(
ECOCITO_REFRESH_MIN_KEY, default=current[ECOCITO_REFRESH_MIN_KEY]
): int
}
)
@@ -63,20 +63,26 @@ class EcocitoOptionsFlowHandler(config_entries.OptionsFlow):
async def update_config(self, user_input: dict[str, Any]) -> None:
"""Update configuration with new user input."""
# TODO Sanitize user input
new_data = dict(self._entry.data)
if user_input[ECOCITO_GARBAGE_TYPE] != new_data.get(ECOCITO_GARBAGE_TYPE):
new_data[ECOCITO_GARBAGE_TYPE] = int(user_input[ECOCITO_GARBAGE_TYPE])
if user_input[ECOCITO_RECYCLE_TYPE] != new_data.get(ECOCITO_RECYCLE_TYPE):
new_data[ECOCITO_RECYCLE_TYPE] = int(user_input[ECOCITO_RECYCLE_TYPE])
if user_input[ECOCITO_REFRESH_MIN_KEY] != new_data.get(ECOCITO_REFRESH_MIN_KEY):
new_data[ECOCITO_REFRESH_MIN_KEY] = int(user_input[ECOCITO_REFRESH_MIN_KEY])
for key in [
ECOCITO_GARBAGE_TYPE,
ECOCITO_RECYCLE_TYPE,
ECOCITO_REFRESH_MIN_KEY,
]:
if key not in user_input:
continue
int_val = int(user_input[key])
if (key not in new_data or int_val != new_data.get(key)) and int_val > 0:
new_data[key] = int_val
self.hass.config_entries.async_update_entry(
self._entry, data=new_data
)
await self.hass.config_entries.async_reload(self._entry.entry_id)
async def async_step_init(self, user_input: dict[str, Any] | None = None):
async def async_step_init(
self,
user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Display configuration menu."""
errors: dict[str, str] = {}
if user_input is not None:
@@ -85,7 +91,9 @@ class EcocitoOptionsFlowHandler(config_entries.OptionsFlow):
placeholders = {
ECOCITO_GARBAGE_TYPE: str(self._entry.data.get(ECOCITO_GARBAGE_TYPE, 15)),
ECOCITO_RECYCLE_TYPE: str(self._entry.data.get(ECOCITO_RECYCLE_TYPE, 16)),
ECOCITO_REFRESH_MIN_KEY: int(self._entry.data.get(ECOCITO_REFRESH_MIN_KEY, 60)),
ECOCITO_REFRESH_MIN_KEY: int(
self._entry.data.get(ECOCITO_REFRESH_MIN_KEY, 60)
),
}
schema = build_schema(await self.get_type_mapping(), placeholders)
return self.async_show_form(