Coverage for custom_components/supernotify/model.py: 97%
326 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
1import copy
2import logging
3import re
4from collections.abc import Sequence
5from dataclasses import dataclass, field
6from enum import StrEnum, auto
7from typing import Any, ClassVar
9# This import brings in a bunch of other dependency noises, make it manual until py3.14/lazy import/HA updated
10# from homeassistant.components.mobile_app import DOMAIN as MOBILE_APP_DOMAIN
11from homeassistant.const import (
12 ATTR_AREA_ID,
13 ATTR_DEVICE_ID,
14 ATTR_ENTITY_ID,
15 ATTR_FLOOR_ID,
16 ATTR_LABEL_ID,
17 CONF_ACTION,
18 CONF_ALIAS,
19 CONF_DEBUG,
20 CONF_ENABLED,
21 CONF_OPTIONS,
22 CONF_TARGET,
23 STATE_HOME,
24 STATE_NOT_HOME,
25)
26from homeassistant.core import valid_entity_id
27from homeassistant.helpers.typing import ConfigType
29from . import (
30 ATTR_EMAIL,
31 ATTR_MOBILE_APP_ID,
32 ATTR_PERSON_ID,
33 ATTR_PHONE,
34 CONF_DATA,
35 CONF_DELIVERY_DEFAULTS,
36 CONF_DEVICE_DISCOVERY,
37 CONF_DEVICE_DOMAIN,
38 CONF_PRIORITY,
39 CONF_SELECTION,
40 CONF_SELECTION_RANK,
41 CONF_TARGET_REQUIRED,
42 CONF_TARGET_USAGE,
43 PRIORITY_MEDIUM,
44 PRIORITY_VALUES,
45 RE_DEVICE_ID,
46 SELECTION_DEFAULT,
47 TARGET_USE_ON_NO_ACTION_TARGETS,
48 SelectionRank,
49)
50from .common import ensure_list
_LOGGER = logging.getLogger(__name__)

