Coverage for custom_components/supernotify/media_grab.py: 14%

274 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-01-07 15:35 +0000

1import asyncio 

2import datetime as dt 

3import io 

4import logging 

5import time 

6from enum import StrEnum, auto 

7from http import HTTPStatus 

8from io import BytesIO 

9from typing import TYPE_CHECKING, Any, cast 

10 

11import aiofiles 

12import aiofiles.os 

13import homeassistant.util.dt as dt_util 

14from aiohttp import ClientResponse, ClientSession, ClientTimeout 

15from anyio import Path 

16from homeassistant.const import STATE_HOME, STATE_UNAVAILABLE 

17from PIL import Image 

18 

19from custom_components.supernotify import ( 

20 ATTR_JPEG_OPTS, 

21 ATTR_MEDIA_CAMERA_DELAY, 

22 ATTR_MEDIA_CAMERA_ENTITY_ID, 

23 ATTR_MEDIA_CAMERA_PTZ_PRESET, 

24 ATTR_MEDIA_SNAPSHOT_PATH, 

25 ATTR_MEDIA_SNAPSHOT_URL, 

26 ATTR_PNG_OPTS, 

27 CONF_ALT_CAMERA, 

28 CONF_CAMERA, 

29 CONF_DEVICE_TRACKER, 

30 CONF_OPTIONS, 

31 CONF_PTZ_CAMERA, 

32 CONF_PTZ_DELAY, 

33 CONF_PTZ_METHOD, 

34 CONF_PTZ_PRESET_DEFAULT, 

35 MEDIA_OPTION_REPROCESS, 

36 OPTION_JPEG, 

37 OPTION_PNG, 

38 PTZ_METHOD_FRIGATE, 

39 PTZ_METHOD_ONVIF, 

40) 

41 

42from .context import Context 

43from .hass_api import HomeAssistantAPI 

44 

45if TYPE_CHECKING: 

46 from homeassistant.components.image import ImageEntity 

47 from homeassistant.core import State 

48 

49_LOGGER = logging.getLogger(__name__) 

50 

51 

52class ReprocessOption(StrEnum): 

53 ALWAYS = auto() 

54 NEVER = auto() 

55 PRESERVE = auto() 

56 

57 

