Coverage for custom_components/supernotify/snoozer.py: 21%
174 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
1import datetime as dt
2import logging
3from datetime import timedelta
4from typing import Any
6from homeassistant.core import Event
7from homeassistant.util import dt as dt_util
9from . import (
10 ATTR_ACTION,
11 ATTR_MOBILE_APP_ID,
12 ATTR_PERSON_ID,
13 PRIORITY_CRITICAL,
14 PRIORITY_MEDIUM,
15)
16from .delivery import Delivery
17from .model import CommandType, GlobalTargetType, QualifiedTargetType, RecipientType, Target, TargetType
18from .people import PeopleRegistry, Recipient
20SNOOZE_TIME = timedelta(hours=1) # TODO: move to configuration
21_LOGGER = logging.getLogger(__name__)
class Snooze:
    """A snooze (or open-ended silence) of one target for one recipient scope.

    Two snoozes compare equal, and hash alike, when they share the same
    `short_key()`, i.e. they address the same target for the same
    recipient scope, regardless of timing or reason.
    """

    target: str | list[str] | None
    target_type: TargetType
    snoozed_at: dt.datetime
    snooze_until: dt.datetime | None = None
    recipient_type: RecipientType
    recipient: str | None
    reason: str | None = None

    def __init__(
        self,
        target_type: TargetType,
        recipient_type: RecipientType,
        target: str | list[str] | None = None,
        recipient: str | None = None,
        snooze_for: timedelta | None = None,
        reason: str | None = None,
    ) -> None:
        """Create a snooze starting now.

        Args:
            target_type: Kind of thing being snoozed.
            recipient_type: Whether the snooze applies to one user or to everyone.
            target: Identifier(s) of the snoozed thing, where the target type needs one.
            recipient: Identifier of the user the snooze belongs to, if any.
            snooze_for: Duration of the snooze; a falsy value (None or a zero
                duration) leaves the snooze without an expiry time.
            reason: Free-text explanation kept for export.
        """
        self.snoozed_at = dt_util.now()
        self.target = target
        self.target_type = target_type
        self.recipient_type: RecipientType = recipient_type
        self.recipient = recipient
        self.reason = reason
        self.snooze_until = None
        if snooze_for:
            self.snooze_until = self.snoozed_at + snooze_for

    def std_recipient(self) -> str | None:
        """Return the recipient for USER snoozes, otherwise `RecipientType.EVERYONE`."""
        return self.recipient if self.recipient_type == RecipientType.USER else RecipientType.EVERYONE

    def short_key(self) -> str:
        """Return the identity key used for equality, hashing and registry storage."""
        # All global target types share one key prefix, so a registry keyed by
        # short_key holds at most one global snooze per recipient scope.
        target = "GLOBAL" if self.target_type in GlobalTargetType else f"{self.target_type}_{self.target}"
        return f"{target}_{self.std_recipient()}"

    def __eq__(self, other: object) -> bool:
        """Check if two snoozes for the same thing"""
        if not isinstance(other, Snooze):
            return False
        return self.short_key() == other.short_key()

    def __hash__(self) -> int:
        """Hash on the same key that `__eq__` compares.

        Defining `__eq__` alone makes a class unhashable; this keeps
        instances usable in sets and as dict keys. The hash follows the
        current field values, so do not mutate a snooze while it is stored
        in a hashed container.
        """
        return hash(self.short_key())

    def __repr__(self) -> str:
        """Return a string representation of the object."""
        return f"Snooze({self.target_type}, {self.target}, {self.std_recipient()})"

    def active(self) -> bool:
        """Return True while the snooze has no expiry or the expiry is still in the future."""
        return self.snooze_until is None or self.snooze_until > dt_util.now()

    def export(self) -> dict[str, Any]:
        """Return the snooze as a plain dict.

        Timestamps are rendered in local time as ``HH:MM:SS`` (time of day
        only), or None when unset.
        """
        return {
            "target_type": self.target_type,
            "target": self.target,
            "recipient_type": self.recipient_type,
            "recipient": self.recipient,
            "reason": self.reason,
            "snoozed_at": dt_util.as_local(self.snoozed_at).strftime("%H:%M:%S") if self.snoozed_at else None,
            "snooze_until": dt_util.as_local(self.snooze_until).strftime("%H:%M:%S") if self.snooze_until else None,
        }
