Coverage for custom_components/supernotify/snoozer.py: 95%
168 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-04-01 15:06 +0000
1import datetime as dt
2import logging
3from datetime import timedelta
4from typing import TYPE_CHECKING, Any
6from homeassistant.util import dt as dt_util
8from .const import ATTR_ACTION, ATTR_MOBILE_APP_ID, ATTR_PERSON_ID, CONF_SNOOZE_TIME, PRIORITY_CRITICAL, PRIORITY_MEDIUM
9from .model import CommandType, GlobalTargetType, QualifiedTargetType, RecipientType, Target, TargetType
11if TYPE_CHECKING:
12 from homeassistant.core import Event
14 from .delivery import Delivery
15 from .people import PeopleRegistry, Recipient
17_LOGGER = logging.getLogger(__name__)
class Snooze:
    """A single snooze (time-limited) or silence (open-ended) request.

    Two snoozes compare equal when their ``short_key()`` values match, i.e.
    when they cover the same target for the same recipient. Because
    ``__eq__`` is defined without ``__hash__``, instances are not hashable.
    """

    target: str | list[str] | None
    target_type: TargetType
    snoozed_at: dt.datetime
    snooze_until: dt.datetime | None = None
    recipient_type: RecipientType
    recipient: str | None
    reason: str | None = None

    def __init__(
        self,
        target_type: TargetType,
        recipient_type: RecipientType,
        target: str | list[str] | None = None,
        recipient: str | None = None,
        snooze_for: timedelta | None = None,
        reason: str | None = None,
    ) -> None:
        """Record the snooze, timestamped with the current time.

        Args:
            target_type: Kind of thing being snoozed.
            recipient_type: Whether the snooze applies to one user or to everyone.
            target: Identifier(s) of the snoozed thing; unused for global target types.
            recipient: Identifier of the user the snooze applies to, if any.
            snooze_for: Duration of the snooze; ``None`` means it never expires.
            reason: Free-text explanation, reported by ``export()``.

        """
        self.snoozed_at = dt_util.now()
        self.target = target
        self.target_type = target_type
        self.recipient_type: RecipientType = recipient_type
        self.recipient = recipient
        self.reason = reason
        # Compare against None rather than relying on truthiness: timedelta(0)
        # is falsy, so a zero-length snooze would otherwise be stored without
        # an expiry and stay active forever.
        self.snooze_until = self.snoozed_at + snooze_for if snooze_for is not None else None

    def _target_label(self) -> str:
        """Return ``"GLOBAL"`` for global target types, else ``<target_type>_<target>``."""
        if self.target_type in GlobalTargetType:
            return "GLOBAL"
        return f"{self.target_type}_{self.target}"

    def std_recipient(self) -> str | None:
        """Return the recipient for user snoozes, otherwise ``RecipientType.EVERYONE``."""
        return self.recipient if self.recipient_type == RecipientType.USER else RecipientType.EVERYONE

    def short_key(self) -> str:
        """Return the key identifying what is snoozed and for whom."""
        # only one GLOBAL can be active at a time
        return f"{self._target_label()}_{self.std_recipient()}"

    def __eq__(self, other: object) -> bool:
        """Check if two snoozes for the same thing"""
        if not isinstance(other, Snooze):
            return False
        return self.short_key() == other.short_key()

    def __repr__(self) -> str:
        """Return a string representation of the object."""
        return f"Snooze({self._target_label()}, {self.std_recipient()})"

    def active(self) -> bool:
        """Return True while the snooze has no expiry or the expiry is still in the future."""
        return self.snooze_until is None or self.snooze_until > dt_util.now()

    def export(self) -> dict[str, Any]:
        """Return the snooze as a dict, with timestamps as local ``HH:MM:SS`` strings."""
        return {
            "target_type": self.target_type,
            "target": self.target,
            "recipient_type": self.recipient_type,
            "recipient": self.recipient,
            "reason": self.reason,
            "snoozed_at": dt_util.as_local(self.snoozed_at).strftime("%H:%M:%S") if self.snoozed_at else None,
            "snooze_until": dt_util.as_local(self.snooze_until).strftime("%H:%M:%S") if self.snooze_until else None,
        }