async def snapshot_from_url(
    hass_api: HomeAssistantAPI,
    snapshot_url: str,
    notification_id: str,
    media_path: Path,
    hass_base_url: str | None,
    remote_timeout: int = 15,
    reprocess: ReprocessOption = ReprocessOption.ALWAYS,
    jpeg_opts: dict[str, Any] | None = None,
    png_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Fetch an image over HTTP and store it under the media path.

    Args:
        hass_api: Supplies the shared aiohttp session via ``http_session()``.
        snapshot_url: Absolute URL, or a path that is appended to ``hass_base_url``
            when it does not start with ``http``.
        notification_id: Passed to ``write_image_from_bitmap``, which uses it in the file name.
        media_path: Root directory for stored media.
        hass_base_url: Prefix for relative URLs; ``None`` is treated as an empty string.
        remote_timeout: Total request timeout in seconds.
        reprocess: Passed through to ``write_image_from_bitmap``.
        jpeg_opts: Passed through to ``write_image_from_bitmap``.
        png_opts: Passed through to ``write_image_from_bitmap``.

    Returns:
        Path of the stored image, or ``None`` when the download or the save failed.
        Failures are logged and never raised to the caller.
    """
    hass_base_url = hass_base_url or ""
    try:
        # Kept for compatibility: the directory is created even though the image
        # itself is stored by write_image_from_bitmap under its own directory.
        media_dir: Path = Path(media_path) / "snapshot"
        await media_dir.mkdir(parents=True, exist_ok=True)

        image_url = snapshot_url if snapshot_url.startswith("http") else f"{hass_base_url}{snapshot_url}"
        websession: ClientSession = hass_api.http_session()
        r: ClientResponse = await websession.get(image_url, timeout=ClientTimeout(total=remote_timeout))
        bitmap: bytes | None = None
        try:
            if r.status != HTTPStatus.OK:
                _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status)
            else:
                bitmap = await r.content.read()
        finally:
            # Hand the connection back to the pool on every path; without this an
            # error response whose body is never read stays checked out.
            r.release()

        if bitmap is not None:
            image_path: Path | None = await write_image_from_bitmap(
                hass_api,
                bitmap,
                media_path,
                notification_id,
                reprocess=reprocess,
                jpeg_opts=jpeg_opts,
                png_opts=png_opts,
            )
            if image_path:
                _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path)
                return Path(image_path)

        _LOGGER.warning("SUPERNOTIFY Failed to snap image from %s", snapshot_url)
    except Exception as e:
        _LOGGER.exception("SUPERNOTIFY Image snap fail: %s", e)

    return None

102 

103 

async def move_camera_to_ptz_preset(
    hass_api: HomeAssistantAPI, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF
) -> None:
    """Send a camera to a PTZ preset through the ``frigate`` or ``onvif`` ``ptz`` service.

    An unrecognised ``method`` only logs a warning. Any exception raised while
    calling the service is logged as a warning and swallowed.
    """
    try:
        _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id)
        # Both integrations expose a "ptz" service; only the domain and payload differ.
        if method == PTZ_METHOD_FRIGATE:
            domain = "frigate"
            payload: dict[str, Any] = {"action": "preset", "argument": preset}
        elif method == PTZ_METHOD_ONVIF:
            domain = "onvif"
            payload = {"move_mode": "GotoPreset", "preset": preset}
        else:
            _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method)
            return

        await hass_api.call_service(
            domain,
            "ptz",
            service_data=payload,
            target={"entity_id": camera_entity_id},
            return_response=False,
            blocking=True,
        )
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, e)

132 

133 

async def snap_image_entity(
    hass_api: HomeAssistantAPI,
    entity_id: str,
    media_path: Path,
    notification_id: str,
    reprocess: ReprocessOption = ReprocessOption.ALWAYS,
    jpeg_opts: dict[str, Any] | None = None,
    png_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Store the current picture of an ``image`` domain entity (any image, including MQTT Image).

    Returns the path written by ``write_image_from_bitmap``, or ``None`` when the
    entity is not found, yields nothing, or saving fails. Errors are logged, not raised.
    """
    saved: Path | None = None
    try:
        entity: ImageEntity | None = cast("ImageEntity|None", hass_api.domain_entity("image", entity_id))
        if entity:
            raw: bytes | None = await entity.async_image()
            saved = await write_image_from_bitmap(
                hass_api,
                raw,
                media_path,
                notification_id,
                reprocess=reprocess,
                jpeg_opts=jpeg_opts,
                png_opts=png_opts,
            )
    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, e)

    if saved is None:
        _LOGGER.warning("SUPERNOTIFY Unable to save from image entity %s", entity_id)
    if not saved:
        return None
    return Path(saved)

163 

164 