# See note on import of homeassistant.components.mobile_app
# The domain name is kept as a literal so that component does not have to be imported here.
MOBILE_APP_DOMAIN = "mobile_app"
class Target:
    """Notification targets grouped by category.

    ``targets`` maps a category name to a list of target ids. Categories are
    either the built-in ones listed in ``CATEGORIES`` or arbitrary custom names.
    Optional data can be attached to the whole target (``target_data``) or to
    individual ``(category, target)`` pairs (``target_specific_data``).
    """

    # actual targets, that can be positively identified with a validator
    DIRECT_CATEGORIES: ClassVar[list[str]] = [ATTR_ENTITY_ID, ATTR_DEVICE_ID, ATTR_EMAIL, ATTR_PHONE, ATTR_MOBILE_APP_ID]
    # references that lead to targets, that can be positively identified with a validator
    AUTO_INDIRECT_CATEGORIES: ClassVar[list[str]] = [ATTR_PERSON_ID]
    # references that lead to targets, that can't be positively identified with a validator
    EXPLICIT_INDIRECT_CATEGORIES: ClassVar[list[str]] = [ATTR_AREA_ID, ATTR_FLOOR_ID, ATTR_LABEL_ID]
    INDIRECT_CATEGORIES: ClassVar[list[str]] = EXPLICIT_INDIRECT_CATEGORIES + AUTO_INDIRECT_CATEGORIES
    AUTO_CATEGORIES: ClassVar[list[str]] = DIRECT_CATEGORIES + AUTO_INDIRECT_CATEGORIES

    CATEGORIES: ClassVar[list[str]] = DIRECT_CATEGORIES + INDIRECT_CATEGORIES

    # bucket for plain-list entries that no validator recognised
    UNKNOWN_CUSTOM_CATEGORY = "_UNKNOWN_"

    def __init__(
        self,
        target: str
        | list[str]
        | dict[str, str]
        | dict[str, Sequence[str]]
        | dict[str, list[str]]
        | dict[str, str | list[str]]
        | None = None,
        target_data: dict[str, Any] | None = None,
        target_specific_data: bool = False,
    ) -> None:
        """Build a target from a string, a list of strings or a category mapping.

        Args:
            target: ``None`` for an empty target, a single id, a list of ids to be
                categorised by the ``is_*`` validators, or a mapping of category
                to id(s).
            target_data: Optional data to attach.
            target_specific_data: When true, ``target_data`` is attached to every
                individual ``(category, target)`` pair rather than to the target
                as a whole.

        """
        self.target_data: dict[str, Any] | None = None
        self.target_specific_data: dict[tuple[str, str], dict[str, Any]] | None = None
        self.targets: dict[str, list[str]] = {}

        if isinstance(target, str):
            target = [target]

        if target is None:
            pass  # empty constructor is valid case for target building
        elif isinstance(target, list):
            self._add_from_list(target)
        elif isinstance(target, dict):
            self._add_from_dict(target)
        else:
            _LOGGER.warning("SUPERNOTIFY Target created with no valid targets: %s", target)

        if target_data:
            if target_specific_data:
                self.target_specific_data = {
                    (category, t): target_data for category, targets in self.targets.items() for t in targets
                }
            else:
                self.target_data = target_data

    def _add_from_list(self, target: list[str]) -> None:
        """Categorise a plain list of ids using the validators, in ``AUTO_CATEGORIES`` order."""
        # simplified and legacy way of assuming list of entities that can be discriminated by validator
        targets_left = list(target)
        for category in self.AUTO_CATEGORIES:
            validator = getattr(self, f"is_{category}", None)
            if validator is not None:
                matched: list[str] = []
                for t in targets_left:
                    if t not in matched and validator(t):
                        self.targets.setdefault(category, []).append(t)
                        matched.append(t)
                targets_left = [t for t in targets_left if t not in matched]
            else:
                _LOGGER.debug("SUPERNOTIFY Missing validator for selective target category %s", category)
            if not targets_left:
                break
        if targets_left:
            # nothing recognised these, keep them rather than silently dropping
            self.targets[self.UNKNOWN_CUSTOM_CATEGORY] = targets_left

    def _add_from_dict(self, target: dict[str, Any]) -> None:
        """Load a category -> id(s) mapping, validating the auto-detectable categories."""
        for category, raw in target.items():
            targets = ensure_list(raw)
            if not targets:
                continue
            if category in self.AUTO_CATEGORIES:
                validator = getattr(self, f"is_{category}", None)
                if validator is None:
                    _LOGGER.debug("SUPERNOTIFY Missing validator for selective target category %s", category)
                    continue
                for t in targets:
                    if validator(t):
                        bucket = self.targets.setdefault(category, [])
                        if t not in bucket:
                            bucket.append(t)
                    else:
                        _LOGGER.warning("SUPERNOTIFY Target skipped invalid %s target: %s", category, t)
            else:
                # categories that can't be automatically detected (like label_id) and custom
                # categories are taken as given; copy so this Target never shares a list
                # object with the caller or with the Target it was copied from
                self.targets[category] = list(targets)

    # Targets by category

    @property
    def email(self) -> list[str]:
        return self.targets.get(ATTR_EMAIL, [])

    @property
    def entity_ids(self) -> list[str]:
        return self.targets.get(ATTR_ENTITY_ID, [])

    @property
    def person_ids(self) -> list[str]:
        return self.targets.get(ATTR_PERSON_ID, [])

    @property
    def device_ids(self) -> list[str]:
        return self.targets.get(ATTR_DEVICE_ID, [])

    @property
    def phone(self) -> list[str]:
        return self.targets.get(ATTR_PHONE, [])

    @property
    def mobile_app_ids(self) -> list[str]:
        return self.targets.get(ATTR_MOBILE_APP_ID, [])

    def custom_ids(self, category: str) -> list[str]:
        """Return the targets of a custom category, or ``[]`` if the category is a built-in one."""
        return self.targets.get(category, []) if category not in self.CATEGORIES else []

    @property
    def area_ids(self) -> list[str]:
        return self.targets.get(ATTR_AREA_ID, [])

    @property
    def floor_ids(self) -> list[str]:
        return self.targets.get(ATTR_FLOOR_ID, [])

    @property
    def label_ids(self) -> list[str]:
        return self.targets.get(ATTR_LABEL_ID, [])

    # Selectors / validators

    @classmethod
    def is_device_id(cls, target: str) -> bool:
        return re.match(RE_DEVICE_ID, target) is not None

    @classmethod
    def is_entity_id(cls, target: str) -> bool:
        # person entities are handled separately as an indirect category
        return valid_entity_id(target) and not target.startswith("person.")

    @classmethod
    def is_person_id(cls, target: str) -> bool:
        return target.startswith("person.") and valid_entity_id(target)

    @classmethod
    def is_phone(cls, target: str) -> bool:
        return re.fullmatch(r"^(\+\d{1,3})?\s?\(?\d{1,4}\)?[\s.-]?\d{3}[\s.-]?\d{4}$", target) is not None

    @classmethod
    def is_mobile_app_id(cls, target: str) -> bool:
        return not valid_entity_id(target) and target.startswith(f"{MOBILE_APP_DOMAIN}_")

    @classmethod
    def is_email(cls, target: str) -> bool:
        return (
            re.fullmatch(
                r"^[a-zA-Z0-9.+/=?^_-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$",
                target,
            )
            is not None
        )

    def has_targets(self) -> bool:
        """Return True if any category holds at least one target."""
        return any(self.targets.values())

    def has_resolved_target(self) -> bool:
        """Return True if any category outside ``INDIRECT_CATEGORIES`` holds a target."""
        return any(targets for category, targets in self.targets.items() if category not in self.INDIRECT_CATEGORIES)

    def for_category(self, category: str) -> list[str]:
        return self.targets.get(category, [])

    @property
    def direct_categories(self) -> list[str]:
        """Built-in direct categories plus any custom categories present on this target."""
        return self.DIRECT_CATEGORIES + [cat for cat in self.targets if cat not in self.CATEGORIES]

    def direct(self) -> "Target":
        """Return a new target restricted to the direct (and custom) categories."""
        # evaluate the property once instead of once per item
        keep = self.direct_categories
        t = Target(
            {cat: targets for cat, targets in self.targets.items() if cat in keep},
            target_data=self.target_data,
        )
        if self.target_specific_data:
            t.target_specific_data = {k: v for k, v in self.target_specific_data.items() if k[0] in keep}
        return t

    def extend(self, category: str, targets: list[str] | str) -> None:
        """Add targets to a category in place, skipping any already present."""
        bucket = self.targets.setdefault(category, [])
        for t in ensure_list(targets):
            if t not in bucket:
                bucket.append(t)

    def remove(self, category: str, targets: list[str] | str) -> None:
        """Remove the given targets from a category; unknown categories are ignored."""
        targets = ensure_list(targets)
        if category in self.targets:
            self.targets[category] = [t for t in self.targets[category] if t not in targets]

    def safe_copy(self) -> "Target":
        """Return a copy with its own target lists and shallow copies of the data dicts."""
        t = Target(dict(self.targets), target_data=dict(self.target_data) if self.target_data else None)
        t.target_specific_data = dict(self.target_specific_data) if self.target_specific_data else None
        return t

    def split_by_target_data(self) -> "list[Target]":
        """Split into one target per run of consecutive entries sharing the same specific data.

        Each resulting target carries that data as its ``target_data``. Targets with
        no specific data are returned last, in a target that keeps the original
        ``target_data``.
        """
        if not self.target_specific_data:
            result = self.safe_copy()
            result.target_specific_data = None
            return [result]
        results: list[Target] = []
        default: Target = self.safe_copy()
        default.target_specific_data = None
        last_found: dict[str, Any] | None = None
        collected: dict[str, list[str]] = {}
        for (category, target), data in self.target_specific_data.items():
            if last_found is None:
                last_found = data
                collected = {category: [target]}
            elif data != last_found:
                # data changed, so close off the current group and start another
                new_target: Target = Target(collected, target_data=last_found)
                results.append(new_target)
                default -= new_target
                last_found = data
                collected = {category: [target]}
            else:
                collected.setdefault(category, []).append(target)
        new_target = Target(collected, target_data=last_found)
        results.append(new_target)
        default -= new_target
        if default.has_targets():
            results.append(default)
        return results

    def __len__(self) -> int:
        """How many targets, whether direct or indirect"""
        return sum(len(targets) for targets in self.targets.values())

    def __add__(self, other: "Target") -> "Target":
        """Create a new target by adding another to this one"""
        new = Target()
        # ordered union of the categories, so the result does not depend on set ordering
        for category in dict.fromkeys([*self.targets, *other.targets]):
            merged = list(self.targets.get(category, []))
            merged.extend(t for t in other.targets.get(category, []) if t not in merged)
            new.targets[category] = merged

        new.target_data = dict(self.target_data) if self.target_data else None
        if other.target_data:
            if new.target_data is None:
                new.target_data = dict(other.target_data)
            else:
                new.target_data.update(other.target_data)
        new.target_specific_data = dict(self.target_specific_data) if self.target_specific_data else None
        if other.target_specific_data:
            if new.target_specific_data is None:
                new.target_specific_data = dict(other.target_specific_data)
            else:
                new.target_specific_data.update(other.target_specific_data)
        return new

    def __sub__(self, other: "Target") -> "Target":
        """Create a new target by removing another from this one, ignoring target_data"""
        new = Target()
        new.target_data = self.target_data
        if self.target_specific_data:
            new.target_specific_data = {
                k: v for k, v in self.target_specific_data.items() if k[1] not in other.targets.get(k[0], ())
            }
        for category in dict.fromkeys([*self.targets, *other.targets]):
            removed = other.targets.get(category, [])
            new.targets[category] = [t for t in self.targets.get(category, []) if t not in removed]

        return new

    def __eq__(self, other: object) -> bool:
        """Compare two targets, treating a missing category the same as an empty one"""
        if other is self:
            return True
        if other is None:
            return False
        if not isinstance(other, Target):
            return NotImplemented
        if self.target_data != other.target_data:
            return False
        if self.target_specific_data != other.target_specific_data:
            return False
        # include custom categories, not only the built-in CATEGORIES
        categories = self.targets.keys() | other.targets.keys()
        return all(self.targets.get(category, []) == other.targets.get(category, []) for category in categories)

    def as_dict(self) -> dict[str, list[str]]:
        """Return a deep copy of the category -> targets mapping."""
        return copy.deepcopy(self.targets)
