Coverage for custom_components/supernotify/model.py: 97%

326 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-21 23:31 +0000

1import copy 

2import logging 

3import re 

4from collections.abc import Sequence 

5from dataclasses import dataclass, field 

6from enum import StrEnum, auto 

7from typing import Any, ClassVar 

8 

9# This import brings in a bunch of other dependency noises, make it manual until py3.14/lazy import/HA updated 

10# from homeassistant.components.mobile_app import DOMAIN as MOBILE_APP_DOMAIN 

11from homeassistant.const import ( 

12 ATTR_AREA_ID, 

13 ATTR_DEVICE_ID, 

14 ATTR_ENTITY_ID, 

15 ATTR_FLOOR_ID, 

16 ATTR_LABEL_ID, 

17 CONF_ACTION, 

18 CONF_ALIAS, 

19 CONF_DEBUG, 

20 CONF_ENABLED, 

21 CONF_OPTIONS, 

22 CONF_TARGET, 

23 STATE_HOME, 

24 STATE_NOT_HOME, 

25) 

26from homeassistant.core import valid_entity_id 

27from homeassistant.helpers.typing import ConfigType 

28 

29from . import ( 

30 ATTR_EMAIL, 

31 ATTR_MOBILE_APP_ID, 

32 ATTR_PERSON_ID, 

33 ATTR_PHONE, 

34 CONF_DATA, 

35 CONF_DELIVERY_DEFAULTS, 

36 CONF_DEVICE_DISCOVERY, 

37 CONF_DEVICE_DOMAIN, 

38 CONF_PRIORITY, 

39 CONF_SELECTION, 

40 CONF_SELECTION_RANK, 

41 CONF_TARGET_REQUIRED, 

42 CONF_TARGET_USAGE, 

43 PRIORITY_MEDIUM, 

44 PRIORITY_VALUES, 

45 RE_DEVICE_ID, 

46 SELECTION_DEFAULT, 

47 TARGET_USE_ON_NO_ACTION_TARGETS, 

48 SelectionRank, 

49) 

50from .common import ensure_list 

51 

52_LOGGER = logging.getLogger(__name__) 

53 

54# See note on import of homeassistant.components.mobile_app 

55MOBILE_APP_DOMAIN = "mobile_app" 

56 

57 

58class Target: 

59 # actual targets, that can positively identified with a validator 

60 DIRECT_CATEGORIES: ClassVar[list[str]] = [ATTR_ENTITY_ID, ATTR_DEVICE_ID, ATTR_EMAIL, ATTR_PHONE, ATTR_MOBILE_APP_ID] 

61 # references that lead to targets, that can positively identified with a validator 

62 AUTO_INDIRECT_CATEGORIES: ClassVar[list[str]] = [ATTR_PERSON_ID] 

63 # references that lead to targets, that can't be positively identified with a validator 

64 EXPLICIT_INDIRECT_CATEGORIES: ClassVar[list[str]] = [ATTR_AREA_ID, ATTR_FLOOR_ID, ATTR_LABEL_ID] 

65 INDIRECT_CATEGORIES = EXPLICIT_INDIRECT_CATEGORIES + AUTO_INDIRECT_CATEGORIES 

66 AUTO_CATEGORIES = DIRECT_CATEGORIES + AUTO_INDIRECT_CATEGORIES 

67 

68 CATEGORIES = DIRECT_CATEGORIES + INDIRECT_CATEGORIES 

69 

70 UNKNOWN_CUSTOM_CATEGORY = "_UNKNOWN_" 

71 

72 def __init__( 

73 self, 

74 target: str 

75 | list[str] 

76 | dict[str, str] 

77 | dict[str, Sequence[str]] 

78 | dict[str, list[str]] 

79 | dict[str, str | list[str]] 

80 | None = None, 

81 target_data: dict[str, Any] | None = None, 

82 target_specific_data: bool = False, 

83 ) -> None: 

84 self.target_data: dict[str, Any] | None = None 

