Coverage for custom_components/supernotify/snoozer.py: 84%

174 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-21 23:31 +0000

1import datetime as dt 

2import logging 

3from datetime import timedelta 

4from typing import Any 

5 

6from homeassistant.core import Event 

7from homeassistant.util import dt as dt_util 

8 

9from . import ( 

10 ATTR_ACTION, 

11 ATTR_MOBILE_APP_ID, 

12 ATTR_PERSON_ID, 

13 ATTR_USER_ID, 

14 CONF_PERSON, 

15 PRIORITY_CRITICAL, 

16 PRIORITY_MEDIUM, 

17) 

18from .delivery import Delivery 

19from .model import CommandType, GlobalTargetType, QualifiedTargetType, RecipientType, Target, TargetType 

20from .people import PeopleRegistry 

21 

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Snooze duration used when a command does not carry its own duration.
# TODO: move to configuration
SNOOZE_TIME = timedelta(hours=1)

24 

25 

class Snooze:
    """A snooze applied to one target on behalf of one recipient.

    Two snoozes compare equal (and hash equal) when they share the same
    ``short_key()``, i.e. they cover the same target for the same recipient,
    regardless of when they were created or when they expire.
    """

    target: str | list[str] | None
    target_type: TargetType
    snoozed_at: dt.datetime
    snooze_until: dt.datetime | None = None
    recipient_type: RecipientType
    recipient: str | None
    reason: str | None = None

    def __init__(
        self,
        target_type: TargetType,
        recipient_type: RecipientType,
        target: str | list[str] | None = None,
        recipient: str | None = None,
        snooze_for: timedelta | None = None,
        reason: str | None = None,
    ) -> None:
        """Create a snooze starting now.

        Args:
            target_type: Kind of thing being snoozed.
            recipient_type: Whether the snooze is for one user or everyone.
            target: The specific target(s) for the given target type, if any.
            recipient: The recipient the snooze belongs to, if any.
            snooze_for: How long the snooze lasts. When omitted (or falsy,
                which includes a zero-length ``timedelta``) the snooze has no
                expiry time.
            reason: Free-text explanation recorded with the snooze.
        """
        self.snoozed_at = dt_util.now()
        self.target = target
        self.target_type = target_type
        self.recipient_type = recipient_type
        self.recipient = recipient
        self.reason = reason
        self.snooze_until = None
        if snooze_for:
            self.snooze_until = self.snoozed_at + snooze_for

    def std_recipient(self) -> str | None:
        """Return the recipient for USER snoozes, else ``RecipientType.EVERYONE``."""
        if self.recipient_type == RecipientType.USER:
            return self.recipient
        return RecipientType.EVERYONE

    def short_key(self) -> str:
        """Return the identity key of this snooze: target part plus recipient."""
        # only one GLOBAL can be active at a time
        if self.target_type in GlobalTargetType:
            target = "GLOBAL"
        else:
            target = f"{self.target_type}_{self.target}"
        return f"{target}_{self.std_recipient()}"

    def __eq__(self, other: object) -> bool:
        """Check if two snoozes for the same thing"""
        if not isinstance(other, Snooze):
            return False
        return self.short_key() == other.short_key()

    def __hash__(self) -> int:
        """Hash on ``short_key()`` so hashing stays consistent with ``__eq__``.

        Defining ``__eq__`` alone makes a class unhashable; this restores
        hashability using the same identity that equality uses.
        """
        return hash(self.short_key())

    def __repr__(self) -> str:
        """Return a string representation of the object."""
        return f"Snooze({self.target_type}, {self.target}, {self.std_recipient()})"

    def active(self) -> bool:
        """Return True while the snooze has no expiry or the expiry is still in the future."""
        return self.snooze_until is None or self.snooze_until > dt_util.now()

    def export(self) -> dict[str, Any]:
        """Return the snooze as a dict, with times rendered as local ``HH:MM:SS`` strings."""
        snoozed_at = dt_util.as_local(self.snoozed_at).strftime("%H:%M:%S") if self.snoozed_at else None
        snooze_until = dt_util.as_local(self.snooze_until).strftime("%H:%M:%S") if self.snooze_until else None
        return {
            "target_type": self.target_type,
            "target": self.target,
            "recipient_type": self.recipient_type,
            "recipient": self.recipient,
            "reason": self.reason,
            "snoozed_at": snoozed_at,
            "snooze_until": snooze_until,
        }

85 

86 

87class Snoozer: 

88 """Manage snoozing""" 

89 

90 def __init__(self, people_registry: PeopleRegistry | None = None) -> None: 

91 self.snoozes: dict[str, Snooze] = {} 

92 self.people_registry: PeopleRegistry | None = people_registry 

93 

94 def handle_command_event(self, event: Event, people: dict[str, Any] | None = None) -> None: 

95 people = people or {} 

96 try: 

97 cmd: CommandType 

98 target_type: TargetType | None = None 

99 target: str | None = None 

100 snooze_for: timedelta = SNOOZE_TIME 

101 recipient_type: RecipientType | None = None 

102 event_name = event.data.get(ATTR_ACTION) 

103 

104 if not event_name: 

105 _LOGGER.warning( 

106 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s", 

107 event.origin, 

108 event.time_fired, 

109 event.data, 

110 event.context, 

111 ) 

112 return 

113 

114 _LOGGER.debug( 

115 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context 

116 ) 

117 event_parts: list[str] = event_name.split("_") 

118 if len(event_parts) < 4: 

119 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name) 

120 return 

121 cmd = CommandType[event_parts[1]] 