class TransportConfig:
    """Configuration of a transport, optionally layered over class-level defaults."""

    def __init__(self, conf: ConfigType | None = None, class_config: "TransportConfig|None" = None) -> None:
        """Read transport settings from ``conf``.

        Args:
            conf: Transport configuration; ``None`` is treated as empty.
            class_config: Defaults used for any setting missing from ``conf``.
                Without it, fixed fallbacks are used instead.

        """
        conf = conf or {}
        self.alias = conf.get(CONF_ALIAS)
        # a key present with a null value is treated like a missing key in both branches
        delivery_conf: ConfigType = conf.get(CONF_DELIVERY_DEFAULTS) or {}
        if class_config is not None:
            self.device_domain: list[str] = conf.get(CONF_DEVICE_DOMAIN, class_config.device_domain)
            self.device_discovery: bool = conf.get(CONF_DEVICE_DISCOVERY, class_config.device_discovery)
            self.enabled: bool = conf.get(CONF_ENABLED, class_config.enabled)
            self.delivery_defaults: DeliveryConfig = DeliveryConfig(delivery_conf, class_config.delivery_defaults or None)
        else:
            self.device_domain = conf.get(CONF_DEVICE_DOMAIN, [])
            self.device_discovery = conf.get(CONF_DEVICE_DISCOVERY, False)
            self.enabled = conf.get(CONF_ENABLED, True)
            self.delivery_defaults = DeliveryConfig(delivery_conf)
