Coverage for custom_components/supernotify/common.py: 96%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-04-01 15:06 +0000

1"""Miscellaneous helper functions. 

2 

3No same pkg dependencies permitted 

4""" 

5 

6import datetime as dt 

7import logging 

8from abc import abstractmethod 

9from collections.abc import KeysView 

10from dataclasses import dataclass, field 

11from enum import Enum 

12from typing import TYPE_CHECKING, Any 

13 

14from cachetools import TTLCache 

15 

16from .const import ( 

17 ATTR_DUPE_POLICY_MT, 

18 ATTR_DUPE_POLICY_MTSLP, 

19 ATTR_DUPE_POLICY_NONE, 

20 CONF_DUPE_POLICY, 

21 CONF_SIZE, 

22 CONF_TTL, 

23 PRIORITY_VALUES, 

24) 

25 

26if TYPE_CHECKING: 

27 from homeassistant.helpers.typing import ConfigType 

28 

29_LOGGER = logging.getLogger(__name__) 

30 

31_FALSY_STRINGS = frozenset({"false", "0", "no", "off", ""}) 

32 

33 

34def safe_get(probably_a_dict: dict[Any, Any] | None, key: Any, default: Any = None) -> Any: 

35 probably_a_dict = probably_a_dict or {} 

36 return probably_a_dict.get(key, default) 

37 

38 

39def safe_extend(target: list[Any], extension: list[Any] | tuple[Any] | Any) -> list[Any]: 

40 if target is None: 

41 target = [] 

42 elif not isinstance(target, list): 

43 target = [target] 

44 if isinstance(extension, list | tuple): 

45 target.extend(extension) 

46 elif extension: 

47 target.append(extension) 

48 return target 

49 

50 

51def nullable_ensure_list(v: Any) -> list[Any] | None: 

52 if v is None: 

53 return None 

54 return ensure_list(v) 

55 

56 

57def ensure_list(v: Any) -> list[Any]: 

58 if v is None: 

59 return [] 

60 if isinstance(v, list): 

61 return v 

62 if isinstance(v, tuple): 

63 return list(v) 

64 return [v] 

65 

66 

67def ensure_dict(v: Any, default: Any = None) -> dict[Any, Any]: 

68 if v is None: 

69 return {} 

70 if isinstance(v, dict): 

71 return v 

72 if isinstance(v, set | list): 

73 return dict.fromkeys(v, default) 

74 return {v: default} 

75 

76 

77def sanitize(v: Any, minimal: bool = True, top_level_keys_only: bool = False, **kwargs) -> Any: 

78 if isinstance(v, dt.datetime | dt.time | dt.date): 

79 return v.isoformat() 

80 if isinstance(v, str | int | float | bool): 

81 return v 

82 if isinstance(v, list | KeysView): 

83 return [sanitize(vv, minimal=minimal, **kwargs) for vv in v] 

84 if isinstance(v, tuple): 

85 return (sanitize(vv, minimal=minimal, **kwargs) for vv in v) 

86 if isinstance(v, dict): 

87 if top_level_keys_only: 

88 return [sanitize(k, minimal, **kwargs) for k in v] 

89 return {k: sanitize(vv, minimal=minimal, **kwargs) for k, vv in v.items()} 

90 if isinstance(v, Enum): 

91 return str(v) 

92 if isinstance(v, object): 

93 if hasattr(v, "contents"): 

94 return sanitize(v.contents(**kwargs), minimal=minimal, **kwargs) 

95 if hasattr(v, "as_dict"): 

96 return v.as_dict(**kwargs) 

97 return None 

98 

99 

100@dataclass 

101class CallRecord: 

102 elapsed: float = field() 

103 domain: str | None = field(default=None) 

104 action: str | None = field(default=None) 

105 action_data: dict[str, Any] | None = field(default=None) 

106 target_data: dict[str, Any] | None = field(default=None) 

107 exception: str | None = field(default=None) 

108 debug: bool = field(default=False) 

109 service_response: dict[str, Any] | None = field(default=None) 

110 

111 def contents(self, **_kwargs: Any) -> dict[str, Any]: 

112 result = { 

113 "domain": self.domain, 

114 "action": self.action, 

115 "action_data": self.action_data, 

116 "elapsed": self.elapsed, 

117 "debug": self.debug, 

118 } 

119 if self.target_data is not None: 

120 result["target_data"] = self.target_data 

121 if self.exception is not None: 

122 result["exception"] = self.exception 

123 if self.service_response is not None: 

124 result["service_response"] = self.service_response 

125 return result 

126 

127 

128class DupeCheckable: 

129 id: str 

130 priority: str 

131 

132 @abstractmethod 

133 def hash(self) -> int: 

134 raise NotImplementedError 

135 

136 

137class DupeChecker: 

138 """Apply the duplicate checking policy to duplicate candidates""" 

139 

140 def __init__(self, dupe_check_config: ConfigType) -> None: 

141 self.policy = dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP) 

142 # dupe check cache, key is (priority, message hash) 

143 self.cache: TTLCache[tuple[int, int], str] = TTLCache( 

144 maxsize=dupe_check_config.get(CONF_SIZE, 100), ttl=dupe_check_config.get(CONF_TTL, 120) 

145 ) 

146 

147 def check(self, dupe_candidate: DupeCheckable) -> bool: 

148 if self.policy == ATTR_DUPE_POLICY_NONE: 

149 return False 

150 hashed: int = dupe_candidate.hash() 

151 ranked_priority: int = PRIORITY_VALUES.get(dupe_candidate.priority, 3) 

152 if self.policy == ATTR_DUPE_POLICY_MTSLP: 

153 dupe: bool = any(prev_hash == hashed and prev_prior >= ranked_priority for prev_hash, prev_prior in self.cache) 

154 elif self.policy == ATTR_DUPE_POLICY_MT: 

155 dupe = any(prev_hash == hashed for prev_hash, _prev_prior in self.cache) 

156 else: 

157 dupe = False 

158 if dupe: 

159 _LOGGER.debug("SUPERNOTIFY Detected dupe: %s", dupe_candidate.id) 

160 self.cache[hashed, ranked_priority] = dupe_candidate.id 

161 return dupe 

162 

163 

164def boolify(value: Any, default: bool) -> bool: 

165 """Convert a value to bool, correctly handling string 'false'/'true'. 

166 

167 Python's built-in bool() treats any non-empty string as True, so 

168 bool("false") == True. This helper avoids that pitfall for values 

169 that may arrive as YAML strings or Jinja2 template results. 

170 """ 

171 if value is None: 

172 return default 

173 if isinstance(value, bool): 

174 return value 

175 if isinstance(value, str): 

176 return value.strip().lower() not in _FALSY_STRINGS 

177 return bool(value)