82class Snoozer:
83 """Manage snoozing"""
85 def __init__(self, config: dict[str, Any] | None = None, people_registry: PeopleRegistry | None = None) -> None:
86 self.snoozes: dict[str, Snooze] = {}
87 self.people_registry: PeopleRegistry | None = people_registry
88 self.config = config or {}
89 self.snooze_period = timedelta(seconds=self.config.get(CONF_SNOOZE_TIME, 60 * 60))
91 def handle_command_event(self, event: Event, people: list[Recipient] | None = None) -> None:
92 people = people or []
93 try:
94 cmd: CommandType
95 target_type: TargetType | None = None
96 target: str | None = None
97 snooze_for: timedelta = self.snooze_period
98 recipient_type: RecipientType | None = None
99 event_name = event.data.get(ATTR_ACTION)
101 if not event_name:
102 _LOGGER.warning(
103 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s",
104 event.origin,
105 event.time_fired,
106 event.data,
107 event.context,
108 )
109 return
111 _LOGGER.debug(
112 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context
113 )
114 event_parts: list[str] = event_name.split("_")
115 if len(event_parts) < 4:
116 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name)
117 return
118 cmd = CommandType[event_parts[1]]
119 recipient_type = RecipientType[event_parts[2]]
120 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4:
121 target_type = QualifiedTargetType[event_parts[3]]
122 target = event_parts[4]
123 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 6 else self.snooze_period
124 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4:
125 target_type = GlobalTargetType[event_parts[3]]
126 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 5 else self.snooze_period
128 if cmd is None or target_type is None or recipient_type is None:
129 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name)
130 return
132 except KeyError as ke:
133 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke)
134 return
135 except Exception as e:
136 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e)
137 return
139 try:
140 recipient: str | None = None
141 if recipient_type == RecipientType.USER:
142 target_people = [
143 p.entity_id
144 for p in people
145 if p.user_id == event.context.user_id and event.context.user_id is not None and p.entity_id
146 ]
147 if target_people:
148 recipient = target_people[0]
149 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient)
150 else:
151 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id)
152 return
154 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for)
156 except Exception as e:
157 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e)
159 def register_snooze(
160 self,
161 cmd: CommandType,
162 target_type: TargetType,
163 target: str | None,
164 recipient_type: RecipientType,
165 recipient: str | None,
166 snooze_for: timedelta | None,
167 reason: str = "User command",
168 ) -> None:
169 if cmd == CommandType.SNOOZE:
170 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason)
171 self.snoozes[snooze.short_key()] = snooze
172 elif cmd == CommandType.SILENCE:
173 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason)
174 self.snoozes[snooze.short_key()] = snooze
175 elif cmd == CommandType.NORMAL:
176 anti_snooze = Snooze(target_type, recipient_type, target, recipient)
177 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()]
178 for k in to_del:
179 del self.snoozes[k]
180 else:
181 _LOGGER.warning( # type: ignore
182 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)",
183 cmd,
184 target_type,
185 target,
186 recipient_type,
187 )
189 def purge_snoozes(self) -> None:
190 to_del = [k for k, v in self.snoozes.items() if not v.active()]
191 for k in to_del:
192 del self.snoozes[k]
194 def clear(self) -> int:
195 cleared = len(self.snoozes)
196 self.snoozes.clear()
197 return cleared
199 def export(self) -> list[dict[str, Any]]:
200 return [s.export() for s in self.snoozes.values()]
202 def current_snoozes(self, priority: str, delivery: Delivery) -> list[Snooze]:
203 inscope_snoozes: list[Snooze] = []
205 for snooze in self.snoozes.values():
206 if snooze.active():
207 match snooze.target_type:
208 case GlobalTargetType.EVERYTHING:
209 inscope_snoozes.append(snooze)
210 case GlobalTargetType.NONCRITICAL:
211 if priority != PRIORITY_CRITICAL:
212 inscope_snoozes.append(snooze)
213 case QualifiedTargetType.DELIVERY:
214 if snooze.target == delivery.name:
215 inscope_snoozes.append(snooze)
216 case QualifiedTargetType.PRIORITY:
217 if snooze.target == priority:
218 inscope_snoozes.append(snooze)
219 case QualifiedTargetType.MOBILE:
220 inscope_snoozes.append(snooze)
221 case QualifiedTargetType.TRANSPORT:
222 if snooze.target == delivery.transport.name:
223 inscope_snoozes.append(snooze)
224 case QualifiedTargetType.CAMERA:
225 inscope_snoozes.append(snooze)
226 case _:
227 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type)
229 return inscope_snoozes
231 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool:
232 for snooze in self.snoozes.values():
233 if snooze.active():
234 match snooze.target_type:
235 case GlobalTargetType.EVERYTHING:
236 return True
237 case GlobalTargetType.NONCRITICAL:
238 if priority != PRIORITY_CRITICAL:
239 return True
241 return False
243 def filter_recipients(self, recipients: Target, priority: str, delivery: Delivery) -> Target:
244 inscope_snoozes = self.current_snoozes(priority, delivery)
245 for snooze in inscope_snoozes:
246 if snooze.recipient_type == RecipientType.USER:
247 # assume the everyone checks are made before notification gets this far
248 if (
249 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery.name)
250 or (snooze.target_type == QualifiedTargetType.TRANSPORT and snooze.target == delivery.transport.name)
251 or (
252 snooze.target_type == QualifiedTargetType.PRIORITY
253 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target))
254 )
255 or snooze.target_type == GlobalTargetType.EVERYTHING
256 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL)
257 ):
258 recipients_to_remove = []
259 for recipient in recipients.person_ids:
260 if recipient == snooze.recipient:
261 recipients_to_remove.append(recipient)
262 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient)
264 recipients.remove(ATTR_PERSON_ID, recipients_to_remove)
266 if snooze.target_type == QualifiedTargetType.MOBILE:
267 to_remove: list[str] = []
268 for recipient in recipients.mobile_app_ids:
269 if recipient == snooze.target:
270 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target)
271 to_remove.append(recipient)
272 if to_remove:
273 recipients.remove(ATTR_MOBILE_APP_ID, to_remove)
274 return recipients