class DeliveryConfig:
    """Shared config for transport defaults and Delivery definitions"""

    def __init__(self, conf: ConfigType, delivery_defaults: "DeliveryConfig|None" = None) -> None:
        """Read delivery settings from ``conf``.

        Args:
            conf: Delivery (or transport default) configuration.
            delivery_defaults: When given, supplies the value of every setting
                not overridden in ``conf``; ``data`` and ``options`` are merged
                key by key with ``conf`` taking precedence.

        """
        if delivery_defaults is not None:
            # use transport defaults where no delivery level override
            self.target: Target | None = Target(conf.get(CONF_TARGET)) if CONF_TARGET in conf else delivery_defaults.target
            self.target_required: TargetRequired = conf.get(CONF_TARGET_REQUIRED, delivery_defaults.target_required)
            self.target_usage: str = conf.get(CONF_TARGET_USAGE) or delivery_defaults.target_usage
            self.action: str | None = conf.get(CONF_ACTION) or delivery_defaults.action
            self.debug: bool = conf.get(CONF_DEBUG, delivery_defaults.debug)

            # start from a copy of the defaults so they are never modified
            self.data: ConfigType = dict(delivery_defaults.data or {})
            self.data.update(conf.get(CONF_DATA) or {})
            self.selection: list[str] = conf.get(CONF_SELECTION, delivery_defaults.selection)
            self.priority: list[str] = conf.get(CONF_PRIORITY, delivery_defaults.priority)
            self.selection_rank: SelectionRank = conf.get(CONF_SELECTION_RANK, delivery_defaults.selection_rank)
            # copy before merging, so the defaults are not written into the caller's config dict
            self.options: ConfigType = dict(conf.get(CONF_OPTIONS) or {})
            # only override options not set in config
            for opt, value in (delivery_defaults.options or {}).items():
                self.options.setdefault(opt, value)

        else:
            # construct the transport defaults
            self.target = Target(conf.get(CONF_TARGET)) if conf.get(CONF_TARGET) else None
            self.target_required = conf.get(CONF_TARGET_REQUIRED, TargetRequired.ALWAYS)
            self.target_usage = conf.get(CONF_TARGET_USAGE, TARGET_USE_ON_NO_ACTION_TARGETS)
            self.action = conf.get(CONF_ACTION)
            self.debug = conf.get(CONF_DEBUG, False)
            self.options = conf.get(CONF_OPTIONS, {})
            self.data = conf.get(CONF_DATA, {})
            self.selection = conf.get(CONF_SELECTION, [SELECTION_DEFAULT])
            self.priority = conf.get(CONF_PRIORITY, PRIORITY_VALUES)
            self.selection_rank = conf.get(CONF_SELECTION_RANK, SelectionRank.ANY)

    def as_dict(self) -> dict[str, Any]:
        """Return the settings keyed by their config names (``debug`` is not included)."""
        return {
            CONF_TARGET: self.target.as_dict() if self.target else None,
            CONF_ACTION: self.action,
            CONF_OPTIONS: self.options,
            CONF_DATA: self.data,
            CONF_SELECTION: self.selection,
            CONF_PRIORITY: self.priority,
            CONF_SELECTION_RANK: self.selection_rank,
            CONF_TARGET_REQUIRED: self.target_required,
            CONF_TARGET_USAGE: self.target_usage,
        }

    def __repr__(self) -> str:
        """Log friendly representation"""
        return str(self.as_dict())
