Coverage for custom_components/supernotify/snoozer.py: 21%

174 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-01-07 15:35 +0000

1import datetime as dt 

2import logging 

3from datetime import timedelta 

4from typing import Any 

5 

6from homeassistant.core import Event 

7from homeassistant.util import dt as dt_util 

8 

9from . import ( 

10 ATTR_ACTION, 

11 ATTR_MOBILE_APP_ID, 

12 ATTR_PERSON_ID, 

13 PRIORITY_CRITICAL, 

14 PRIORITY_MEDIUM, 

15) 

16from .delivery import Delivery 

17from .model import CommandType, GlobalTargetType, QualifiedTargetType, RecipientType, Target, TargetType 

18from .people import PeopleRegistry, Recipient 

19 

20SNOOZE_TIME = timedelta(hours=1) # TODO: move to configuration 

21_LOGGER = logging.getLogger(__name__) 

22 

23 

24class Snooze: 

25 target: str | list[str] | None 

26 target_type: TargetType 

27 snoozed_at: dt.datetime 

28 snooze_until: dt.datetime | None = None 

29 recipient_type: RecipientType 

30 recipient: str | None 

31 reason: str | None = None 

32 

33 def __init__( 

34 self, 

35 target_type: TargetType, 

36 recipient_type: RecipientType, 

37 target: str | list[str] | None = None, 

38 recipient: str | None = None, 

39 snooze_for: timedelta | None = None, 

40 reason: str | None = None, 

41 ) -> None: 

42 self.snoozed_at = dt_util.now() 

43 self.target = target 

44 self.target_type = target_type 

45 self.recipient_type: RecipientType = recipient_type 

46 self.recipient = recipient 

47 self.reason = reason 

48 self.snooze_until = None 

49 if snooze_for: 

50 self.snooze_until = self.snoozed_at + snooze_for 

51 

52 def std_recipient(self) -> str | None: 

53 return self.recipient if self.recipient_type == RecipientType.USER else RecipientType.EVERYONE 

54 

55 def short_key(self) -> str: 

56 # only one GLOBAL can be active at a time 

57 target = "GLOBAL" if self.target_type in GlobalTargetType else f"{self.target_type}_{self.target}" 

58 return f"{target}_{self.std_recipient()}" 

59 

60 def __eq__(self, other: object) -> bool: 

61 """Check if two snoozes for the same thing""" 

62 if not isinstance(other, Snooze): 

63 return False 

64 return self.short_key() == other.short_key() 

65 

66 def __repr__(self) -> str: 

67 """Return a string representation of the object.""" 

68 return f"Snooze({self.target_type}, {self.target}, {self.std_recipient()})" 

69 

70 def active(self) -> bool: 

71 return self.snooze_until is None or self.snooze_until > dt_util.now() 

72 

73 def export(self) -> dict[str, Any]: 

74 return { 

75 "target_type": self.target_type, 

76 "target": self.target, 

77 "recipient_type": self.recipient_type, 

78 "recipient": self.recipient, 

79 "reason": self.reason, 

80 "snoozed_at": dt_util.as_local(self.snoozed_at).strftime("%H:%M:%S") if self.snoozed_at else None, 

81 "snooze_until": dt_util.as_local(self.snooze_until).strftime("%H:%M:%S") if self.snooze_until else None, 

82 } 

83 

84 

85class Snoozer: 

86 """Manage snoozing""" 

87 

88 def __init__(self, people_registry: PeopleRegistry | None = None) -> None: 

89 self.snoozes: dict[str, Snooze] = {} 

90 self.people_registry: PeopleRegistry | None = people_registry 

91 

92 def handle_command_event(self, event: Event, people: list[Recipient] | None = None) -> None: 

93 people = people or [] 

94 try: 

95 cmd: CommandType 

96 target_type: TargetType | None = None 

97 target: str | None = None 

98 snooze_for: timedelta = SNOOZE_TIME 

99 recipient_type: RecipientType | None = None 

100 event_name = event.data.get(ATTR_ACTION) 

101 

102 if not event_name: 

103 _LOGGER.warning( 

104 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s", 

105 event.origin, 

106 event.time_fired, 

107 event.data, 

108 event.context, 

109 ) 

110 return 

111 

112 _LOGGER.debug( 

113 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context 

114 ) 

115 event_parts: list[str] = event_name.split("_") 

116 if len(event_parts) < 4: 

117 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name) 

118 return 

119 cmd = CommandType[event_parts[1]] 

120 recipient_type = RecipientType[event_parts[2]] 

121 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4: 

122 target_type = QualifiedTargetType[event_parts[3]] 

123 target = event_parts[4] 

124 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 6 else SNOOZE_TIME 

125 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4: 

126 target_type = GlobalTargetType[event_parts[3]] 

127 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 5 else SNOOZE_TIME 

128 

129 if cmd is None or target_type is None or recipient_type is None: 

130 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name) 

131 return 

132 

133 except KeyError as ke: 

134 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke) 

135 return 