85 self.target_specific_data: dict[tuple[str, str], dict[str, Any]] | None = None 

86 self.targets: dict[str, list[str]] = {} 

87 

88 matched: list[str] 

89 

90 if isinstance(target, str): 

91 target = [target] 

92 

93 if target is None: 

94 pass # empty constructor is valid case for target building 

95 elif isinstance(target, list): 

96 # simplified and legacy way of assuming list of entities that can be discriminated by validator 

97 targets_left = list(target) 

98 for category in self.AUTO_CATEGORIES: 

99 validator = getattr(self, f"is_{category}", None) 

100 if validator is not None: 

101 matched = [] 

102 for t in targets_left: 

103 if t not in matched and validator(t): 

104 self.targets.setdefault(category, []) 

105 self.targets[category].append(t) 

106 matched.append(t) 

107 targets_left = [t for t in targets_left if t not in matched] 

108 else: 

109 _LOGGER.debug("SUPERNOTIFY Missing validator for selective target category %s", category) 

110 if not targets_left: 

111 break 

112 if targets_left: 

113 self.targets[self.UNKNOWN_CUSTOM_CATEGORY] = targets_left 

114 

115 elif isinstance(target, dict): 

116 for category in target: 

117 targets = ensure_list(target[category]) 

118 if not targets: 

119 continue 

120 if category in self.AUTO_CATEGORIES: 

121 validator = getattr(self, f"is_{category}", None) 

122 if validator is not None: 

123 for t in targets: 

124 if validator(t): 

125 self.targets.setdefault(category, []) 

126 if t not in self.targets[category]: 

127 self.targets[category].append(t) 

128 else: 

129 _LOGGER.warning("SUPERNOTIFY Target skipped invalid %s target: %s", category, t) 

130 else: 

131 _LOGGER.debug("SUPERNOTIFY Missing validator for selective target category %s", category) 

132 

133 elif category in self.CATEGORIES: 

134 # categories that can't be automatically detected, like label_id 

135 self.targets[category] = targets 

136 else: 

137 # custom categories 

138 self.targets[category] = targets 

139 else: 

140 _LOGGER.warning("SUPERNOTIFY Target created with no valid targets: %s", target) 

141 

142 if target_data and target_specific_data: 

143 self.target_specific_data = {} 

144 for category, targets in self.targets.items(): 

145 for t in targets: 

146 self.target_specific_data[category, t] = target_data 

147 if target_data and not target_specific_data: 

148 self.target_data = target_data 

149 

150 # Targets by category 

151 

152 @property 

153 def email(self) -> list[str]: 

154 return self.targets.get(ATTR_EMAIL, []) 

155 

156 @property 

157 def entity_ids(self) -> list[str]: 

158 return self.targets.get(ATTR_ENTITY_ID, []) 

159 

160 @property 

161 def person_ids(self) -> list[str]: 

162 return self.targets.get(ATTR_PERSON_ID, []) 

163 

164 @property 

165 def device_ids(self) -> list[str]: 

166 return self.targets.get(ATTR_DEVICE_ID, []) 

167 

168 @property 

169 def phone(self) -> list[str]: 

170 return self.targets.get(ATTR_PHONE, []) 

171 

172 @property 

173 def mobile_app_ids(self) -> list[str]: 

174 return self.targets.get(ATTR_MOBILE_APP_ID, []) 

175 

176 def custom_ids(self, category: str) -> list[str]: 

177 return self.targets.get(category, []) if category not in self.CATEGORIES else [] 

178 

179 @property 

180 def area_ids(self) -> list[str]: 

181 return self.targets.get(ATTR_AREA_ID, []) 

182 

183 @property 

184 def floor_ids(self) -> list[str]: 

185 return self.targets.get(ATTR_FLOOR_ID, []) 

186 

187 @property 

188 def label_ids(self) -> list[str]: 

189 return self.targets.get(ATTR_LABEL_ID, []) 

190 

191 # Selectors / validators 