@dataclass
class ConditionVariables:
    """Variables presented to all condition evaluations

    Attributes
    ----------
        applied_scenarios (list[str]): Scenarios that have been applied
        required_scenarios (list[str]): Scenarios that must be applied
        constrain_scenarios (list[str]): Only scenarios in this list, or in explicit apply_scenarios, can be applied
        notification_priority (str): Priority of the notification
        notification_message (str): Message of the notification
        notification_title (str): Title of the notification
        occupancy (list[str]): List of occupancy scenarios

    """

    applied_scenarios: list[str] = field(default_factory=list)
    required_scenarios: list[str] = field(default_factory=list)
    constrain_scenarios: list[str] = field(default_factory=list)
    notification_priority: str = PRIORITY_MEDIUM
    notification_message: str = ""
    notification_title: str = ""
    occupancy: list[str] = field(default_factory=list)

    # An explicit __init__ is kept by @dataclass, which does not replace an existing one
    def __init__(
        self,
        applied_scenarios: list[str] | None = None,
        required_scenarios: list[str] | None = None,
        constrain_scenarios: list[str] | None = None,
        delivery_priority: str | None = PRIORITY_MEDIUM,
        occupiers: dict[str, list[dict[str, Any]]] | None = None,
        message: str | None = None,
        title: str | None = None,
    ) -> None:
        """Populate the variables, deriving ``occupancy`` from ``occupiers``.

        Args:
            applied_scenarios: Scenarios that have been applied.
            required_scenarios: Scenarios that must be applied.
            constrain_scenarios: Scenarios to constrain to.
            delivery_priority: Stored as ``notification_priority``; falls back
                to ``PRIORITY_MEDIUM`` when empty.
            occupiers: Entries keyed by ``STATE_HOME`` / ``STATE_NOT_HOME``.
            message: Stored as ``notification_message``.
            title: Stored as ``notification_title``.

        """
        occupiers = occupiers or {}
        self.occupancy = []
        if not occupiers.get(STATE_NOT_HOME) and occupiers.get(STATE_HOME):
            self.occupancy.append("ALL_HOME")
        elif occupiers.get(STATE_NOT_HOME) and not occupiers.get(STATE_HOME):
            self.occupancy.append("ALL_AWAY")
        if len(occupiers.get(STATE_HOME, [])) == 1:
            self.occupancy.extend(["LONE_HOME", "SOME_HOME"])
        elif len(occupiers.get(STATE_HOME, [])) > 1 and occupiers.get(STATE_NOT_HOME):
            self.occupancy.extend(["MULTI_HOME", "SOME_HOME"])
        self.applied_scenarios = applied_scenarios or []
        self.required_scenarios = required_scenarios or []
        self.constrain_scenarios = constrain_scenarios or []
        self.notification_priority = delivery_priority or PRIORITY_MEDIUM
        self.notification_message = message or ""
        self.notification_title = title or ""

    def as_dict(self) -> ConfigType:
        """Return every documented variable keyed by its attribute name."""
        return {
            "applied_scenarios": self.applied_scenarios,
            "required_scenarios": self.required_scenarios,
            "constrain_scenarios": self.constrain_scenarios,
            # listed in the class docstring as a condition variable, so expose it with the rest
            "notification_priority": self.notification_priority,
            "notification_message": self.notification_message,
            "notification_title": self.notification_title,
            "occupancy": self.occupancy,
        }
