Coverage for custom_components/supernotify/snoozer.py: 84%
174 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
1import datetime as dt
2import logging
3from datetime import timedelta
4from typing import Any
6from homeassistant.core import Event
7from homeassistant.util import dt as dt_util
9from . import (
10 ATTR_ACTION,
11 ATTR_MOBILE_APP_ID,
12 ATTR_PERSON_ID,
13 ATTR_USER_ID,
14 CONF_PERSON,
15 PRIORITY_CRITICAL,
16 PRIORITY_MEDIUM,
17)
18from .delivery import Delivery
19from .model import CommandType, GlobalTargetType, QualifiedTargetType, RecipientType, Target, TargetType
20from .people import PeopleRegistry
22SNOOZE_TIME = timedelta(hours=1) # TODO: move to configuration
23_LOGGER = logging.getLogger(__name__)
26class Snooze:
27 target: str | list[str] | None
28 target_type: TargetType
29 snoozed_at: dt.datetime
30 snooze_until: dt.datetime | None = None
31 recipient_type: RecipientType
32 recipient: str | None
33 reason: str | None = None
35 def __init__(
36 self,
37 target_type: TargetType,
38 recipient_type: RecipientType,
39 target: str | list[str] | None = None,
40 recipient: str | None = None,
41 snooze_for: timedelta | None = None,
42 reason: str | None = None,
43 ) -> None:
44 self.snoozed_at = dt_util.now()
45 self.target = target
46 self.target_type = target_type
47 self.recipient_type: RecipientType = recipient_type
48 self.recipient = recipient
49 self.reason = reason
50 self.snooze_until = None
51 if snooze_for:
52 self.snooze_until = self.snoozed_at + snooze_for
54 def std_recipient(self) -> str | None:
55 return self.recipient if self.recipient_type == RecipientType.USER else RecipientType.EVERYONE
57 def short_key(self) -> str:
58 # only one GLOBAL can be active at a time
59 target = "GLOBAL" if self.target_type in GlobalTargetType else f"{self.target_type}_{self.target}"
60 return f"{target}_{self.std_recipient()}"
62 def __eq__(self, other: object) -> bool:
63 """Check if two snoozes for the same thing"""
64 if not isinstance(other, Snooze):
65 return False
66 return self.short_key() == other.short_key()
68 def __repr__(self) -> str:
69 """Return a string representation of the object."""
70 return f"Snooze({self.target_type}, {self.target}, {self.std_recipient()})"
72 def active(self) -> bool:
73 return self.snooze_until is None or self.snooze_until > dt_util.now()
75 def export(self) -> dict[str, Any]:
76 return {
77 "target_type": self.target_type,
78 "target": self.target,
79 "recipient_type": self.recipient_type,
80 "recipient": self.recipient,
81 "reason": self.reason,
82 "snoozed_at": dt_util.as_local(self.snoozed_at).strftime("%H:%M:%S") if self.snoozed_at else None,
83 "snooze_until": dt_util.as_local(self.snooze_until).strftime("%H:%M:%S") if self.snooze_until else None,
84 }
87class Snoozer:
88 """Manage snoozing"""
90 def __init__(self, people_registry: PeopleRegistry | None = None) -> None:
91 self.snoozes: dict[str, Snooze] = {}
92 self.people_registry: PeopleRegistry | None = people_registry
94 def handle_command_event(self, event: Event, people: dict[str, Any] | None = None) -> None:
95 people = people or {}
96 try:
97 cmd: CommandType
98 target_type: TargetType | None = None
99 target: str | None = None
100 snooze_for: timedelta = SNOOZE_TIME
101 recipient_type: RecipientType | None = None
102 event_name = event.data.get(ATTR_ACTION)
104 if not event_name:
105 _LOGGER.warning(
106 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s",
107 event.origin,
108 event.time_fired,
109 event.data,
110 event.context,
111 )
112 return
114 _LOGGER.debug(
115 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context
116 )
117 event_parts: list[str] = event_name.split("_")
118 if len(event_parts) < 4:
119 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name)
120 return
121 cmd = CommandType[event_parts[1]]
122 recipient_type = RecipientType[event_parts[2]]
123 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4:
124 target_type = QualifiedTargetType[event_parts[3]]
125 target = event_parts[4]
126 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 6 else SNOOZE_TIME
127 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4:
128 target_type = GlobalTargetType[event_parts[3]]
129 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 5 else SNOOZE_TIME
131 if cmd is None or target_type is None or recipient_type is None:
132 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name)
133 return
135 except KeyError as ke:
136 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke)
137 return
138 except Exception as e:
139 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e)
140 return
142 try:
143 recipient: str | None = None
144 if recipient_type == RecipientType.USER:
145 target_people = [
146 p.get(CONF_PERSON)
147 for p in people.values()
148 if p.get(ATTR_USER_ID) == event.context.user_id and event.context.user_id is not None and p.get(CONF_PERSON)
149 ]
150 if target_people:
151 recipient = target_people[0]
152 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient)
153 else:
154 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id)
155 return
157 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for)
159 except Exception as e:
160 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e)
162 def register_snooze(
163 self,
164 cmd: CommandType,
165 target_type: TargetType,
166 target: str | None,
167 recipient_type: RecipientType,
168 recipient: str | None,
169 snooze_for: timedelta | None,
170 reason: str = "User command",
171 ) -> None:
172 if cmd == CommandType.SNOOZE:
173 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason)
174 self.snoozes[snooze.short_key()] = snooze
175 elif cmd == CommandType.SILENCE:
176 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason)
177 self.snoozes[snooze.short_key()] = snooze
178 elif cmd == CommandType.NORMAL:
179 anti_snooze = Snooze(target_type, recipient_type, target, recipient)
180 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()]
181 for k in to_del:
182 del self.snoozes[k]
183 else:
184 _LOGGER.warning( # type: ignore
185 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)",
186 cmd,
187 target_type,
188 target,
189 recipient_type,
190 )
192 def purge_snoozes(self) -> None:
193 to_del = [k for k, v in self.snoozes.items() if not v.active()]
194 for k in to_del:
195 del self.snoozes[k]
197 def clear(self) -> int:
198 cleared = len(self.snoozes)
199 self.snoozes.clear()
200 return cleared
202 def export(self) -> list[dict[str, Any]]:
203 return [s.export() for s in self.snoozes.values()]
205 def current_snoozes(self, priority: str, delivery: Delivery) -> list[Snooze]:
206 inscope_snoozes: list[Snooze] = []
208 for snooze in self.snoozes.values():
209 if snooze.active():
210 match snooze.target_type:
211 case GlobalTargetType.EVERYTHING:
212 inscope_snoozes.append(snooze)
213 case GlobalTargetType.NONCRITICAL:
214 if priority != PRIORITY_CRITICAL:
215 inscope_snoozes.append(snooze)
216 case QualifiedTargetType.DELIVERY:
217 if snooze.target == delivery.name:
218 inscope_snoozes.append(snooze)
219 case QualifiedTargetType.PRIORITY:
220 if snooze.target == priority:
221 inscope_snoozes.append(snooze)
222 case QualifiedTargetType.MOBILE:
223 inscope_snoozes.append(snooze)
224 case QualifiedTargetType.TRANSPORT:
225 if snooze.target == delivery.transport.name:
226 inscope_snoozes.append(snooze)
227 case QualifiedTargetType.CAMERA:
228 inscope_snoozes.append(snooze)
229 case _:
230 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type)
232 return inscope_snoozes
234 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool:
235 for snooze in self.snoozes.values():
236 if snooze.active():
237 match snooze.target_type:
238 case GlobalTargetType.EVERYTHING:
239 return True
240 case GlobalTargetType.NONCRITICAL:
241 if priority != PRIORITY_CRITICAL:
242 return True
244 return False
246 def filter_recipients(self, recipients: Target, priority: str, delivery: Delivery) -> Target:
247 inscope_snoozes = self.current_snoozes(priority, delivery)
248 for snooze in inscope_snoozes:
249 if snooze.recipient_type == RecipientType.USER:
250 # assume the everyone checks are made before notification gets this far
251 if (
252 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery.name)
253 or (snooze.target_type == QualifiedTargetType.TRANSPORT and snooze.target == delivery.transport.name)
254 or (
255 snooze.target_type == QualifiedTargetType.PRIORITY
256 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target))
257 )
258 or snooze.target_type == GlobalTargetType.EVERYTHING
259 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL)
260 ):
261 recipients_to_remove = []
262 for recipient in recipients.person_ids:
263 if recipient == snooze.recipient:
264 recipients_to_remove.append(recipient)
265 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient)
267 recipients.remove(ATTR_PERSON_ID, recipients_to_remove)
269 if snooze.target_type == QualifiedTargetType.MOBILE:
270 to_remove: list[str] = []
271 for recipient in recipients.mobile_app_ids:
272 if recipient == snooze.target:
273 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target)
274 to_remove.append(recipient)
275 if to_remove:
276 recipients.remove(ATTR_MOBILE_APP_ID, to_remove)
277 return recipients