Coverage for custom_components/supernotify/media_grab.py: 83%
186 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-21 23:31 +0000
1import asyncio
2import io
3import logging
4import time
5from http import HTTPStatus
6from io import BytesIO
7from pathlib import Path
8from typing import TYPE_CHECKING, Any
10import aiofiles
11import anyio
12from aiohttp import ClientTimeout
13from homeassistant.const import STATE_HOME
14from homeassistant.core import HomeAssistant
15from homeassistant.helpers.aiohttp_client import async_get_clientsession
16from PIL import Image
18from custom_components.supernotify import (
19 ATTR_JPEG_OPTS,
20 ATTR_MEDIA_CAMERA_DELAY,
21 ATTR_MEDIA_CAMERA_ENTITY_ID,
22 ATTR_MEDIA_CAMERA_PTZ_PRESET,
23 ATTR_MEDIA_SNAPSHOT_URL,
24 CONF_ALT_CAMERA,
25 CONF_CAMERA,
26 CONF_DEVICE_TRACKER,
27 CONF_OPTIONS,
28 CONF_PTZ_DELAY,
29 CONF_PTZ_METHOD,
30 CONF_PTZ_PRESET_DEFAULT,
31 OPTION_JPEG,
32 PTZ_METHOD_FRIGATE,
33 PTZ_METHOD_ONVIF,
34)
36from .context import Context
38if TYPE_CHECKING:
39 from homeassistant.components.image import ImageEntity
41_LOGGER = logging.getLogger(__name__)
44async def snapshot_from_url(
45 hass: HomeAssistant | None,
46 snapshot_url: str,
47 notification_id: str,
48 media_path: Path,
49 hass_base_url: str | None,
50 remote_timeout: int = 15,
51 jpeg_opts: dict[str, Any] | None = None,
52) -> Path | None:
53 hass_base_url = hass_base_url or ""
54 if not hass:
55 raise ValueError("HomeAssistant not available")
56 try:
57 media_dir: anyio.Path = anyio.Path(media_path) / "snapshot"
58 await media_dir.mkdir(parents=True, exist_ok=True)
60 if snapshot_url.startswith("http"):
61 image_url = snapshot_url
62 else:
63 image_url = f"{hass_base_url}{snapshot_url}"
64 websession = async_get_clientsession(hass)
65 r = await websession.get(image_url, timeout=ClientTimeout(total=remote_timeout))
66 if r.status != HTTPStatus.OK:
67 _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status)
68 else:
69 if r.content_type in ("image/jpeg", "image/jpg"):
70 media_ext = "jpg"
71 image_format = "JPEG"
72 elif r.content_type == "image/png":
73 media_ext = "png"
74 image_format = "PNG"
75 elif r.content_type == "image/gif":
76 media_ext = "gif"
77 image_format = "GIF"
78 else:
79 _LOGGER.info("SUPERNOTIFY Unexpected MIME type %s from snap of %s", r.content_type, image_url)
80 media_ext = "img"
81 image_format = None
83 # TODO: configure image rewrite
84 image_path: Path = Path(media_dir) / f"{notification_id}.{media_ext}"
85 image: Image.Image = Image.open(io.BytesIO(await r.content.read()))
86 # rewrite to remove metadata, incl custom CCTV comments that confusie python MIMEImage
87 clean_image: Image.Image = Image.new(image.mode, image.size)
88 clean_image.putdata(image.getdata())
89 buffer = BytesIO()
90 img_args = {}
91 if image_format == "JPEG" and jpeg_opts:
92 img_args.update(jpeg_opts)
93 clean_image.save(buffer, image_format, **img_args)
94 async with aiofiles.open(image_path, "wb") as file:
95 await file.write(buffer.getbuffer())
96 _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path)
97 return image_path
98 except Exception as e:
99 _LOGGER.error("SUPERNOTIFY Image snap fail: %s", e)
100 return None
103async def move_camera_to_ptz_preset(
104 hass: HomeAssistant, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF
105) -> None:
106 try:
107 _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id)
108 if method == PTZ_METHOD_FRIGATE:
109 await hass.services.async_call(
110 "frigate",
111 "ptz",
112 service_data={"action": "preset", "argument": preset},
113 target={
114 "entity_id": camera_entity_id,
115 },
116 )
117 elif method == PTZ_METHOD_ONVIF:
118 await hass.services.async_call(
119 "onvif",
120 "ptz",
121 service_data={"move_mode": "GotoPreset", "preset": preset},
122 target={
123 "entity_id": camera_entity_id,
124 },
125 )
126 else:
127 _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method)
128 except Exception as e:
129 _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, e)
132async def snap_image(
133 context: Context,
134 entity_id: str,
135 media_path: Path,
136 notification_id: str,
137 jpeg_opts: dict[str, Any] | None = None,
138) -> Path | None:
139 """Use for any image, including MQTT Image"""
140 image_path: anyio.Path | None = None
141 try:
142 image_entity: ImageEntity | None = None
143 if context.hass_api._hass:
144 # TODO: must be a better hass method than this
145 image_entity = context.hass_api._hass.data["image"].get_entity(entity_id)
146 if image_entity:
147 bitmap: bytes | None = await image_entity.async_image()
148 if bitmap is None:
149 _LOGGER.warning("SUPERNOTIFY Empty bitmap from image entity %s", entity_id)
150 else:
151 image: Image.Image = Image.open(io.BytesIO(bitmap))
152 media_dir: anyio.Path = anyio.Path(media_path) / "image"
153 await media_dir.mkdir(parents=True, exist_ok=True)
155 media_ext: str = image.format.lower() if image.format else "img"
156 timed: str = str(time.time()).replace(".", "_")
157 image_path = anyio.Path(media_dir) / f"{notification_id}_{timed}.{media_ext}"
158 buffer = BytesIO()
159 img_args = {}
160 if media_ext in ("jpg", "jpeg") and jpeg_opts:
161 img_args.update(jpeg_opts)
162 image.save(buffer, image.format, **img_args)
163 async with aiofiles.open(await image_path.resolve(), "wb") as file:
164 await file.write(buffer.getbuffer())
165 else:
166 _LOGGER.warning("SUPERNOTIFY Unable to find image entity %s", entity_id)
167 except Exception as e:
168 _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, e)
169 return None
170 return Path(await image_path.resolve()) if image_path else None
173async def snap_camera(
174 hass: HomeAssistant,
175 camera_entity_id: str,
176 media_path: Path,
177 max_camera_wait: int = 20,
178 jpeg_opts: dict[str, Any] | None = None,
179) -> Path | None:
180 image_path: Path | None = None
181 if not camera_entity_id:
182 _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap")
183 return image_path
184 if jpeg_opts:
185 _LOGGER.warning("jpeg_opts not yet supported by snap_camera")
187 try:
188 media_dir: anyio.Path = anyio.Path(media_path) / "camera"
189 await media_dir.mkdir(parents=True, exist_ok=True)
190 timed = str(time.time()).replace(".", "_")
191 image_path = Path(media_dir) / f"{camera_entity_id}_{timed}.jpg"
192 await hass.services.async_call(
193 "camera", "snapshot", service_data={"entity_id": camera_entity_id, "filename": image_path}
194 )
196 # give async service time
197 cutoff_time = time.time() + max_camera_wait
198 while time.time() < cutoff_time and not image_path.exists():
199 _LOGGER.info("Image file not available yet at %s, pausing", image_path)
200 await asyncio.sleep(1)
202 except Exception as e:
203 _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, image_path, e)
204 image_path = None
206 return image_path
209def select_avail_camera(hass: HomeAssistant, cameras: dict[str, Any], camera_entity_id: str) -> str | None:
210 avail_camera_entity_id: str | None = None
212 try:
213 preferred_cam = cameras.get(camera_entity_id)
215 if not preferred_cam or not preferred_cam.get(CONF_DEVICE_TRACKER):
216 # assume unconfigured camera, or configured without tracker, available
217 avail_camera_entity_id = camera_entity_id
218 elif hass.states.is_state(preferred_cam[CONF_DEVICE_TRACKER], STATE_HOME):
219 avail_camera_entity_id = camera_entity_id
220 else:
221 alt_cams_with_tracker = [
222 cameras[c]
223 for c in preferred_cam.get(CONF_ALT_CAMERA, [])
224 if c in cameras and cameras[c].get(CONF_DEVICE_TRACKER)
225 ]
226 for alt_cam in alt_cams_with_tracker:
227 tracker_entity_id = alt_cam.get(CONF_DEVICE_TRACKER)
228 if tracker_entity_id and hass.states.is_state(tracker_entity_id, STATE_HOME):
229 avail_camera_entity_id = alt_cam[CONF_CAMERA]
230 _LOGGER.info(
231 "SUPERNOTIFY Selecting available camera %s rather than %s", avail_camera_entity_id, camera_entity_id
232 )
233 break
234 if avail_camera_entity_id is None:
235 alt_cam_ids_without_tracker = [
236 c
237 for c in preferred_cam.get(CONF_ALT_CAMERA, [])
238 if c not in cameras or not cameras[c].get(CONF_DEVICE_TRACKER)
239 ]
240 if len(alt_cam_ids_without_tracker) > 0:
241 _LOGGER.info(
242 "SUPERNOTIFY Selecting untracked camera %s rather than %s", avail_camera_entity_id, camera_entity_id
243 )
244 avail_camera_entity_id = alt_cam_ids_without_tracker[0]
246 if avail_camera_entity_id is None:
247 _LOGGER.warning("%s not available and no alternative available", camera_entity_id)
248 for c in cameras.values():
249 if c.get(CONF_DEVICE_TRACKER):
250 _LOGGER.debug(
251 "SUPERNOTIFY Tracker %s: %s", c.get(CONF_DEVICE_TRACKER), hass.states.get(c[CONF_DEVICE_TRACKER])
252 )
254 except Exception as e:
255 _LOGGER.warning("SUPERNOTIFY Unable to select available camera: %s", e)
257 return avail_camera_entity_id
260async def grab_image(notification: "Notification", delivery_name: str, context: Context) -> Path | None: # type: ignore # noqa: F821
261 snapshot_url = notification.media.get(ATTR_MEDIA_SNAPSHOT_URL)
262 camera_entity_id = notification.media.get(ATTR_MEDIA_CAMERA_ENTITY_ID)
263 delivery_config = notification.delivery_data(delivery_name)
264 jpeg_opts = notification.media.get(ATTR_JPEG_OPTS, delivery_config.get(CONF_OPTIONS, {}).get(OPTION_JPEG))
266 if not snapshot_url and not camera_entity_id:
267 return None
269 image_path: Path | None = None
270 if notification.snapshot_image_path is not None:
271 return notification.snapshot_image_path # type: ignore
272 if snapshot_url and context.media_path and context.hass_api:
273 image_path = await snapshot_from_url(
274 context.hass_api._hass,
275 snapshot_url,
276 notification.id,
277 context.media_path,
278 context.hass_api.internal_url,
279 jpeg_opts,
280 )
281 elif camera_entity_id and camera_entity_id.startswith("image.") and context.hass_api._hass and context.media_path:
282 image_path = await snap_image(context, camera_entity_id, context.media_path, notification.id, jpeg_opts)
283 elif camera_entity_id:
284 if not context.hass_api._hass or not context.media_path:
285 _LOGGER.warning("SUPERNOTIFY No homeassistant ref or media path for camera %s", camera_entity_id)
286 return None
287 active_camera_entity_id = select_avail_camera(context.hass_api._hass, context.cameras, camera_entity_id)
288 if active_camera_entity_id:
289 camera_config = context.cameras.get(active_camera_entity_id, {})
290 camera_delay = notification.media.get(ATTR_MEDIA_CAMERA_DELAY, camera_config.get(CONF_PTZ_DELAY))
291 camera_ptz_preset_default = camera_config.get(CONF_PTZ_PRESET_DEFAULT)
292 camera_ptz_method = camera_config.get(CONF_PTZ_METHOD)
293 camera_ptz_preset = notification.media.get(ATTR_MEDIA_CAMERA_PTZ_PRESET)
294 _LOGGER.debug(
295 "SUPERNOTIFY snapping camera %s, ptz %s->%s, delay %s secs",
296 active_camera_entity_id,
297 camera_ptz_preset,
298 camera_ptz_preset_default,
299 camera_delay,
300 )
301 if camera_ptz_preset:
302 await move_camera_to_ptz_preset(
303 context.hass_api._hass, active_camera_entity_id, camera_ptz_preset, method=camera_ptz_method
304 )
305 if camera_delay:
306 _LOGGER.debug("SUPERNOTIFY Waiting %s secs before snapping", camera_delay)
307 await asyncio.sleep(camera_delay)
308 image_path = await snap_camera(
309 context.hass_api._hass,
310 active_camera_entity_id,
311 media_path=context.media_path,
312 max_camera_wait=15,
313 jpeg_opts=jpeg_opts,
314 )
315 if camera_ptz_preset and camera_ptz_preset_default:
316 await move_camera_to_ptz_preset(
317 context.hass_api._hass, active_camera_entity_id, camera_ptz_preset_default, method=camera_ptz_method
318 )
320 if image_path is None:
321 _LOGGER.warning("SUPERNOTIFY No media available to attach (%s,%s)", snapshot_url, camera_entity_id)
322 return None
323 # TODO: replace poking inside notification
324 notification.snapshot_image_path = image_path
325 return image_path