192 

193 @classmethod 

194 def is_device_id(cls, target: str) -> bool: 

195 return re.match(RE_DEVICE_ID, target) is not None 

196 

197 @classmethod 

198 def is_entity_id(cls, target: str) -> bool: 

199 return valid_entity_id(target) and not target.startswith("person.") 

200 

201 @classmethod 

202 def is_person_id(cls, target: str) -> bool: 

203 return target.startswith("person.") and valid_entity_id(target) 

204 

205 @classmethod 

206 def is_phone(cls, target: str) -> bool: 

207 return re.fullmatch(r"^(\+\d{1,3})?\s?\(?\d{1,4}\)?[\s.-]?\d{3}[\s.-]?\d{4}$", target) is not None 

208 

209 @classmethod 

210 def is_mobile_app_id(cls, target: str) -> bool: 

211 return not valid_entity_id(target) and target.startswith(f"{MOBILE_APP_DOMAIN}_") 

212 

213 @classmethod 

214 def is_email(cls, target: str) -> bool: 

215 return ( 

216 re.fullmatch( 

217 r"^[a-zA-Z0-9.+/=?^_-]+@[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+$", 

218 target, 

219 ) 

220 is not None 

221 ) 

222 

223 def has_targets(self) -> bool: 

224 return any(targets for category, targets in self.targets.items()) 

225 

226 def has_resolved_target(self) -> bool: 

227 return any(targets for category, targets in self.targets.items() if category not in self.INDIRECT_CATEGORIES) 

228 

229 def for_category(self, category: str) -> list[str]: 

230 return self.targets.get(category, []) 

231 

232 @property 

233 def direct_categories(self) -> list[str]: 

234 return self.DIRECT_CATEGORIES + [cat for cat in self.targets if cat not in self.CATEGORIES] 

235 

236 def direct(self) -> "Target": 

237 t = Target( 

238 {cat: targets for cat, targets in self.targets.items() if cat in self.direct_categories}, 

239 target_data=self.target_data, 

240 ) 

241 if self.target_specific_data: 

242 t.target_specific_data = {k: v for k, v in self.target_specific_data.items() if k[0] in self.direct_categories} 

243 return t 

244 

245 def extend(self, category: str, targets: list[str] | str) -> None: 

246 targets = ensure_list(targets) 

247 self.targets.setdefault(category, []) 

248 self.targets[category].extend(t for t in targets if t not in self.targets[category]) 

249 

250 def remove(self, category: str, targets: list[str] | str) -> None: 

251 targets = ensure_list(targets) 

252 if category in self.targets: 

253 self.targets[category] = [t for t in self.targets[category] if t not in targets] 

254 

255 def safe_copy(self) -> "Target": 

256 t = Target(dict(self.targets), target_data=dict(self.target_data) if self.target_data else None) 

257 t.target_specific_data = dict(self.target_specific_data) if self.target_specific_data else None 

258 return t 

259 

260 def split_by_target_data(self) -> "list[Target]": 

261 if not self.target_specific_data: 

262 result = self.safe_copy() 

263 result.target_specific_data = None 

264 return [result] 

265 results: list[Target] = [] 

266 default: Target = self.safe_copy() 

267 default.target_specific_data = None 

268 last_found: dict[str, Any] | None = None 

269 collected: dict[str, list[str]] = {} 

270 for (category, target), data in self.target_specific_data.items(): 

271 if last_found is None: 

272 last_found = data 

273 collected = {category: [target]} 

274 elif data != last_found and last_found is not None: 

275 new_target: Target = Target(collected, target_data=last_found) 

276 results.append(new_target) 

277 default -= new_target 

278 last_found = data 

279 collected = {category: [target]} 

280 else: 

281 collected.setdefault(category, []) 

282 collected[category].append(target) 

283 new_target = Target(collected, target_data=last_found) 

284 results.append(new_target) 

285 default -= new_target 

286 if default.has_targets(): 

287 results.append(default) 