483class SuppressionReason(StrEnum):
484 SNOOZED = "SNOOZED"
485 DUPE = "DUPE"
486 NO_SCENARIO = "NO_SCENARIO"
489class TargetRequired(StrEnum):
490 ALWAYS = auto()
491 NEVER = auto()
492 OPTIONAL = auto()
494 @classmethod
495 def _missing_(cls, value: Any) -> "TargetRequired|None":
496 """Backward compatibility for binary values"""
497 if value is True or (isinstance(value, str) and value.lower() in ("true", "on")):
498 return cls.ALWAYS
499 if value is False or (isinstance(value, str) and value.lower() in ("false", "off")):
500 return cls.OPTIONAL
501 return None
504class TargetType(StrEnum):
505 pass
class GlobalTargetType(TargetType):
    """Target types that take no qualifier; each member's value is its own name."""

    NONCRITICAL = "NONCRITICAL"
    EVERYTHING = "EVERYTHING"
513class RecipientType(StrEnum):
514 USER = "USER"
515 EVERYONE = "EVERYONE"
class QualifiedTargetType(TargetType):
    """Target types other than the global ones; each member's value is its own name."""

    TRANSPORT = "TRANSPORT"
    DELIVERY = "DELIVERY"
    CAMERA = "CAMERA"
    PRIORITY = "PRIORITY"
    MOBILE = "MOBILE"
526class CommandType(StrEnum):
527 SNOOZE = "SNOOZE"
528 SILENCE = "SILENCE"
529 NORMAL = "NORMAL"
532class MessageOnlyPolicy(StrEnum):
533 STANDARD = "STANDARD" # independent title and message
534 USE_TITLE = "USE_TITLE" # use title in place of message, no title
535 # use combined title and message as message, no title
536 COMBINE_TITLE = "COMBINE_TITLE"