Coverage for custom_components/supernotify/snoozer.py: 95%

168 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-01 15:06 +0000

1import datetime as dt 

2import logging 

3from datetime import timedelta 

4from typing import TYPE_CHECKING, Any 

5 

6from homeassistant.util import dt as dt_util 

7 

8from .const import ATTR_ACTION, ATTR_MOBILE_APP_ID, ATTR_PERSON_ID, CONF_SNOOZE_TIME, PRIORITY_CRITICAL, PRIORITY_MEDIUM 

9from .model import CommandType, GlobalTargetType, QualifiedTargetType, RecipientType, Target, TargetType 

10 

11if TYPE_CHECKING: 

12 from homeassistant.core import Event 

13 

14 from .delivery import Delivery 

15 from .people import PeopleRegistry, Recipient 

16 

17_LOGGER = logging.getLogger(__name__) 

18 

19 

20class Snooze: 

21 target: str | list[str] | None 

22 target_type: TargetType 

23 snoozed_at: dt.datetime 

24 snooze_until: dt.datetime | None = None 

25 recipient_type: RecipientType 

26 recipient: str | None 

27 reason: str | None = None 

28 

29 def __init__( 

30 self, 

31 target_type: TargetType, 

32 recipient_type: RecipientType, 

33 target: str | list[str] | None = None, 

34 recipient: str | None = None, 

35 snooze_for: timedelta | None = None, 

36 reason: str | None = None, 

37 ) -> None: 

38 self.snoozed_at = dt_util.now() 

39 self.target = target 

40 self.target_type = target_type 

41 self.recipient_type: RecipientType = recipient_type 

42 self.recipient = recipient 

43 self.reason = reason 

44 self.snooze_until = None 

45 if snooze_for: 

46 self.snooze_until = self.snoozed_at + snooze_for 

47 

48 def std_recipient(self) -> str | None: 

49 return self.recipient if self.recipient_type == RecipientType.USER else RecipientType.EVERYONE 

50 

51 def short_key(self) -> str: 

52 # only one GLOBAL can be active at a time 

53 target = "GLOBAL" if self.target_type in GlobalTargetType else f"{self.target_type}_{self.target}" 

54 return f"{target}_{self.std_recipient()}" 

55 

56 def __eq__(self, other: object) -> bool: 

57 """Check if two snoozes for the same thing""" 

58 if not isinstance(other, Snooze): 

59 return False 

60 return self.short_key() == other.short_key() 

61 

62 def __repr__(self) -> str: 

63 """Return a string representation of the object.""" 

64 target = "GLOBAL" if self.target_type in GlobalTargetType else f"{self.target_type}_{self.target}" 

65 return f"Snooze({target}, {self.std_recipient()})" 

66 

67 def active(self) -> bool: 

68 return self.snooze_until is None or self.snooze_until > dt_util.now() 

69 

70 def export(self) -> dict[str, Any]: 

71 return { 

72 "target_type": self.target_type, 

73 "target": self.target, 

74 "recipient_type": self.recipient_type, 

75 "recipient": self.recipient, 

76 "reason": self.reason, 

77 "snoozed_at": dt_util.as_local(self.snoozed_at).strftime("%H:%M:%S") if self.snoozed_at else None, 

78 "snooze_until": dt_util.as_local(self.snooze_until).strftime("%H:%M:%S") if self.snooze_until else None, 

79 } 

80 

81 

82class Snoozer: 

83 """Manage snoozing""" 

84 

85 def __init__(self, config: dict[str, Any] | None = None, people_registry: PeopleRegistry | None = None) -> None: 

86 self.snoozes: dict[str, Snooze] = {} 

87 self.people_registry: PeopleRegistry | None = people_registry 

88 self.config = config or {} 

89 self.snooze_period = timedelta(seconds=self.config.get(CONF_SNOOZE_TIME, 60 * 60)) 

90 

91 def handle_command_event(self, event: Event, people: list[Recipient] | None = None) -> None: 

92 people = people or [] 

93 try: 

94 cmd: CommandType 

95 target_type: TargetType | None = None 

96 target: str | None = None 