288 return results 

289 

290 def __len__(self) -> int: 

291 """How many targets, whether direct or indirect""" 

292 return sum(len(targets) for targets in self.targets.values()) 

293 

294 def __add__(self, other: "Target") -> "Target": 

295 """Create a new target by adding another to this one""" 

296 new = Target() 

297 categories = set(list(self.targets.keys()) + list(other.targets.keys())) 

298 for category in categories: 

299 new.targets[category] = list(self.targets.get(category, [])) 

300 new.targets[category].extend(t for t in other.targets.get(category, []) if t not in new.targets[category]) 

301 

302 new.target_data = dict(self.target_data) if self.target_data else None 

303 if other.target_data: 

304 if new.target_data is None: 

305 new.target_data = dict(other.target_data) 

306 else: 

307 new.target_data.update(other.target_data) 

308 new.target_specific_data = dict(self.target_specific_data) if self.target_specific_data else None 

309 if other.target_specific_data: 

310 if new.target_specific_data is None: 

311 new.target_specific_data = dict(other.target_specific_data) 

312 else: 

313 new.target_specific_data.update(other.target_specific_data) 

314 return new 

315 

316 def __sub__(self, other: "Target") -> "Target": 

317 """Create a new target by removing another from this one, ignoring target_data""" 

318 new = Target() 

319 new.target_data = self.target_data 

320 if self.target_specific_data: 

321 new.target_specific_data = { 

322 k: v for k, v in self.target_specific_data.items() if k[1] not in other.targets.get(k[0], ()) 

323 } 

324 categories = set(list(self.targets.keys()) + list(other.targets.keys())) 

325 for category in categories: 

326 new.targets[category] = [] 

327 new.targets[category].extend(t for t in self.targets.get(category, []) if t not in other.targets.get(category, [])) 

328 

329 return new 

330 

331 def __eq__(self, other: object) -> bool: 

332 """Compare two targets""" 

333 if other is self: 

334 return True 

335 if other is None: 

336 return False 

337 if not isinstance(other, Target): 

338 return NotImplemented 

339 if self.target_data != other.target_data: 

340 return False 

341 if self.target_specific_data != other.target_specific_data: 

342 return False 

343 return all(self.targets.get(category, []) == other.targets.get(category, []) for category in self.CATEGORIES) 

344 

345 def as_dict(self) -> dict[str, list[str]]: 

346 return copy.deepcopy(self.targets) 

347 

348 

349class TransportConfig: 

350 def __init__(self, conf: ConfigType | None = None, class_config: "TransportConfig|None" = None) -> None: 

351 conf = conf or {} 

352 if class_config is not None: 

353 self.device_domain: list[str] = conf.get(CONF_DEVICE_DOMAIN, class_config.device_domain) 

354 self.device_discovery: bool = conf.get(CONF_DEVICE_DISCOVERY, class_config.device_discovery) 

355 self.enabled: bool = conf.get(CONF_ENABLED, class_config.enabled) 

356 self.alias = conf.get(CONF_ALIAS) 

357 self.delivery_defaults: DeliveryConfig = DeliveryConfig( 

358 conf.get(CONF_DELIVERY_DEFAULTS, {}), class_config.delivery_defaults or None 

359 ) 

360 else: 

361 self.device_domain = conf.get(CONF_DEVICE_DOMAIN, []) 

362 self.device_discovery = conf.get(CONF_DEVICE_DISCOVERY, False) 

363 self.enabled = conf.get(CONF_ENABLED, True) 

364 self.alias = conf.get(CONF_ALIAS) 

365 self.delivery_defaults = DeliveryConfig(conf.get(CONF_DELIVERY_DEFAULTS) or {}) 

366 

367 

368class DeliveryConfig: 

369 """Shared config for transport defaults and Delivery definitions""" 

370 

371 def __init__(self, conf: ConfigType, delivery_defaults: "DeliveryConfig|None" = None) -> None: 

372 if delivery_defaults is not None: 