122 recipient_type = RecipientType[event_parts[2]] 

123 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4: 

124 target_type = QualifiedTargetType[event_parts[3]] 

125 target = event_parts[4] 

126 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 6 else SNOOZE_TIME 

127 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4: 

128 target_type = GlobalTargetType[event_parts[3]] 

129 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 5 else SNOOZE_TIME 

130 

131 if cmd is None or target_type is None or recipient_type is None: 

132 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name) 

133 return 

134 

135 except KeyError as ke: 

136 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke) 

137 return 

138 except Exception as e: 

139 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e) 

140 return 

141 

142 try: 

143 recipient: str | None = None 

144 if recipient_type == RecipientType.USER: 

145 target_people = [ 

146 p.get(CONF_PERSON) 

147 for p in people.values() 

148 if p.get(ATTR_USER_ID) == event.context.user_id and event.context.user_id is not None and p.get(CONF_PERSON) 

149 ] 

150 if target_people: 

151 recipient = target_people[0] 

152 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient) 

153 else: 

154 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id) 

155 return 

156 

157 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for) 

158 

159 except Exception as e: 

160 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e) 

161 

162 def register_snooze( 

163 self, 

164 cmd: CommandType, 

165 target_type: TargetType, 

166 target: str | None, 

167 recipient_type: RecipientType, 

168 recipient: str | None, 

169 snooze_for: timedelta | None, 

170 reason: str = "User command", 

171 ) -> None: 

172 if cmd == CommandType.SNOOZE: 

173 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason) 

174 self.snoozes[snooze.short_key()] = snooze 

175 elif cmd == CommandType.SILENCE: 

176 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason) 

177 self.snoozes[snooze.short_key()] = snooze 

178 elif cmd == CommandType.NORMAL: 

179 anti_snooze = Snooze(target_type, recipient_type, target, recipient) 

180 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()] 

181 for k in to_del: 

182 del self.snoozes[k] 

183 else: 

184 _LOGGER.warning( # type: ignore 

185 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)", 

186 cmd, 

187 target_type, 

188 target, 

189 recipient_type, 

190 ) 

191 

192 def purge_snoozes(self) -> None: 

193 to_del = [k for k, v in self.snoozes.items() if not v.active()] 

194 for k in to_del: 

195 del self.snoozes[k] 

196 

197 def clear(self) -> int: 

198 cleared = len(self.snoozes) 

199 self.snoozes.clear() 

200 return cleared 

201 

202 def export(self) -> list[dict[str, Any]]: 

203 return [s.export() for s in self.snoozes.values()] 

204 

205 def current_snoozes(self, priority: str, delivery: Delivery) -> list[Snooze]: 

206 inscope_snoozes: list[Snooze] = [] 

207 

208 for snooze in self.snoozes.values(): 

209 if snooze.active(): 

210 match snooze.target_type: 

211 case GlobalTargetType.EVERYTHING: 

212 inscope_snoozes.append(snooze) 

213 case GlobalTargetType.NONCRITICAL: 

214 if priority != PRIORITY_CRITICAL: 

215 inscope_snoozes.append(snooze) 

216 case QualifiedTargetType.DELIVERY: 

217 if snooze.target == delivery.name: 

218 inscope_snoozes.append(snooze) 

219 case QualifiedTargetType.PRIORITY: 

220 if snooze.target == priority: 

221 inscope_snoozes.append(snooze) 

222 case QualifiedTargetType.MOBILE: 

223 inscope_snoozes.append(snooze) 

224 case QualifiedTargetType.TRANSPORT: 

225 if snooze.target == delivery.transport.name: 

226 inscope_snoozes.append(snooze) 

227 case QualifiedTargetType.CAMERA: 

228 inscope_snoozes.append(snooze) 

229 case _: 

230 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type) 

231 

232 return inscope_snoozes 

233 

234 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool: 

235 for snooze in self.snoozes.values(): 

236 if snooze.active(): 

237 match snooze.target_type: 

238 case GlobalTargetType.EVERYTHING: 

239 return True 

240 case GlobalTargetType.NONCRITICAL: 

241 if priority != PRIORITY_CRITICAL: 

242 return True 

243 

244 return False 

245 

246 def filter_recipients(self, recipients: Target, priority: str, delivery: Delivery) -> Target: 

247 inscope_snoozes = self.current_snoozes(priority, delivery) 

248 for snooze in inscope_snoozes: 

249 if snooze.recipient_type == RecipientType.USER: 

250 # assume the everyone checks are made before notification gets this far 

251 if ( 

252 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery.name) 

253 or (snooze.target_type == QualifiedTargetType.TRANSPORT and snooze.target == delivery.transport.name) 

254 or ( 

255 snooze.target_type == QualifiedTargetType.PRIORITY 

256 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target)) 

257 ) 

258 or snooze.target_type == GlobalTargetType.EVERYTHING 

259 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL) 

260 ): 

261 recipients_to_remove = [] 

262 for recipient in recipients.person_ids: 

263 if recipient == snooze.recipient: 

264 recipients_to_remove.append(recipient) 

265 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient) 

266 

267 recipients.remove(ATTR_PERSON_ID, recipients_to_remove) 

268 

269 if snooze.target_type == QualifiedTargetType.MOBILE: 

270 to_remove: list[str] = [] 

271 for recipient in recipients.mobile_app_ids: 

272 if recipient == snooze.target: 

273 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target) 

274 to_remove.append(recipient) 

275 if to_remove: 

276 recipients.remove(ATTR_MOBILE_APP_ID, to_remove) 

277 return recipients