85class Snoozer:
86 """Manage snoozing"""
88 def __init__(self, people_registry: PeopleRegistry | None = None) -> None:
89 self.snoozes: dict[str, Snooze] = {}
90 self.people_registry: PeopleRegistry | None = people_registry
92 def handle_command_event(self, event: Event, people: list[Recipient] | None = None) -> None:
93 people = people or []
94 try:
95 cmd: CommandType
96 target_type: TargetType | None = None
97 target: str | None = None
98 snooze_for: timedelta = SNOOZE_TIME
99 recipient_type: RecipientType | None = None
100 event_name = event.data.get(ATTR_ACTION)
102 if not event_name:
103 _LOGGER.warning(
104 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s",
105 event.origin,
106 event.time_fired,
107 event.data,
108 event.context,
109 )
110 return
112 _LOGGER.debug(
113 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context
114 )
115 event_parts: list[str] = event_name.split("_")
116 if len(event_parts) < 4:
117 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name)
118 return
119 cmd = CommandType[event_parts[1]]
120 recipient_type = RecipientType[event_parts[2]]
121 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4:
122 target_type = QualifiedTargetType[event_parts[3]]
123 target = event_parts[4]
124 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 6 else SNOOZE_TIME
125 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4:
126 target_type = GlobalTargetType[event_parts[3]]
127 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 5 else SNOOZE_TIME
129 if cmd is None or target_type is None or recipient_type is None:
130 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name)
131 return
133 except KeyError as ke:
134 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke)
135 return
136 except Exception as e:
137 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e)
138 return
140 try:
141 recipient: str | None = None
142 if recipient_type == RecipientType.USER:
143 target_people = [
144 p.entity_id
145 for p in people
146 if p.user_id == event.context.user_id and event.context.user_id is not None and p.entity_id
147 ]
148 if target_people:
149 recipient = target_people[0]
150 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient)
151 else:
152 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id)
153 return
155 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for)
157 except Exception as e:
158 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e)
160 def register_snooze(
161 self,
162 cmd: CommandType,
163 target_type: TargetType,
164 target: str | None,
165 recipient_type: RecipientType,
166 recipient: str | None,
167 snooze_for: timedelta | None,
168 reason: str = "User command",
169 ) -> None:
170 if cmd == CommandType.SNOOZE:
171 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason)
172 self.snoozes[snooze.short_key()] = snooze
173 elif cmd == CommandType.SILENCE:
174 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason)
175 self.snoozes[snooze.short_key()] = snooze
176 elif cmd == CommandType.NORMAL:
177 anti_snooze = Snooze(target_type, recipient_type, target, recipient)
178 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()]
179 for k in to_del:
180 del self.snoozes[k]
181 else:
182 _LOGGER.warning( # type: ignore
183 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)",
184 cmd,
185 target_type,
186 target,
187 recipient_type,
188 )
190 def purge_snoozes(self) -> None:
191 to_del = [k for k, v in self.snoozes.items() if not v.active()]
192 for k in to_del:
193 del self.snoozes[k]
195 def clear(self) -> int:
196 cleared = len(self.snoozes)
197 self.snoozes.clear()
198 return cleared
200 def export(self) -> list[dict[str, Any]]:
201 return [s.export() for s in self.snoozes.values()]
203 def current_snoozes(self, priority: str, delivery: Delivery) -> list[Snooze]:
204 inscope_snoozes: list[Snooze] = []
206 for snooze in self.snoozes.values():
207 if snooze.active():
208 match snooze.target_type:
209 case GlobalTargetType.EVERYTHING:
210 inscope_snoozes.append(snooze)
211 case GlobalTargetType.NONCRITICAL:
212 if priority != PRIORITY_CRITICAL:
213 inscope_snoozes.append(snooze)
214 case QualifiedTargetType.DELIVERY:
215 if snooze.target == delivery.name:
216 inscope_snoozes.append(snooze)
217 case QualifiedTargetType.PRIORITY:
218 if snooze.target == priority:
219 inscope_snoozes.append(snooze)
220 case QualifiedTargetType.MOBILE:
221 inscope_snoozes.append(snooze)
222 case QualifiedTargetType.TRANSPORT:
223 if snooze.target == delivery.transport.name:
224 inscope_snoozes.append(snooze)
225 case QualifiedTargetType.CAMERA:
226 inscope_snoozes.append(snooze)
227 case _:
228 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type)
230 return inscope_snoozes
232 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool:
233 for snooze in self.snoozes.values():
234 if snooze.active():
235 match snooze.target_type:
236 case GlobalTargetType.EVERYTHING:
237 return True
238 case GlobalTargetType.NONCRITICAL:
239 if priority != PRIORITY_CRITICAL:
240 return True
242 return False
244 def filter_recipients(self, recipients: Target, priority: str, delivery: Delivery) -> Target:
245 inscope_snoozes = self.current_snoozes(priority, delivery)
246 for snooze in inscope_snoozes:
247 if snooze.recipient_type == RecipientType.USER:
248 # assume the everyone checks are made before notification gets this far
249 if (
250 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery.name)
251 or (snooze.target_type == QualifiedTargetType.TRANSPORT and snooze.target == delivery.transport.name)
252 or (
253 snooze.target_type == QualifiedTargetType.PRIORITY
254 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target))
255 )
256 or snooze.target_type == GlobalTargetType.EVERYTHING
257 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL)
258 ):
259 recipients_to_remove = []
260 for recipient in recipients.person_ids:
261 if recipient == snooze.recipient:
262 recipients_to_remove.append(recipient)
263 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient)
265 recipients.remove(ATTR_PERSON_ID, recipients_to_remove)
267 if snooze.target_type == QualifiedTargetType.MOBILE:
268 to_remove: list[str] = []
269 for recipient in recipients.mobile_app_ids:
270 if recipient == snooze.target:
271 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target)
272 to_remove.append(recipient)
273 if to_remove:
274 recipients.remove(ATTR_MOBILE_APP_ID, to_remove)
275 return recipients