373 # use transport defaults where no delivery level override 

374 self.target: Target | None = Target(conf.get(CONF_TARGET)) if CONF_TARGET in conf else delivery_defaults.target 

375 self.target_required: TargetRequired = conf.get(CONF_TARGET_REQUIRED, delivery_defaults.target_required) 

376 self.target_usage: str = conf.get(CONF_TARGET_USAGE) or delivery_defaults.target_usage 

377 self.action: str | None = conf.get(CONF_ACTION) or delivery_defaults.action 

378 self.debug: bool = conf.get(CONF_DEBUG, delivery_defaults.debug) 

379 

380 self.data: ConfigType = dict(delivery_defaults.data) or {} 

381 self.data.update(conf.get(CONF_DATA, {})) 

382 self.selection: list[str] = conf.get(CONF_SELECTION, delivery_defaults.selection) 

383 self.priority: list[str] = conf.get(CONF_PRIORITY, delivery_defaults.priority) 

384 self.selection_rank: SelectionRank = conf.get(CONF_SELECTION_RANK, delivery_defaults.selection_rank) 

385 self.options: ConfigType = conf.get(CONF_OPTIONS, {}) 

386 # only override options not set in config 

387 for opt in delivery_defaults.options: 

388 self.options.setdefault(opt, delivery_defaults.options[opt]) 

389 

390 else: 

391 # construct the transport defaults 

392 self.target = Target(conf.get(CONF_TARGET)) if conf.get(CONF_TARGET) else None 

393 self.target_required = conf.get(CONF_TARGET_REQUIRED, TargetRequired.ALWAYS) 

394 self.target_usage = conf.get(CONF_TARGET_USAGE, TARGET_USE_ON_NO_ACTION_TARGETS) 

395 self.action = conf.get(CONF_ACTION) 

396 self.debug = conf.get(CONF_DEBUG, False) 

397 self.options = conf.get(CONF_OPTIONS, {}) 

398 self.data = conf.get(CONF_DATA, {}) 

399 self.selection = conf.get(CONF_SELECTION, [SELECTION_DEFAULT]) 

400 self.priority = conf.get(CONF_PRIORITY, PRIORITY_VALUES) 

401 self.selection_rank = conf.get(CONF_SELECTION_RANK, SelectionRank.ANY) 

402 

403 def as_dict(self) -> dict[str, Any]: 

404 return { 

405 CONF_TARGET: self.target.as_dict() if self.target else None, 

406 CONF_ACTION: self.action, 

407 CONF_OPTIONS: self.options, 

408 CONF_DATA: self.data, 

409 CONF_SELECTION: self.selection, 

410 CONF_PRIORITY: self.priority, 

411 CONF_SELECTION_RANK: self.selection_rank, 

412 CONF_TARGET_REQUIRED: self.target_required, 

413 CONF_TARGET_USAGE: self.target_usage, 

414 } 

415 

416 def __repr__(self) -> str: 

417 """Log friendly representation""" 

418 return str(self.as_dict()) 

419 

420 

421@dataclass 

422class ConditionVariables: 

423 """Variables presented to all condition evaluations 

424 

425 Attributes 

426 ---------- 

427 applied_scenarios (list[str]): Scenarios that have been applied 

428 required_scenarios (list[str]): Scenarios that must be applied 

429 constrain_scenarios (list[str]): Only scenarios in this list, or in explicit apply_scenarios, can be applied 

430 notification_priority (str): Priority of the notification 

431 notification_message (str): Message of the notification 

432 notification_title (str): Title of the notification 

433 occupancy (list[str]): List of occupancy scenarios 

434 

435 """ 

436 

437 applied_scenarios: list[str] = field(default_factory=list) 

438 required_scenarios: list[str] = field(default_factory=list) 

439 constrain_scenarios: list[str] = field(default_factory=list) 

440 notification_priority: str = PRIORITY_MEDIUM 

441 notification_message: str = "" 

442 notification_title: str = "" 