async def snap_camera(
    hass_api: HomeAssistantAPI,
    camera_entity_id: str,
    notification_id: str,
    media_path: Path,
    max_camera_wait: int = 20,
    reprocess: ReprocessOption = ReprocessOption.ALWAYS,
    jpeg_opts: dict[str, Any] | None = None,
    png_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Take a snapshot through the ``camera.snapshot`` service and optionally reprocess it.

    Args:
        hass_api: Used to call the snapshot service.
        camera_entity_id: Camera to snap; an empty value logs a warning and returns ``None``.
        notification_id: Passed to ``write_image_from_bitmap`` when reprocessing.
        media_path: Root media directory; the raw snapshot goes in its ``camera`` subdirectory.
        max_camera_wait: Seconds to wait for the snapshot file to appear.
        reprocess: Unless ``NEVER``, the snapshot is re-read and passed to ``write_image_from_bitmap``.
        jpeg_opts: Passed through to ``write_image_from_bitmap``.
        png_opts: Passed through to ``write_image_from_bitmap``.

    Returns:
        Path of the reprocessed image, or of the raw snapshot when reprocessing is
        skipped or fails to produce a file. ``None`` when the snapshot never
        appears or any step raises.
    """
    image_path: Path | None = None
    if not camera_entity_id:
        _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap")
        return image_path

    try:
        media_dir: Path = Path(media_path) / "camera"
        await media_dir.mkdir(parents=True, exist_ok=True)
        timed = str(time.time()).replace(".", "_")
        image_path = media_dir / f"{camera_entity_id}_{timed}.jpg"
        await hass_api.call_service(
            "camera",
            "snapshot",
            service_data={"entity_id": camera_entity_id, "filename": image_path},
            return_response=False,
            blocking=True,
        )

        # Give the async service time to write the file. A monotonic clock keeps
        # the deadline unaffected by wall-clock adjustments.
        deadline = time.monotonic() + max_camera_wait
        while not await image_path.exists():
            if time.monotonic() >= deadline:
                # Never hand back a path to a file that was not written.
                _LOGGER.warning(
                    "SUPERNOTIFY Camera %s snapshot not found at %s after %s secs",
                    camera_entity_id,
                    image_path,
                    max_camera_wait,
                )
                return None
            _LOGGER.info("Image file not available yet at %s, pausing", image_path)
            await asyncio.sleep(1)

        if reprocess != ReprocessOption.NEVER:
            async with await image_path.open("rb") as f:
                bitmap: bytes | None = await f.read()
            processed: Path | None = await write_image_from_bitmap(
                hass_api,
                bitmap,
                media_path,
                notification_id,
                reprocess=reprocess,
                jpeg_opts=jpeg_opts,
                png_opts=png_opts,
            )
            if processed:
                image_path = Path(processed)
            else:
                # Fall back to the raw snapshot, which is known to exist.
                _LOGGER.warning("SUPERNOTIFY Unable to reprocess camera image")

    except Exception as e:
        _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, image_path, e)
        image_path = None

    return image_path

221 

222 

def camera_available(hass_api: HomeAssistantAPI, camera_config: dict[str, Any], non_entity: bool = False) -> bool:
    """Report whether a configured camera looks usable right now.

    When the config names a device tracker, the camera counts as available only
    while that tracker's state is ``STATE_HOME``. Otherwise the camera entity's own
    state is used: anything other than ``STATE_UNAVAILABLE`` counts, and a camera
    with no state at all counts only when ``non_entity`` is true.

    Returns ``False`` (after logging) when unavailable or when the check itself raises.
    """
    camera_entity_id: str = camera_config[CONF_CAMERA]
    try:
        tracker_id = camera_config.get(CONF_DEVICE_TRACKER)
        state: State | None
        if tracker_id:
            watched_id: str = tracker_id
            state = hass_api.get_state(tracker_id)
            if state and state.state == STATE_HOME:
                return True
            _LOGGER.debug("SUPERNOTIFY Skipping camera %s tracker %s state %s", camera_entity_id, watched_id, state)
        else:
            watched_id = camera_entity_id
            state = hass_api.get_state(camera_entity_id)
            if (state and state.state != STATE_UNAVAILABLE) or (state is None and non_entity):
                return True
            _LOGGER.debug("SUPERNOTIFY Skipping camera %s with state %s", camera_entity_id, state)

        if state is None:
            # The wording depends on whether the camera itself or its tracker was looked up.
            if watched_id == camera_entity_id:
                _LOGGER.warning(
                    "SUPERNOTIFY Camera %s tracker %s has no entity state",
                    camera_entity_id,
                    watched_id,
                )
            else:
                _LOGGER.warning(
                    "SUPERNOTIFY Camera %s device_tracker %s seems missing",
                    camera_entity_id,
                    tracker_id,
                )
        return False

    except Exception as e:
        _LOGGER.exception("SUPERNOTIFY Unable to determine camera state: %s, %s", camera_config, e)
        return False

260 

261 

def select_avail_camera(hass_api: HomeAssistantAPI, cameras: dict[str, Any], camera_entity_id: str) -> str | None:
    """Pick the requested camera, or the first usable alternative configured for it.

    Args:
        hass_api: Passed to ``camera_available`` for state lookups.
        cameras: Camera configurations keyed by camera entity id.
        camera_entity_id: The preferred camera.

    Returns:
        ``camera_entity_id`` when it has no configuration (assumed available) or is
        available. Otherwise the first available alternative from ``CONF_ALT_CAMERA``.
        If none qualifies, a second pass accepts cameras that have no entity state
        (``non_entity=True``). ``None`` when nothing qualifies.
    """
    preferred_cam = cameras.get(camera_entity_id)
    if preferred_cam is None:
        # assume unconfigured camera available
        return camera_entity_id

    # test support FIXME: configs may arrive without their own entity id. setdefault
    # also covers an empty config dict, which would otherwise raise KeyError when
    # camera_available reads CONF_CAMERA.
    preferred_cam.setdefault(CONF_CAMERA, camera_entity_id)
    if camera_available(hass_api, preferred_cam):
        return camera_entity_id

    # Tolerate an explicit None for the alternatives list.
    alt_ids = preferred_cam.get(CONF_ALT_CAMERA) or []
    # Configured alternatives first, then bare entity ids. Each candidate is a copy
    # that always carries CONF_CAMERA, so the shared configuration is not mutated.
    alt_cams: list[dict[str, Any]] = [{CONF_CAMERA: alt_id, **cameras[alt_id]} for alt_id in alt_ids if alt_id in cameras]
    alt_cams.extend({CONF_CAMERA: alt_id} for alt_id in alt_ids if alt_id not in cameras)

    for alt_cam in alt_cams:
        if camera_available(hass_api, alt_cam):
            _LOGGER.info("SUPERNOTIFY Selecting available camera %s rather than %s", alt_cam[CONF_CAMERA], camera_entity_id)
            return alt_cam[CONF_CAMERA]

    # Second pass: accept cameras that have no entity state at all.
    _LOGGER.warning("%s not available, finding best alternative available", camera_entity_id)
    if camera_available(hass_api, preferred_cam, non_entity=True):
        _LOGGER.info("SUPERNOTIFY Selecting camera %s with no known entity", camera_entity_id)
        return camera_entity_id
    for alt_cam in alt_cams:
        if camera_available(hass_api, alt_cam, non_entity=True):
            _LOGGER.info(
                "SUPERNOTIFY Selecting alt camera %s with no known entity for %s", alt_cam[CONF_CAMERA], camera_entity_id
            )
            return alt_cam[CONF_CAMERA]

    return None

297 

298 

async def grab_image(notification: "Notification", delivery_name: str, context: Context) -> Path | None:  # type: ignore # noqa: F821
    """Obtain an image for a notification from its snapshot URL, image entity or camera.

    JPEG, PNG and reprocess options come from the notification media, falling back
    to the delivery's options. A snapshot path already recorded on the notification
    is returned as-is. Otherwise the sources are tried in the order: snapshot URL,
    ``image.`` entity, camera. A successful result is recorded on the notification
    media before being returned.

    Returns ``None`` when there is no media path, no source, or no image was obtained.
    """
    snapshot_url = notification.media.get(ATTR_MEDIA_SNAPSHOT_URL)
    camera_entity_id = notification.media.get(ATTR_MEDIA_CAMERA_ENTITY_ID)
    delivery_options = notification.delivery_data(delivery_name).get(CONF_OPTIONS, {})
    jpeg_opts = notification.media.get(ATTR_JPEG_OPTS, delivery_options.get(OPTION_JPEG))
    png_opts = notification.media.get(ATTR_PNG_OPTS, delivery_options.get(OPTION_PNG))
    reprocess_option = (
        notification.media.get(MEDIA_OPTION_REPROCESS, delivery_options.get(MEDIA_OPTION_REPROCESS)) or "always"
    )

    media_path: Path | None = context.media_storage.media_path
    if not media_path:
        return None

    try:
        reprocess: ReprocessOption = ReprocessOption(reprocess_option)
    except Exception:
        _LOGGER.warning("SUPERNOTIFY Invalid reprocess option: %s", reprocess_option)
        reprocess = ReprocessOption.ALWAYS

    if not (snapshot_url or camera_entity_id):
        return None

    existing_path = notification.media.get(ATTR_MEDIA_SNAPSHOT_PATH)
    if existing_path is not None:
        return existing_path  # type: ignore

    hass_api = context.hass_api
    image_path: Path | None = None
    if snapshot_url and hass_api:
        image_path = await snapshot_from_url(
            hass_api,
            snapshot_url,
            notification.id,
            media_path,
            hass_api.internal_url,
            reprocess=reprocess,
            jpeg_opts=jpeg_opts,
            png_opts=png_opts,
        )
    elif camera_entity_id and camera_entity_id.startswith("image.") and hass_api:
        image_path = await snap_image_entity(
            hass_api,
            camera_entity_id,
            media_path,
            notification.id,
            reprocess=reprocess,
            jpeg_opts=jpeg_opts,
            png_opts=png_opts,
        )
    elif camera_entity_id:
        if not hass_api:
            _LOGGER.warning("SUPERNOTIFY No HA ref or media path for camera %s", camera_entity_id)
            return None
        image_path = await _snap_selected_camera(
            notification, context, hass_api, camera_entity_id, media_path, reprocess, jpeg_opts, png_opts
        )

    if image_path is None:
        _LOGGER.warning("SUPERNOTIFY No media available to attach (%s,%s)", snapshot_url, camera_entity_id)
        return None
    # TODO: replace poking inside notification
    notification.media[ATTR_MEDIA_SNAPSHOT_PATH] = image_path
    return image_path


async def _snap_selected_camera(
    notification: Any,
    context: Context,
    hass_api: HomeAssistantAPI,
    camera_entity_id: str,
    media_path: Path,
    reprocess: ReprocessOption,
    jpeg_opts: dict[str, Any] | None,
    png_opts: dict[str, Any] | None,
) -> Path | None:
    """Choose an available camera, run the optional PTZ move and delay, then snap it.

    The camera is returned to its default preset afterwards when both a requested
    preset and a default preset are set. Returns ``None`` when no camera is available.
    """
    active_id = select_avail_camera(hass_api, context.cameras, camera_entity_id)
    if not active_id:
        return None

    camera_config = context.cameras.get(active_id, {})
    ptz_entity_id: str = camera_config.get(CONF_PTZ_CAMERA, active_id)
    delay = notification.media.get(ATTR_MEDIA_CAMERA_DELAY, camera_config.get(CONF_PTZ_DELAY))
    ptz_preset_default = camera_config.get(CONF_PTZ_PRESET_DEFAULT)
    ptz_method = camera_config.get(CONF_PTZ_METHOD, PTZ_METHOD_ONVIF)
    ptz_preset = notification.media.get(ATTR_MEDIA_CAMERA_PTZ_PRESET)
    _LOGGER.debug(
        "SUPERNOTIFY snapping camera %s, ptz %s->%s (%s), delay %s secs",
        active_id,
        ptz_preset,
        ptz_preset_default,
        ptz_entity_id,
        delay,
    )
    if ptz_preset:
        await move_camera_to_ptz_preset(hass_api, ptz_entity_id, ptz_preset, method=ptz_method)
    if delay:
        _LOGGER.debug("SUPERNOTIFY Waiting %s secs before snapping", delay)
        await asyncio.sleep(delay)

    snapped = await snap_camera(
        hass_api,
        active_id,
        notification.id,
        reprocess=reprocess,
        media_path=media_path,
        max_camera_wait=15,
        jpeg_opts=jpeg_opts,
        png_opts=png_opts,
    )
    if ptz_preset and ptz_preset_default:
        await move_camera_to_ptz_preset(hass_api, ptz_entity_id, ptz_preset_default, method=ptz_method)
    return snapped

394 

395 

async def write_image_from_bitmap(
    hass_api: HomeAssistantAPI,
    bitmap: bytes | None,
    media_path: Path,
    notification_id: str,
    reprocess: ReprocessOption = ReprocessOption.ALWAYS,
    output_format: str | None = None,
    jpeg_opts: dict[str, Any] | None = None,
    png_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Decode raw image bytes and save them under ``media_path / "image"``.

    Args:
        hass_api: Used to run ``Image.open`` through ``create_job``.
        bitmap: Encoded image bytes; ``None`` returns ``None`` immediately.
        media_path: Root media directory.
        notification_id: Used, with a timestamp, to build the file name.
        reprocess: ``ALWAYS`` copies the pixels into a fresh image to drop metadata.
            ``ALWAYS`` and ``PRESERVE`` apply the save options matching the input format.
        output_format: Format to save as; defaults to the detected input format.
        jpeg_opts: Extra ``Image.save`` keyword arguments for JPEG input.
        png_opts: Extra ``Image.save`` keyword arguments for PNG input.

    Returns:
        Resolved path of the written file, or ``None`` on any failure. Failures are
        logged and never raised.
    """
    image_path: Path | None = None
    if bitmap is None:
        _LOGGER.debug("SUPERNOTIFY Empty bitmap for image")
        return None

    # Bound before the try so the failure log can reference it even when the
    # directory creation or Image.open itself raises.
    input_format: str = "img"
    try:
        media_dir: Path = Path(media_path) / "image"
        if not await media_dir.exists():
            await media_dir.mkdir(parents=True, exist_ok=True)

        image = await hass_api.create_job(Image.open, io.BytesIO(bitmap))

        input_format = image.format.lower() if image.format else "img"
        if reprocess == ReprocessOption.ALWAYS:
            # rewrite to remove metadata, incl custom CCTV comments that confuse python MIMEImage
            clean_image: Image.Image = Image.new(image.mode, image.size)
            clean_image.putdata(image.getdata())
            image = clean_image

        buffer = BytesIO()
        img_args: dict[str, Any] = {}
        if reprocess in (ReprocessOption.ALWAYS, ReprocessOption.PRESERVE):
            if input_format in ("jpg", "jpeg") and jpeg_opts:
                img_args.update(jpeg_opts)
            elif input_format == "png" and png_opts:
                img_args.update(png_opts)

        output_format = output_format or input_format
        image.save(buffer, output_format, **img_args)

        timed: str = str(time.time()).replace(".", "_")
        target: Path = await (media_dir / f"{notification_id}_{timed}.{output_format}").resolve()
        async with aiofiles.open(target, "wb") as file:
            await file.write(buffer.getbuffer())
        # Only report a path once the file has actually been written.
        image_path = target
    except TypeError:
        # probably a jpeg or png option
        _LOGGER.exception("SUPERNOTIFY Image snap fail")
        image_path = None
    except Exception:
        _LOGGER.exception("SUPERNOTIFY Failure saving %s bitmap", input_format)
        image_path = None
    return image_path

448 

449 

class MediaStorage:
    """Owns the media directory: creates it on start-up and purges old files from it."""

    def __init__(self, media_path: str | None, days: int = 7) -> None:
        """Record the media directory and the retention period.

        Args:
            media_path: Directory for stored media; a falsy value disables storage.
            days: Default age in days after which ``cleanup`` removes a file.
        """
        self.media_path: Path | None = Path(media_path) if media_path else None
        # Time of the last completed purge, used to rate-limit cleanup.
        self.last_purge: dt.datetime | None = None
        # Minimum minutes between unforced purges.
        self.purge_minute_interval = 60 * 6
        self.days = days

    async def initialize(self, hass_api: HomeAssistantAPI) -> None:
        """Create the media directory if missing.

        If it cannot be created, an issue is raised through ``hass_api`` and
        ``media_path`` is reset to ``None``.
        """
        if self.media_path and not await self.media_path.exists():
            _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
            try:
                await self.media_path.mkdir(parents=True, exist_ok=True)
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
                hass_api.raise_issue(
                    "media_path",
                    "media_path",
                    {"path": str(self.media_path), "error": str(e)},
                    learn_more_url="https://supernotify.rhizomatics.org.uk/#getting-started",
                )
                self.media_path = None
        if self.media_path is not None:
            _LOGGER.info("SUPERNOTIFY abs media path: %s", await self.media_path.absolute())

    async def size(self) -> int:
        """Return the number of top-level entries in the media directory, or 0 if it is unset or missing."""
        path: Path | None = self.media_path
        if path and await path.exists():
            return len(await aiofiles.os.listdir(path))
        return 0

    async def cleanup(self, days: int | None = None, force: bool = False) -> int:
        """Delete stored files at least ``days`` old, including those in subdirectories.

        Subdirectories are walked because the grab helpers in this module write into
        ``image`` and ``camera`` directories below the media path. A top-level-only
        scan would never purge those files.

        Args:
            days: Retention in days; ``None`` or ``0`` falls back to ``self.days``.
                A resulting retention of 0 disables purging.
            force: Run even if a purge completed within ``purge_minute_interval`` minutes.

        Returns:
            Number of files deleted.
        """
        if (
            not force
            and self.last_purge is not None
            and self.last_purge > dt.datetime.now(dt.UTC) - dt.timedelta(minutes=self.purge_minute_interval)
        ):
            return 0
        days = days or self.days
        if days == 0 or self.media_path is None:
            return 0

        cutoff = dt.datetime.now(dt.UTC) - dt.timedelta(days=days)
        purged = 0
        if await self.media_path.exists():
            try:
                pending: list[str] = [str(self.media_path)]
                while pending:
                    current = pending.pop()
                    # Materialising the listing exhausts the scandir iterator, which
                    # releases its directory handle before any file is removed.
                    listing = list(await aiofiles.os.scandir(current))
                    for entry in listing:
                        if entry.is_dir(follow_symlinks=False):
                            # Do not follow symlinked directories out of the media tree.
                            pending.append(entry.path)
                        elif entry.is_file() and dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
                            _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
                            await aiofiles.os.unlink(entry.path)
                            purged += 1
            except Exception as e:
                _LOGGER.warning("SUPERNOTIFY Unable to clean up media storage at %s: %s", self.media_path, e, exc_info=True)
            _LOGGER.info("SUPERNOTIFY Purged %s media storage for cutoff %s", purged, cutoff)
            self.last_purge = dt.datetime.now(dt.UTC)
        else:
            _LOGGER.debug("SUPERNOTIFY Skipping media storage for unknown path %s", self.media_path)
        return purged