97 snooze_for: timedelta = self.snooze_period 

98 recipient_type: RecipientType | None = None 

99 event_name = event.data.get(ATTR_ACTION) 

100 

101 if not event_name: 

102 _LOGGER.warning( 

103 "SUPERNOTIFY Invalid Mobile Action: %s, %s, %s, %s", 

104 event.origin, 

105 event.time_fired, 

106 event.data, 

107 event.context, 

108 ) 

109 return 

110 

111 _LOGGER.debug( 

112 "SUPERNOTIFY Mobile Action: %s, %s, %s, %s", event.origin, event.time_fired, event.data, event.context 

113 ) 

114 event_parts: list[str] = event_name.split("_") 

115 if len(event_parts) < 4: 

116 _LOGGER.warning("SUPERNOTIFY Malformed mobile event action %s", event_name) 

117 return 

118 cmd = CommandType[event_parts[1]] 

119 recipient_type = RecipientType[event_parts[2]] 

120 if event_parts[3] in QualifiedTargetType and len(event_parts) > 4: 

121 target_type = QualifiedTargetType[event_parts[3]] 

122 target = event_parts[4] 

123 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 6 else self.snooze_period 

124 elif event_parts[3] in GlobalTargetType and len(event_parts) >= 4: 

125 target_type = GlobalTargetType[event_parts[3]] 

126 snooze_for = timedelta(minutes=int(event_parts[-1])) if len(event_parts) == 5 else self.snooze_period 

127 

128 if cmd is None or target_type is None or recipient_type is None: 

129 _LOGGER.warning("SUPERNOTIFY Invalid mobile event name %s", event_name) 

130 return 

131 

132 except KeyError as ke: 

133 _LOGGER.warning("SUPERNOTIFY Unknown enum in event %s: %s", event, ke) 

134 return 

135 except Exception as e: 

136 _LOGGER.warning("SUPERNOTIFY Unable to analyze event %s: %s", event, e) 

137 return 

138 

139 try: 

140 recipient: str | None = None 

141 if recipient_type == RecipientType.USER: 

142 target_people = [ 

143 p.entity_id 

144 for p in people 

145 if p.user_id == event.context.user_id and event.context.user_id is not None and p.entity_id 

146 ] 

147 if target_people: 

148 recipient = target_people[0] 

149 _LOGGER.debug("SUPERNOTIFY mobile action from %s mapped to %s", event.context.user_id, recipient) 

150 else: 

151 _LOGGER.warning("SUPERNOTIFY Unable to find person for action from %s", event.context.user_id) 

152 return 

153 

154 self.register_snooze(cmd, target_type, target, recipient_type, recipient, snooze_for) 

155 

156 except Exception as e: 

157 _LOGGER.warning("SUPERNOTIFY Unable to handle event %s: %s", event, e) 

158 

159 def register_snooze( 

160 self, 

161 cmd: CommandType, 

162 target_type: TargetType, 

163 target: str | None, 

164 recipient_type: RecipientType, 

165 recipient: str | None, 

166 snooze_for: timedelta | None, 

167 reason: str = "User command", 

168 ) -> None: 

169 if cmd == CommandType.SNOOZE: 

170 snooze = Snooze(target_type, recipient_type, target, recipient, snooze_for, reason=reason) 

171 self.snoozes[snooze.short_key()] = snooze 

172 elif cmd == CommandType.SILENCE: 

173 snooze = Snooze(target_type, recipient_type, target, recipient, reason=reason) 

174 self.snoozes[snooze.short_key()] = snooze 

175 elif cmd == CommandType.NORMAL: 

176 anti_snooze = Snooze(target_type, recipient_type, target, recipient) 

177 to_del = [k for k, v in self.snoozes.items() if v.short_key() == anti_snooze.short_key()] 

178 for k in to_del: 

179 del self.snoozes[k] 

180 else: 

181 _LOGGER.warning( # type: ignore 

182 "SUPERNOTIFY Invalid mobile cmd %s (target_type: %s, target: %s, recipient_type: %s)", 

183 cmd, 

184 target_type, 

185 target, 

186 recipient_type, 

187 ) 

