Coverage for custom_components/supernotify/media_grab.py: 14%
274 statements
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2026-01-07 15:35 +0000
1import asyncio
2import datetime as dt
3import io
4import logging
5import time
6from enum import StrEnum, auto
7from http import HTTPStatus
8from io import BytesIO
9from typing import TYPE_CHECKING, Any, cast
11import aiofiles
12import aiofiles.os
13import homeassistant.util.dt as dt_util
14from aiohttp import ClientResponse, ClientSession, ClientTimeout
15from anyio import Path
16from homeassistant.const import STATE_HOME, STATE_UNAVAILABLE
17from PIL import Image
19from custom_components.supernotify import (
20 ATTR_JPEG_OPTS,
21 ATTR_MEDIA_CAMERA_DELAY,
22 ATTR_MEDIA_CAMERA_ENTITY_ID,
23 ATTR_MEDIA_CAMERA_PTZ_PRESET,
24 ATTR_MEDIA_SNAPSHOT_PATH,
25 ATTR_MEDIA_SNAPSHOT_URL,
26 ATTR_PNG_OPTS,
27 CONF_ALT_CAMERA,
28 CONF_CAMERA,
29 CONF_DEVICE_TRACKER,
30 CONF_OPTIONS,
31 CONF_PTZ_CAMERA,
32 CONF_PTZ_DELAY,
33 CONF_PTZ_METHOD,
34 CONF_PTZ_PRESET_DEFAULT,
35 MEDIA_OPTION_REPROCESS,
36 OPTION_JPEG,
37 OPTION_PNG,
38 PTZ_METHOD_FRIGATE,
39 PTZ_METHOD_ONVIF,
40)
42from .context import Context
43from .hass_api import HomeAssistantAPI
45if TYPE_CHECKING:
46 from homeassistant.components.image import ImageEntity
47 from homeassistant.core import State
49_LOGGER = logging.getLogger(__name__)
52class ReprocessOption(StrEnum):
53 ALWAYS = auto()
54 NEVER = auto()
55 PRESERVE = auto()
58async def snapshot_from_url(
59 hass_api: HomeAssistantAPI,
60 snapshot_url: str,
61 notification_id: str,
62 media_path: Path,
63 hass_base_url: str | None,
64 remote_timeout: int = 15,
65 reprocess: ReprocessOption = ReprocessOption.ALWAYS,
66 jpeg_opts: dict[str, Any] | None = None,
67 png_opts: dict[str, Any] | None = None,
68) -> Path | None:
69 hass_base_url = hass_base_url or ""
70 try:
71 media_dir: Path = Path(media_path) / "snapshot"
72 await media_dir.mkdir(parents=True, exist_ok=True)
74 if snapshot_url.startswith("http"):
75 image_url = snapshot_url
76 else:
77 image_url = f"{hass_base_url}{snapshot_url}"
78 websession: ClientSession = hass_api.http_session()
79 r: ClientResponse = await websession.get(image_url, timeout=ClientTimeout(total=remote_timeout))
80 if r.status != HTTPStatus.OK:
81 _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status)
82 else:
83 bitmap: bytes | None = await r.content.read()
84 image_path: Path | None = await write_image_from_bitmap(
85 hass_api,
86 bitmap,
87 media_path,
88 notification_id,
89 reprocess=reprocess,
90 jpeg_opts=jpeg_opts,
91 png_opts=png_opts,
92 )
93 if image_path:
94 _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path)
95 return Path(image_path)
97 _LOGGER.warning("SUPERNOTIFY Failed to snap image from %s", snapshot_url)
98 except Exception as e:
99 _LOGGER.exception("SUPERNOTIFY Image snap fail: %s", e)
101 return None
104async def move_camera_to_ptz_preset(
105 hass_api: HomeAssistantAPI, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF
106) -> None:
107 try:
108 _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id)
109 if method == PTZ_METHOD_FRIGATE:
110 await hass_api.call_service(
111 "frigate",
112 "ptz",
113 service_data={"action": "preset", "argument": preset},
114 target={"entity_id": camera_entity_id},
115 return_response=False,
116 blocking=True,
117 )
119 elif method == PTZ_METHOD_ONVIF:
120 await hass_api.call_service(
121 "onvif",
122 "ptz",
123 service_data={"move_mode": "GotoPreset", "preset": preset},
124 target={"entity_id": camera_entity_id},
125 return_response=False,
126 blocking=True,
127 )
128 else:
129 _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method)
130 except Exception as e:
131 _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, e)
134async def snap_image_entity(
135 hass_api: HomeAssistantAPI,
136 entity_id: str,
137 media_path: Path,
138 notification_id: str,
139 reprocess: ReprocessOption = ReprocessOption.ALWAYS,
140 jpeg_opts: dict[str, Any] | None = None,
141 png_opts: dict[str, Any] | None = None,
142) -> Path | None:
143 """Use for any image, including MQTT Image"""
144 image_path: Path | None = None
145 try:
146 image_entity: ImageEntity | None = cast("ImageEntity|None", hass_api.domain_entity("image", entity_id))
147 if image_entity:
148 bitmap: bytes | None = await image_entity.async_image()
149 image_path = await write_image_from_bitmap(
150 hass_api,
151 bitmap,
152 media_path,
153 notification_id,
154 reprocess=reprocess,
155 jpeg_opts=jpeg_opts,
156 png_opts=png_opts,
157 )
158 except Exception as e:
159 _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, e)
160 if image_path is None:
161 _LOGGER.warning("SUPERNOTIFY Unable to save from image entity %s", entity_id)
162 return Path(image_path) if image_path else None
165async def snap_camera(
166 hass_api: HomeAssistantAPI,
167 camera_entity_id: str,
168 notification_id: str,
169 media_path: Path,
170 max_camera_wait: int = 20,
171 reprocess: ReprocessOption = ReprocessOption.ALWAYS,
172 jpeg_opts: dict[str, Any] | None = None,
173 png_opts: dict[str, Any] | None = None,
174) -> Path | None:
175 image_path: Path | None = None
176 if not camera_entity_id:
177 _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap")
178 return image_path
180 try:
181 media_dir: Path = Path(media_path) / "camera"
182 await media_dir.mkdir(parents=True, exist_ok=True)
183 timed = str(time.time()).replace(".", "_")
184 image_path = Path(media_dir) / f"{camera_entity_id}_{timed}.jpg"
185 await hass_api.call_service(
186 "camera",
187 "snapshot",
188 service_data={"entity_id": camera_entity_id, "filename": image_path},
189 return_response=False,
190 blocking=True,
191 )
193 # give async service time
194 cutoff_time = time.time() + max_camera_wait
195 while time.time() < cutoff_time and not await image_path.exists():
196 _LOGGER.info("Image file not available yet at %s, pausing", image_path)
197 await asyncio.sleep(1)
199 if reprocess != ReprocessOption.NEVER:
200 async with await Path(image_path).open("rb") as f:
201 bitmap: bytes | None = await f.read()
202 async_path: Path | None = await write_image_from_bitmap(
203 hass_api,
204 bitmap,
205 media_path,
206 notification_id,
207 reprocess=reprocess,
208 jpeg_opts=jpeg_opts,
209 png_opts=png_opts,
210 )
211 if async_path:
212 image_path = Path(async_path)
213 else:
214 _LOGGER.warning("SUPERNOTIFY Unable to reprocess camera image")
216 except Exception as e:
217 _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, image_path, e)
218 image_path = None
220 return image_path
223def camera_available(hass_api: HomeAssistantAPI, camera_config: dict[str, Any], non_entity: bool = False) -> bool:
224 state: State | None = None
225 tracker_entity_id: str
226 camera_entity_id: str = camera_config[CONF_CAMERA]
227 try:
228 if camera_config.get(CONF_DEVICE_TRACKER):
229 tracker_entity_id = camera_config[CONF_DEVICE_TRACKER]
230 state = hass_api.get_state(camera_config[CONF_DEVICE_TRACKER])
231 if state and state.state == STATE_HOME:
232 return True
233 _LOGGER.debug("SUPERNOTIFY Skipping camera %s tracker %s state %s", camera_entity_id, tracker_entity_id, state)
234 else:
235 tracker_entity_id = camera_entity_id
236 state = hass_api.get_state(camera_entity_id)
237 if state and state.state != STATE_UNAVAILABLE:
238 return True
239 if state is None and non_entity:
240 return True
241 _LOGGER.debug("SUPERNOTIFY Skipping camera %s with state %s", camera_entity_id, state)
242 if state is None:
243 if tracker_entity_id == camera_entity_id:
244 _LOGGER.warning(
245 "SUPERNOTIFY Camera %s tracker %s has no entity state",
246 camera_entity_id,
247 tracker_entity_id,
248 )
249 else:
250 _LOGGER.warning(
251 "SUPERNOTIFY Camera %s device_tracker %s seems missing",
252 camera_entity_id,
253 camera_config[CONF_DEVICE_TRACKER],
254 )
255 return False
257 except Exception as e:
258 _LOGGER.exception("SUPERNOTIFY Unable to determine camera state: %s, %s", camera_config, e)
259 return False
262def select_avail_camera(hass_api: HomeAssistantAPI, cameras: dict[str, Any], camera_entity_id: str) -> str | None:
263 avail_camera_entity_id: str | None = None
265 preferred_cam = cameras.get(camera_entity_id)
266 # test support FIXME
267 if preferred_cam and CONF_CAMERA not in preferred_cam:
268 preferred_cam[CONF_CAMERA] = camera_entity_id
269 if preferred_cam is None:
270 # assume unconfigured camera available
271 return camera_entity_id
272 if camera_available(hass_api, preferred_cam):
273 return camera_entity_id
275 alt_cams: list[dict[str, Any]] = [cameras[c] for c in preferred_cam.get(CONF_ALT_CAMERA, []) if c in cameras]
276 alt_cams.extend(
277 {CONF_CAMERA: entity_id} for entity_id in preferred_cam.get(CONF_ALT_CAMERA, []) if entity_id not in cameras
278 )
279 for alt_cam in alt_cams:
280 if camera_available(hass_api, alt_cam):
281 _LOGGER.info("SUPERNOTIFY Selecting available camera %s rather than %s", alt_cam[CONF_CAMERA], camera_entity_id)
282 return alt_cam[CONF_CAMERA]
284 if avail_camera_entity_id is None:
285 _LOGGER.warning("%s not available, finding best alternative available", camera_entity_id)
286 if camera_available(hass_api, preferred_cam, non_entity=True):
287 _LOGGER.info("SUPERNOTIFY Selecting camera %s with no known entity", camera_entity_id)
288 return camera_entity_id
289 for alt_cam in alt_cams:
290 if camera_available(hass_api, alt_cam, non_entity=True):
291 _LOGGER.info(
292 "SUPERNOTIFY Selecting alt camera %s with no known entity for %s", alt_cam[CONF_CAMERA], camera_entity_id
293 )
294 return alt_cam[CONF_CAMERA]
296 return None
299async def grab_image(notification: "Notification", delivery_name: str, context: Context) -> Path | None: # type: ignore # noqa: F821
300 snapshot_url = notification.media.get(ATTR_MEDIA_SNAPSHOT_URL)
301 camera_entity_id = notification.media.get(ATTR_MEDIA_CAMERA_ENTITY_ID)
302 delivery_config = notification.delivery_data(delivery_name)
303 jpeg_opts = notification.media.get(ATTR_JPEG_OPTS, delivery_config.get(CONF_OPTIONS, {}).get(OPTION_JPEG))
304 png_opts = notification.media.get(ATTR_PNG_OPTS, delivery_config.get(CONF_OPTIONS, {}).get(OPTION_PNG))
305 reprocess_option = (
306 notification.media.get(MEDIA_OPTION_REPROCESS, delivery_config.get(CONF_OPTIONS, {}).get(MEDIA_OPTION_REPROCESS))
307 or "always"
308 )
309 media_path: Path | None = context.media_storage.media_path
310 if not media_path:
311 return None
313 reprocess: ReprocessOption = ReprocessOption.ALWAYS
314 try:
315 reprocess = ReprocessOption(reprocess_option)
316 except Exception:
317 _LOGGER.warning("SUPERNOTIFY Invalid reprocess option: %s", reprocess_option)
319 if not snapshot_url and not camera_entity_id:
320 return None
322 image_path: Path | None = None
323 if notification.media.get(ATTR_MEDIA_SNAPSHOT_PATH) is not None:
324 return notification.media.get(ATTR_MEDIA_SNAPSHOT_PATH) # type: ignore
325 if snapshot_url and media_path and context.hass_api:
326 image_path = await snapshot_from_url(
327 context.hass_api,
328 snapshot_url,
329 notification.id,
330 media_path,
331 context.hass_api.internal_url,
332 reprocess=reprocess,
333 jpeg_opts=jpeg_opts,
334 png_opts=png_opts,
335 )
336 elif camera_entity_id and camera_entity_id.startswith("image.") and context.hass_api and media_path:
337 image_path = await snap_image_entity(
338 context.hass_api,
339 camera_entity_id,
340 media_path,
341 notification.id,
342 reprocess=reprocess,
343 jpeg_opts=jpeg_opts,
344 png_opts=png_opts,
345 )
346 elif camera_entity_id:
347 if not context.hass_api or not media_path:
348 _LOGGER.warning("SUPERNOTIFY No HA ref or media path for camera %s", camera_entity_id)
349 return None
350 active_camera_entity_id = select_avail_camera(context.hass_api, context.cameras, camera_entity_id)
351 if active_camera_entity_id:
352 camera_config = context.cameras.get(active_camera_entity_id, {})
353 camera_ptz_entity_id: str = camera_config.get(CONF_PTZ_CAMERA, active_camera_entity_id)
354 camera_delay = notification.media.get(ATTR_MEDIA_CAMERA_DELAY, camera_config.get(CONF_PTZ_DELAY))
355 camera_ptz_preset_default = camera_config.get(CONF_PTZ_PRESET_DEFAULT)
356 camera_ptz_method = camera_config.get(CONF_PTZ_METHOD, PTZ_METHOD_ONVIF)
357 camera_ptz_preset = notification.media.get(ATTR_MEDIA_CAMERA_PTZ_PRESET)
358 _LOGGER.debug(
359 "SUPERNOTIFY snapping camera %s, ptz %s->%s (%s), delay %s secs",
360 active_camera_entity_id,
361 camera_ptz_preset,
362 camera_ptz_preset_default,
363 camera_ptz_entity_id,
364 camera_delay,
365 )
366 if camera_ptz_preset:
367 await move_camera_to_ptz_preset(
368 context.hass_api, camera_ptz_entity_id, camera_ptz_preset, method=camera_ptz_method
369 )
370 if camera_delay:
371 _LOGGER.debug("SUPERNOTIFY Waiting %s secs before snapping", camera_delay)
372 await asyncio.sleep(camera_delay)
373 image_path = await snap_camera(
374 context.hass_api,
375 active_camera_entity_id,
376 notification.id,
377 reprocess=reprocess,
378 media_path=media_path,
379 max_camera_wait=15,
380 jpeg_opts=jpeg_opts,
381 png_opts=png_opts,
382 )
383 if camera_ptz_preset and camera_ptz_preset_default:
384 await move_camera_to_ptz_preset(
385 context.hass_api, camera_ptz_entity_id, camera_ptz_preset_default, method=camera_ptz_method
386 )
388 if image_path is None:
389 _LOGGER.warning("SUPERNOTIFY No media available to attach (%s,%s)", snapshot_url, camera_entity_id)
390 return None
391 # TODO: replace poking inside notification
392 notification.media[ATTR_MEDIA_SNAPSHOT_PATH] = image_path
393 return image_path
396async def write_image_from_bitmap(
397 hass_api: HomeAssistantAPI,
398 bitmap: bytes | None,
399 media_path: Path,
400 notification_id: str,
401 reprocess: ReprocessOption = ReprocessOption.ALWAYS,
402 output_format: str | None = None,
403 jpeg_opts: dict[str, Any] | None = None,
404 png_opts: dict[str, Any] | None = None,
405) -> Path | None:
406 image_path: Path | None = None
407 if bitmap is None:
408 _LOGGER.debug("SUPERNOTIFY Empty bitmap for image")
409 return None
410 try:
411 media_dir: Path = Path(media_path) / "image"
412 if not await media_dir.exists():
413 await media_dir.mkdir(parents=True, exist_ok=True)
415 image = await hass_api.create_job(Image.open, io.BytesIO(bitmap))
417 input_format: str = image.format.lower() if image.format else "img"
418 if reprocess == ReprocessOption.ALWAYS:
419 # rewrite to remove metadata, incl custom CCTV comments that confusie python MIMEImage
420 clean_image: Image.Image = Image.new(image.mode, image.size)
421 clean_image.putdata(image.getdata())
422 image = clean_image
424 buffer = BytesIO()
425 img_args = {}
426 if reprocess in (ReprocessOption.ALWAYS, ReprocessOption.PRESERVE):
427 if input_format in ("jpg", "jpeg") and jpeg_opts:
428 img_args.update(jpeg_opts)
429 elif input_format == "png" and png_opts:
430 img_args.update(png_opts)
432 output_format = output_format or input_format
433 image.save(buffer, output_format, **img_args)
435 media_ext: str = output_format if output_format else "img"
436 timed: str = str(time.time()).replace(".", "_")
437 image_path = Path(media_dir) / f"{notification_id}_{timed}.{media_ext}"
438 image_path = await image_path.resolve()
439 async with aiofiles.open(image_path, "wb") as file:
440 await file.write(buffer.getbuffer())
441 except TypeError:
442 # probably a jpeg or png option
443 _LOGGER.exception("SUPERNOTIFY Image snap fail")
444 except Exception:
445 _LOGGER.exception("SUPERNOTIFY Failure saving %s bitmap", input_format)
446 image_path = None
447 return image_path
450class MediaStorage:
451 def __init__(self, media_path: str | None, days: int = 7) -> None:
452 self.media_path: Path | None = Path(media_path) if media_path else None
453 self.last_purge: dt.datetime | None = None
454 self.purge_minute_interval = 60 * 6
455 self.days = days
457 async def initialize(self, hass_api: HomeAssistantAPI) -> None:
458 if self.media_path and not await self.media_path.exists():
459 _LOGGER.info("SUPERNOTIFY media path not found at %s", self.media_path)
460 try:
461 await self.media_path.mkdir(parents=True, exist_ok=True)
462 except Exception as e:
463 _LOGGER.warning("SUPERNOTIFY media path %s cannot be created: %s", self.media_path, e)
464 hass_api.raise_issue(
465 "media_path",
466 "media_path",
467 {"path": str(self.media_path), "error": str(e)},
468 learn_more_url="https://supernotify.rhizomatics.org.uk/#getting-started",
469 )
470 self.media_path = None
471 if self.media_path is not None:
472 _LOGGER.info("SUPERNOTIFY abs media path: %s", await self.media_path.absolute())
474 async def size(self) -> int:
475 path: Path | None = self.media_path
476 if path and await path.exists():
477 return sum(1 for p in await aiofiles.os.listdir(path))
478 return 0
480 async def cleanup(self, days: int | None = None, force: bool = False) -> int:
481 if (
482 not force
483 and self.last_purge is not None
484 and self.last_purge > dt.datetime.now(dt.UTC) - dt.timedelta(minutes=self.purge_minute_interval)
485 ):
486 return 0
487 days = days or self.days
488 if days == 0 or self.media_path is None:
489 return 0
491 cutoff = dt.datetime.now(dt.UTC) - dt.timedelta(days=days)
492 cutoff = cutoff.astimezone(dt.UTC)
493 purged = 0
494 if self.media_path and await self.media_path.exists():
495 try:
496 archive = await aiofiles.os.scandir(self.media_path)
497 for entry in archive:
498 if entry.is_file() and dt_util.utc_from_timestamp(entry.stat().st_ctime) <= cutoff:
499 _LOGGER.debug("SUPERNOTIFY Purging %s", entry.path)
500 await aiofiles.os.unlink(Path(entry.path))
501 purged += 1
502 except Exception as e:
503 _LOGGER.warning("SUPERNOTIFY Unable to clean up media storage at %s: %s", self.media_path, e, exc_info=True)
504 _LOGGER.info("SUPERNOTIFY Purged %s media storage for cutoff %s", purged, cutoff)
505 self.last_purge = dt.datetime.now(dt.UTC)
506 else:
507 _LOGGER.debug("SUPERNOTIFY Skipping media storage for unknown path %s", self.media_path)
508 return purged