443 occupancy: list[str] = field(default_factory=list) 

444 

445 def __init__( 

446 self, 

447 applied_scenarios: list[str] | None = None, 

448 required_scenarios: list[str] | None = None, 

449 constrain_scenarios: list[str] | None = None, 

450 delivery_priority: str | None = PRIORITY_MEDIUM, 

451 occupiers: dict[str, list[dict[str, Any]]] | None = None, 

452 message: str | None = None, 

453 title: str | None = None, 

454 ) -> None: 

455 occupiers = occupiers or {} 

456 self.occupancy = [] 

457 if not occupiers.get(STATE_NOT_HOME) and occupiers.get(STATE_HOME): 

458 self.occupancy.append("ALL_HOME") 

459 elif occupiers.get(STATE_NOT_HOME) and not occupiers.get(STATE_HOME): 

460 self.occupancy.append("ALL_AWAY") 

461 if len(occupiers.get(STATE_HOME, [])) == 1: 

462 self.occupancy.extend(["LONE_HOME", "SOME_HOME"]) 

463 elif len(occupiers.get(STATE_HOME, [])) > 1 and occupiers.get(STATE_NOT_HOME): 

464 self.occupancy.extend(["MULTI_HOME", "SOME_HOME"]) 

465 self.applied_scenarios = applied_scenarios or [] 

466 self.required_scenarios = required_scenarios or [] 

467 self.constrain_scenarios = constrain_scenarios or [] 

468 self.notification_priority = delivery_priority or PRIORITY_MEDIUM 

469 self.notification_message = message or "" 

470 self.notification_title = title or "" 

471 

472 def as_dict(self) -> ConfigType: 

473 return { 

474 "applied_scenarios": self.applied_scenarios, 

475 "required_scenarios": self.required_scenarios, 

476 "constrain_scenarios": self.constrain_scenarios, 

477 "notification_message": self.notification_message, 

478 "notification_title": self.notification_title, 

479 "occupancy": self.occupancy, 

480 } 

481 

482 

483class SuppressionReason(StrEnum): 

484 SNOOZED = "SNOOZED" 

485 DUPE = "DUPE" 

486 NO_SCENARIO = "NO_SCENARIO" 

487 

488 

489class TargetRequired(StrEnum): 

490 ALWAYS = auto() 

491 NEVER = auto() 

492 OPTIONAL = auto() 

493 

494 @classmethod 

495 def _missing_(cls, value: Any) -> "TargetRequired|None": 

496 """Backward compatibility for binary values""" 

497 if value is True or (isinstance(value, str) and value.lower() in ("true", "on")): 

498 return cls.ALWAYS 

499 if value is False or (isinstance(value, str) and value.lower() in ("false", "off")): 

500 return cls.OPTIONAL 

501 return None 

502 

503 

504class TargetType(StrEnum): 

505 pass 

506 

507 

508class GlobalTargetType(TargetType): 

509 NONCRITICAL = "NONCRITICAL" 

510 EVERYTHING = "EVERYTHING" 

511 

512 

513class RecipientType(StrEnum): 

514 USER = "USER" 

515 EVERYONE = "EVERYONE" 

516 

517 

518class QualifiedTargetType(TargetType): 

519 TRANSPORT = "TRANSPORT" 

520 DELIVERY = "DELIVERY" 

521 CAMERA = "CAMERA" 

522 PRIORITY = "PRIORITY" 

523 MOBILE = "MOBILE" 

524 

525 

526class CommandType(StrEnum): 

527 SNOOZE = "SNOOZE" 

528 SILENCE = "SILENCE" 

529 NORMAL = "NORMAL" 

530 

531 

532class MessageOnlyPolicy(StrEnum): 

533 STANDARD = "STANDARD" # independent title and message 

534 USE_TITLE = "USE_TITLE" # use title in place of message, no title 

535 # use combined title and message as message, no title 

536 COMBINE_TITLE = "COMBINE_TITLE"