188 

189 def purge_snoozes(self) -> None: 

190 to_del = [k for k, v in self.snoozes.items() if not v.active()] 

191 for k in to_del: 

192 del self.snoozes[k] 

193 

194 def clear(self) -> int: 

195 cleared = len(self.snoozes) 

196 self.snoozes.clear() 

197 return cleared 

198 

199 def export(self) -> list[dict[str, Any]]: 

200 return [s.export() for s in self.snoozes.values()] 

201 

202 def current_snoozes(self, priority: str, delivery: Delivery) -> list[Snooze]: 

203 inscope_snoozes: list[Snooze] = [] 

204 

205 for snooze in self.snoozes.values(): 

206 if snooze.active(): 

207 match snooze.target_type: 

208 case GlobalTargetType.EVERYTHING: 

209 inscope_snoozes.append(snooze) 

210 case GlobalTargetType.NONCRITICAL: 

211 if priority != PRIORITY_CRITICAL: 

212 inscope_snoozes.append(snooze) 

213 case QualifiedTargetType.DELIVERY: 

214 if snooze.target == delivery.name: 

215 inscope_snoozes.append(snooze) 

216 case QualifiedTargetType.PRIORITY: 

217 if snooze.target == priority: 

218 inscope_snoozes.append(snooze) 

219 case QualifiedTargetType.MOBILE: 

220 inscope_snoozes.append(snooze) 

221 case QualifiedTargetType.TRANSPORT: 

222 if snooze.target == delivery.transport.name: 

223 inscope_snoozes.append(snooze) 

224 case QualifiedTargetType.CAMERA: 

225 inscope_snoozes.append(snooze) 

226 case _: 

227 _LOGGER.warning("SUPERNOTIFY Unhandled target type %s", snooze.target_type) 

228 

229 return inscope_snoozes 

230 

231 def is_global_snooze(self, priority: str = PRIORITY_MEDIUM) -> bool: 

232 for snooze in self.snoozes.values(): 

233 if snooze.active(): 

234 match snooze.target_type: 

235 case GlobalTargetType.EVERYTHING: 

236 return True 

237 case GlobalTargetType.NONCRITICAL: 

238 if priority != PRIORITY_CRITICAL: 

239 return True 

240 

241 return False 

242 

243 def filter_recipients(self, recipients: Target, priority: str, delivery: Delivery) -> Target: 

244 inscope_snoozes = self.current_snoozes(priority, delivery) 

245 for snooze in inscope_snoozes: 

246 if snooze.recipient_type == RecipientType.USER: 

247 # assume the everyone checks are made before notification gets this far 

248 if ( 

249 (snooze.target_type == QualifiedTargetType.DELIVERY and snooze.target == delivery.name) 

250 or (snooze.target_type == QualifiedTargetType.TRANSPORT and snooze.target == delivery.transport.name) 

251 or ( 

252 snooze.target_type == QualifiedTargetType.PRIORITY 

253 and (snooze.target == priority or (isinstance(snooze.target, list) and priority in snooze.target)) 

254 ) 

255 or snooze.target_type == GlobalTargetType.EVERYTHING 

256 or (snooze.target_type == GlobalTargetType.NONCRITICAL and priority != PRIORITY_CRITICAL) 

257 ): 

258 recipients_to_remove = [] 

259 for recipient in recipients.person_ids: 

260 if recipient == snooze.recipient: 

261 recipients_to_remove.append(recipient) 

262 _LOGGER.info("SUPERNOTIFY Snoozing %s", snooze.recipient) 

263 

264 recipients.remove(ATTR_PERSON_ID, recipients_to_remove) 

265 

266 if snooze.target_type == QualifiedTargetType.MOBILE: 

267 to_remove: list[str] = [] 

268 for recipient in recipients.mobile_app_ids: 

269 if recipient == snooze.target: 

270 _LOGGER.debug("SUPERNOTIFY Snoozing %s for %s", snooze.recipient, snooze.target) 

271 to_remove.append(recipient) 

272 if to_remove: 

273 recipients.remove(ATTR_MOBILE_APP_ID, to_remove) 

274 return recipients