Coverage for custom_components/supernotify/media_grab.py: 83%

186 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-21 23:31 +0000

1import asyncio 

2import io 

3import logging 

4import time 

5from http import HTTPStatus 

6from io import BytesIO 

7from pathlib import Path 

8from typing import TYPE_CHECKING, Any 

9 

10import aiofiles 

11import anyio 

12from aiohttp import ClientTimeout 

13from homeassistant.const import STATE_HOME 

14from homeassistant.core import HomeAssistant 

15from homeassistant.helpers.aiohttp_client import async_get_clientsession 

16from PIL import Image 

17 

18from custom_components.supernotify import ( 

19 ATTR_JPEG_OPTS, 

20 ATTR_MEDIA_CAMERA_DELAY, 

21 ATTR_MEDIA_CAMERA_ENTITY_ID, 

22 ATTR_MEDIA_CAMERA_PTZ_PRESET, 

23 ATTR_MEDIA_SNAPSHOT_URL, 

24 CONF_ALT_CAMERA, 

25 CONF_CAMERA, 

26 CONF_DEVICE_TRACKER, 

27 CONF_OPTIONS, 

28 CONF_PTZ_DELAY, 

29 CONF_PTZ_METHOD, 

30 CONF_PTZ_PRESET_DEFAULT, 

31 OPTION_JPEG, 

32 PTZ_METHOD_FRIGATE, 

33 PTZ_METHOD_ONVIF, 

34) 

35 

36from .context import Context 

37 

38if TYPE_CHECKING: 

39 from homeassistant.components.image import ImageEntity 

40 

41_LOGGER = logging.getLogger(__name__) 

42 

43 