136 except Exception as e: 

137 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e) 

138 return 

139 

140 try: 

141 recipient: str | None = None 

142 if recipient_type == RecipientType.USER: 

143 target_people = [ 

144 p.entity_id 

145 for p in people 

146 if p.user_id == event.context.user_id and event.context.user_id is not None and p.entity_id 

147 ] 

148 if target_people: 

149 recipient = target_people[0] 

150 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient) 

151 else: 

152 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id) 

153 return 

154 

155 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for) 

156 

157 except Exception as e: 

158 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e) 

159 

160 def register_snooze( 

161 self, 

162 cmd: CommandType, 

163 target_type: TargetType, 

164 target: str | None, 

165 recipient_type: RecipientType, 

166 recipient: str | None, 

167 snooze_for: timedelta | None, 

168 reason: str = "User command", 

169 ) -> None: 

170 if cmd == CommandType.SNOOZE: 

171 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason) 

172 self.snoozes[snooze.short_key()] = snooze 

173 elif cmd == CommandType.SILENCE: 

174 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason) 

175 self.snoozes[snooze.short_key()] = snooze 

176 elif cmd == CommandType.NORMAL: 

177 anti_snooze = Snooze(target_type, recipient_type, target, recipient) 

178 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()] 

179 for k in to_del: 

180 del self.snoozes[k] 

181 else: 

182 _LOGGER.warning( # type: ignore 

183 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)", 

184 cmd, 

185 target_type, 

186 target, 

187 recipient_type, 

188 ) 

189 

190 def purge_snoozes(self) -> None: 

191 to_del = [k for k, v in self.snoozes.items() if not v.active()] 

192 for k in to_del: 

193 del self.snoozes[k] 

194 

195 def clear(self) -> int: 

196 cleared = len(self.snoozes) 

197 self.snoozes.clear() 

198 return cleared 

199 

200 def export(self) -> list[dict[str, Any]]: 

201 return [s.export() for s in self.snoozes.values()] 

202 

203 def current_snoozes(self, priority: str, delivery: Delivery) -> list[Snooze]: 

204 inscope_snoozes: list[Snooze] = [] 

205 

206 for snooze in self.snoozes.values(): 

207 if snooze.active(): 

208 match snooze.target_type: 

209 case GlobalTargetType.EVERYTHING: 

210 inscope_snoozes.append(snooze) 

211 case GlobalTargetType.NONCRITICAL: 

212 if priority != PRIORITY_CRITICAL: 

213 inscope_snoozes.append(snooze) 

214 case QualifiedTargetType.DELIVERY: 

215 if snooze.target == delivery.name: 

216 inscope_snoozes.append(snooze) 

217 case QualifiedTargetType.PRIORITY: 

218 if snooze.target == priority: 

219 inscope_snoozes.append(snooze) 

220 case QualifiedTargetType.MOBILE: 

221 inscope_snoozes.append(snooze) 

222 case QualifiedTargetType.TRANSPORT: 

223 if snooze.target == delivery.transport.name: 

224 inscope_snoozes.append(snooze) 

225 case QualifiedTargetType.CAMERA: 

226 inscope_snoozes.append(snooze) 

227 case _: 

228 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type) 

229 

230 return inscope_snoozes 

231 

232 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool: 

233 for snooze in self.snoozes.values(): 

234 if snooze.active(): 

235 match snooze.target_type: 

236 case GlobalTargetType.EVERYTHING: 

237 return True 

238 case GlobalTargetType.NONCRITICAL: 

239 if priority != PRIORITY_CRITICAL: 

240 return True 

241 

242 return False 

243 

244 def filter_recipients(self, recipients: Target, priority: str, delivery: Delivery) -> Target: 

245 inscope_snoozes = self.current_snoozes(priority, delivery) 

246 for snooze in inscope_snoozes: 

247 if snooze.recipient_type == RecipientType.USER: 

248 # assume the everyone checks are made before notification gets this far 

249 if ( 

250 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery.name) 

251 or (snooze.target_type == QualifiedTargetType.TRANSPORT and snooze.target == delivery.transport.name) 

252 or ( 

253 snooze.target_type == QualifiedTargetType.PRIORITY 

254 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target)) 

255 ) 

256 or snooze.target_type == GlobalTargetType.EVERYTHING 

257 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL) 

258 ): 

259 recipients_to_remove = [] 

260 for recipient in recipients.person_ids: 

261 if recipient == snooze.recipient: 

262 recipients_to_remove.append(recipient) 

263 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient) 

264 

265 recipients.remove(ATTR_PERSON_ID, recipients_to_remove) 

266 

267 if snooze.target_type == QualifiedTargetType.MOBILE: 

268 to_remove: list[str] = [] 

269 for recipient in recipients.mobile_app_ids: 

270 if recipient == snooze.target: 

271 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target) 

272 to_remove.append(recipient) 

273 if to_remove: 

274 recipients.remove(ATTR_MOBILE_APP_ID, to_remove) 

275 return recipients