async def snapshot_from_url(
    hass: HomeAssistant | None,
    snapshot_url: str,
    notification_id: str,
    media_path: Path,
    hass_base_url: str | None,
    remote_timeout: int = 15,
    jpeg_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Download an image over HTTP and store a metadata-free copy of it.

    The image is re-encoded through Pillow and written to
    ``<media_path>/snapshot/<notification_id>.<ext>``.

    Args:
        hass: Home Assistant instance, used to obtain the aiohttp client session.
        snapshot_url: Used as-is when it starts with ``http``, otherwise it is
            appended to ``hass_base_url``.
        notification_id: Stem of the stored file name.
        media_path: Root media directory; the ``snapshot`` sub-directory is
            created if missing.
        hass_base_url: Prefix for relative URLs; ``None`` is treated as ``""``.
        remote_timeout: Total timeout, in seconds, for the HTTP request.
        jpeg_opts: Extra keyword arguments passed to the Pillow ``save`` call,
            applied only when the output format is JPEG.

    Returns:
        Path of the written image, or ``None`` when the server did not answer
        with HTTP 200 or any step of the download/rewrite failed.

    Raises:
        ValueError: If ``hass`` is falsy.
    """
    hass_base_url = hass_base_url or ""
    if not hass:
        raise ValueError("HomeAssistant not available")
    try:
        media_dir: anyio.Path = anyio.Path(media_path) / "snapshot"
        await media_dir.mkdir(parents=True, exist_ok=True)

        if snapshot_url.startswith("http"):
            image_url = snapshot_url
        else:
            image_url = f"{hass_base_url}{snapshot_url}"
        websession = async_get_clientsession(hass)
        r = await websession.get(image_url, timeout=ClientTimeout(total=remote_timeout))
        if r.status != HTTPStatus.OK:
            _LOGGER.warning("SUPERNOTIFY Unable to retrieve %s: %s", image_url, r.status)
            return None

        media_ext: str
        image_format: str | None
        if r.content_type in ("image/jpeg", "image/jpg"):
            media_ext = "jpg"
            image_format = "JPEG"
        elif r.content_type == "image/png":
            media_ext = "png"
            image_format = "PNG"
        elif r.content_type == "image/gif":
            media_ext = "gif"
            image_format = "GIF"
        else:
            _LOGGER.info("SUPERNOTIFY Unexpected MIME type %s from snap of %s", r.content_type, image_url)
            media_ext = "img"
            image_format = None

        image: Image.Image = Image.open(io.BytesIO(await r.content.read()))
        if image_format is None and image.format:
            # Saving to an in-memory buffer needs an explicit format, so when the
            # MIME type was not recognised rely on what Pillow detected instead.
            image_format = image.format
            media_ext = image.format.lower()

        # TODO: configure image rewrite
        image_path: Path = Path(media_dir) / f"{notification_id}.{media_ext}"
        # rewrite to remove metadata, incl custom CCTV comments that confuse python MIMEImage
        clean_image: Image.Image = Image.new(image.mode, image.size)
        if image.mode == "P":
            # Pixel values of a paletted image are only indices; without the
            # source palette the copy would be rendered with the wrong colours.
            palette = image.getpalette()
            if palette:
                clean_image.putpalette(palette)
        clean_image.putdata(image.getdata())
        buffer = BytesIO()
        img_args: dict[str, Any] = {}
        if image_format == "JPEG" and jpeg_opts:
            img_args.update(jpeg_opts)
        clean_image.save(buffer, image_format, **img_args)
        async with aiofiles.open(image_path, "wb") as file:
            await file.write(buffer.getbuffer())
        _LOGGER.debug("SUPERNOTIFY Fetched image from %s to %s", image_url, image_path)
        return image_path
    except Exception as e:
        # Best effort: a failed snapshot must not break the notification itself.
        _LOGGER.error("SUPERNOTIFY Image snap fail: %s", e)
        return None

101 

102 

async def move_camera_to_ptz_preset(
    hass: HomeAssistant, camera_entity_id: str, preset: str | int, method: str = PTZ_METHOD_ONVIF
) -> None:
    """Ask a camera to move to a PTZ preset through the ``frigate`` or ``onvif`` service.

    Args:
        hass: Home Assistant instance whose service registry is called.
        camera_entity_id: Entity targeted by the ``ptz`` service call.
        preset: Preset identifier forwarded to the service.
        method: ``PTZ_METHOD_FRIGATE`` or ``PTZ_METHOD_ONVIF``; any other value
            is logged as unknown and nothing is called.

    Failures of the service call are logged as warnings and never propagated.
    """
    try:
        _LOGGER.info("SUPERNOTIFY Executing PTZ by %s to %s for %s", method, preset, camera_entity_id)
        # The two integrations differ only in service domain and payload shape.
        if method == PTZ_METHOD_FRIGATE:
            domain = "frigate"
            payload: dict[str, Any] = {"action": "preset", "argument": preset}
        elif method == PTZ_METHOD_ONVIF:
            domain = "onvif"
            payload = {"move_mode": "GotoPreset", "preset": preset}
        else:
            _LOGGER.warning("SUPERNOTIFY Unknown PTZ method %s", method)
            return
        await hass.services.async_call(
            domain,
            "ptz",
            service_data=payload,
            target={"entity_id": camera_entity_id},
        )
    except Exception as err:
        _LOGGER.warning("SUPERNOTIFY Unable to move %s to ptz preset %s: %s", camera_entity_id, preset, err)

130 

131 

async def snap_image(
    context: Context,
    entity_id: str,
    media_path: Path,
    notification_id: str,
    jpeg_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Save the current picture of an image entity (e.g. an MQTT Image) to disk.

    The file is written to
    ``<media_path>/image/<notification_id>_<timestamp>.<ext>``, where the
    extension is the lower-cased format reported by Pillow, or ``img`` when
    Pillow reports none.

    Args:
        context: Supplies the Home Assistant reference via ``hass_api._hass``.
        entity_id: Image entity to read.
        media_path: Root media directory; the ``image`` sub-directory is
            created if missing.
        notification_id: Prefix of the stored file name.
        jpeg_opts: Extra Pillow ``save`` keyword arguments, used for JPEG images only.

    Returns:
        Resolved path of the written file, or ``None`` when the entity is
        missing, has no bitmap, or an error occurred.
    """
    target: anyio.Path | None = None
    try:
        hass_ref = context.hass_api._hass
        entity: ImageEntity | None = None
        if hass_ref:
            # TODO: must be a better hass method than this
            entity = hass_ref.data["image"].get_entity(entity_id)
        if not entity:
            _LOGGER.warning("SUPERNOTIFY Unable to find image entity %s", entity_id)
            return None

        raw: bytes | None = await entity.async_image()
        if raw is None:
            _LOGGER.warning("SUPERNOTIFY Empty bitmap from image entity %s", entity_id)
            return None

        picture: Image.Image = Image.open(io.BytesIO(raw))
        out_dir: anyio.Path = anyio.Path(media_path) / "image"
        await out_dir.mkdir(parents=True, exist_ok=True)

        extension: str = "img"
        if picture.format:
            extension = picture.format.lower()
        stamp: str = str(time.time()).replace(".", "_")
        target = anyio.Path(out_dir) / f"{notification_id}_{stamp}.{extension}"

        save_kwargs: dict[str, Any] = {}
        if jpeg_opts and extension in ("jpg", "jpeg"):
            save_kwargs.update(jpeg_opts)
        encoded = BytesIO()
        picture.save(encoded, picture.format, **save_kwargs)
        async with aiofiles.open(await target.resolve(), "wb") as out_file:
            await out_file.write(encoded.getbuffer())
    except Exception as err:
        _LOGGER.warning("SUPERNOTIFY Unable to snap image %s: %s", entity_id, err)
        return None
    return Path(await target.resolve()) if target else None

171 

172 

async def snap_camera(
    hass: HomeAssistant,
    camera_entity_id: str,
    media_path: Path,
    max_camera_wait: int = 20,
    jpeg_opts: dict[str, Any] | None = None,
) -> Path | None:
    """Take a snapshot with the ``camera.snapshot`` service and wait for the file.

    The snapshot is requested at
    ``<media_path>/camera/<camera_entity_id>_<timestamp>.jpg``. The function
    then polls once a second until the file exists or ``max_camera_wait``
    seconds have elapsed.

    Args:
        hass: Home Assistant instance whose service registry is called.
        camera_entity_id: Camera to snapshot; an empty value returns ``None``.
        media_path: Root media directory; the ``camera`` sub-directory is
            created if missing.
        max_camera_wait: Maximum number of seconds to wait for the file to appear.
        jpeg_opts: Not supported here; a warning is logged when supplied.

    Returns:
        Path of the snapshot file, or ``None`` when the entity id is empty,
        the service call failed, or the file did not appear in time.
    """
    image_path: Path | None = None
    if not camera_entity_id:
        _LOGGER.warning("SUPERNOTIFY Empty camera entity id for snap")
        return image_path
    if jpeg_opts:
        _LOGGER.warning("jpeg_opts not yet supported by snap_camera")

    try:
        media_dir: anyio.Path = anyio.Path(media_path) / "camera"
        await media_dir.mkdir(parents=True, exist_ok=True)
        timed = str(time.time()).replace(".", "_")
        image_path = Path(media_dir) / f"{camera_entity_id}_{timed}.jpg"
        await hass.services.async_call(
            "camera", "snapshot", service_data={"entity_id": camera_entity_id, "filename": image_path}
        )

        # give async service time; a monotonic deadline is immune to wall-clock jumps
        deadline = time.monotonic() + max_camera_wait
        while not image_path.exists():
            if time.monotonic() >= deadline:
                # Never hand back a path that does not exist: callers would try
                # to attach a missing file.
                _LOGGER.warning(
                    "SUPERNOTIFY No snapshot from %s at %s after %s secs", camera_entity_id, image_path, max_camera_wait
                )
                return None
            _LOGGER.info("Image file not available yet at %s, pausing", image_path)
            await asyncio.sleep(1)

    except Exception as e:
        _LOGGER.warning("Failed to snap avail camera %s to %s: %s", camera_entity_id, image_path, e)
        image_path = None

    return image_path

207 

208 

def select_avail_camera(hass: HomeAssistant, cameras: dict[str, Any], camera_entity_id: str) -> str | None:
    """Choose which camera to use, falling back to alternatives when the preferred one is away.

    Selection order:

    1. ``camera_entity_id`` itself when it is not in ``cameras``, has no
       device tracker configured, or its tracker is in state ``STATE_HOME``.
    2. The first alternative camera whose own tracker is in state ``STATE_HOME``.
    3. The first alternative camera that is unconfigured or has no tracker.

    Args:
        hass: Home Assistant instance used to read tracker states.
        cameras: Camera configurations keyed by camera entity id.
        camera_entity_id: The preferred camera.

    Returns:
        The selected camera entity id, or ``None`` when nothing is available
        or an error occurred (errors are logged, not raised).
    """
    avail_camera_entity_id: str | None = None

    try:
        preferred_cam = cameras.get(camera_entity_id)

        if not preferred_cam or not preferred_cam.get(CONF_DEVICE_TRACKER):
            # assume unconfigured camera, or configured without tracker, available
            avail_camera_entity_id = camera_entity_id
        elif hass.states.is_state(preferred_cam[CONF_DEVICE_TRACKER], STATE_HOME):
            avail_camera_entity_id = camera_entity_id
        else:
            alt_cam_ids = preferred_cam.get(CONF_ALT_CAMERA, [])

            # First choice: an alternative whose tracker confirms it is home.
            for alt_cam_id in alt_cam_ids:
                alt_cam = cameras.get(alt_cam_id)
                tracker_entity_id = alt_cam.get(CONF_DEVICE_TRACKER) if alt_cam else None
                if tracker_entity_id and hass.states.is_state(tracker_entity_id, STATE_HOME):
                    avail_camera_entity_id = alt_cam[CONF_CAMERA]
                    _LOGGER.info(
                        "SUPERNOTIFY Selecting available camera %s rather than %s", avail_camera_entity_id, camera_entity_id
                    )
                    break

            if avail_camera_entity_id is None:
                # Second choice: an alternative whose availability cannot be checked.
                alt_cam_ids_without_tracker = [
                    c for c in alt_cam_ids if c not in cameras or not cameras[c].get(CONF_DEVICE_TRACKER)
                ]
                if alt_cam_ids_without_tracker:
                    # Assign before logging so the message names the chosen camera.
                    avail_camera_entity_id = alt_cam_ids_without_tracker[0]
                    _LOGGER.info(
                        "SUPERNOTIFY Selecting untracked camera %s rather than %s", avail_camera_entity_id, camera_entity_id
                    )

        if avail_camera_entity_id is None:
            _LOGGER.warning("%s not available and no alternative available", camera_entity_id)
            # Dump every tracker state to help diagnose why nothing was selectable.
            for c in cameras.values():
                if c.get(CONF_DEVICE_TRACKER):
                    _LOGGER.debug(
                        "SUPERNOTIFY Tracker %s: %s", c.get(CONF_DEVICE_TRACKER), hass.states.get(c[CONF_DEVICE_TRACKER])
                    )

    except Exception as e:
        _LOGGER.warning("SUPERNOTIFY Unable to select available camera: %s", e)

    return avail_camera_entity_id

258 

259 

async def grab_image(notification: "Notification", delivery_name: str, context: Context) -> Path | None:  # type: ignore # noqa: F821
    """Obtain the image to attach to a notification, caching it on the notification.

    Sources are tried in this order: the snapshot URL, an ``image.`` entity,
    then a camera entity. For a camera, an optional PTZ preset move and delay
    happen before the snapshot, and the camera is moved back to its default
    preset afterwards when one is configured.

    Args:
        notification: Provides ``media`` settings, ``id``, ``delivery_data()``
            and the cached ``snapshot_image_path``.
        delivery_name: Delivery whose options may supply JPEG save options.
        context: Provides ``hass_api``, ``media_path`` and ``cameras``.

    Returns:
        Path of the image, or ``None`` when no source is configured or no
        image could be obtained.
    """
    snapshot_url = notification.media.get(ATTR_MEDIA_SNAPSHOT_URL)
    camera_entity_id = notification.media.get(ATTR_MEDIA_CAMERA_ENTITY_ID)
    delivery_config = notification.delivery_data(delivery_name)
    jpeg_opts = notification.media.get(ATTR_JPEG_OPTS, delivery_config.get(CONF_OPTIONS, {}).get(OPTION_JPEG))

    if not snapshot_url and not camera_entity_id:
        return None

    if notification.snapshot_image_path is not None:
        # Already grabbed for an earlier delivery of the same notification.
        return notification.snapshot_image_path  # type: ignore

    hass_api = context.hass_api
    # Guard against a missing API object so the camera branches log and return
    # instead of raising AttributeError.
    hass = hass_api._hass if hass_api else None

    image_path: Path | None = None
    if snapshot_url and context.media_path and hass_api:
        image_path = await snapshot_from_url(
            hass,
            snapshot_url,
            notification.id,
            context.media_path,
            hass_api.internal_url,
            # Must be passed by keyword: the next positional parameter of
            # snapshot_from_url is remote_timeout, not jpeg_opts.
            jpeg_opts=jpeg_opts,
        )
    elif camera_entity_id and camera_entity_id.startswith("image.") and hass and context.media_path:
        image_path = await snap_image(context, camera_entity_id, context.media_path, notification.id, jpeg_opts)
    elif camera_entity_id:
        if not hass or not context.media_path:
            _LOGGER.warning("SUPERNOTIFY No homeassistant ref or media path for camera %s", camera_entity_id)
            return None
        active_camera_entity_id = select_avail_camera(hass, context.cameras, camera_entity_id)
        if active_camera_entity_id:
            camera_config = context.cameras.get(active_camera_entity_id, {})
            camera_delay = notification.media.get(ATTR_MEDIA_CAMERA_DELAY, camera_config.get(CONF_PTZ_DELAY))
            camera_ptz_preset_default = camera_config.get(CONF_PTZ_PRESET_DEFAULT)
            camera_ptz_method = camera_config.get(CONF_PTZ_METHOD)
            camera_ptz_preset = notification.media.get(ATTR_MEDIA_CAMERA_PTZ_PRESET)
            _LOGGER.debug(
                "SUPERNOTIFY snapping camera %s, ptz %s->%s, delay %s secs",
                active_camera_entity_id,
                camera_ptz_preset,
                camera_ptz_preset_default,
                camera_delay,
            )
            if camera_ptz_preset:
                await move_camera_to_ptz_preset(
                    hass, active_camera_entity_id, camera_ptz_preset, method=camera_ptz_method
                )
            if camera_delay:
                _LOGGER.debug("SUPERNOTIFY Waiting %s secs before snapping", camera_delay)
                await asyncio.sleep(camera_delay)
            image_path = await snap_camera(
                hass,
                active_camera_entity_id,
                media_path=context.media_path,
                max_camera_wait=15,
                jpeg_opts=jpeg_opts,
            )
            if camera_ptz_preset and camera_ptz_preset_default:
                # Return the camera to its default preset after the snapshot.
                await move_camera_to_ptz_preset(
                    hass, active_camera_entity_id, camera_ptz_preset_default, method=camera_ptz_method
                )

    if image_path is None:
        _LOGGER.warning("SUPERNOTIFY No media available to attach (%s,%s)", snapshot_url, camera_entity_id)
        return None
    # TODO: replace poking inside notification
    notification.snapshot_image_path = image_path
    return image_path