Coverage for custom_components/supernotify/common.py: 34%

106 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2026-01-07 15:35 +0000

1"""Miscellaneous helper functions. 

2 

3No same pkg dependencies permitted 

4""" 

5 

6import datetime as dt 

7import logging 

8from abc import abstractmethod 

9from collections.abc import KeysView 

10from dataclasses import dataclass, field 

11from enum import Enum 

12from typing import Any 

13 

14from cachetools import TTLCache 

15from homeassistant.helpers.typing import ConfigType 

16 

17from . import ( 

18 ATTR_DUPE_POLICY_MTSLP, 

19 ATTR_DUPE_POLICY_NONE, 

20 CONF_DUPE_POLICY, 

21 CONF_SIZE, 

22 CONF_TTL, 

23 PRIORITY_VALUES, 

24) 

25 

26_LOGGER = logging.getLogger(__name__) 

27 

28 

29def safe_get(probably_a_dict: dict[Any, Any] | None, key: Any, default: Any = None) -> Any: 

30 probably_a_dict = probably_a_dict or {} 

31 return probably_a_dict.get(key, default) 

32 

33 

34def safe_extend(target: list[Any], extension: list[Any] | tuple[Any] | Any) -> list[Any]: 

35 if target is None: 

36 target = [] 

37 elif not isinstance(target, list): 

38 target = [target] 

39 if isinstance(extension, list | tuple): 

40 target.extend(extension) 

41 elif extension: 

42 target.append(extension) 

43 return target 

44 

45 

46def nullable_ensure_list(v: Any) -> list[Any] | None: 

47 if v is None: 

48 return None 

49 return ensure_list(v) 

50 

51 

def ensure_list(v: Any) -> list[Any]:
    """Return ``v`` in list form.

    Args:
        v: Value to coerce.

    Returns:
        ``v`` itself when it is already a list, a new list copy of a tuple,
        an empty list for ``None``, and a single-item list for anything else.
    """
    if isinstance(v, list):
        return v
    if isinstance(v, tuple):
        return list(v)
    return [] if v is None else [v]

60 

61 

def ensure_dict(v: Any, default: Any = None) -> dict[Any, Any]:
    """Return ``v`` in dict form.

    Args:
        v: Value to coerce.
        default: Value assigned to every key created from a non-dict input.

    Returns:
        ``v`` itself when it is already a dict, an empty dict for ``None``,
        a dict keyed by each member of a set or list, and otherwise a
        single-entry dict keyed by ``v``.
    """
    if isinstance(v, dict):
        return v
    if v is None:
        return {}
    if isinstance(v, (set, list)):
        return {member: default for member in v}
    # Any other value (tuples included) becomes the sole key
    return {v: default}

70 

71 

72def sanitize(v: Any, minimal: bool = True, top_level_keys_only: bool = False, **kwargs) -> Any: 

73 if isinstance(v, dt.datetime | dt.time | dt.date): 

74 return v.isoformat() 

75 if isinstance(v, str | int | float | bool): 

76 return v 

77 if isinstance(v, list | KeysView): 

78 return [sanitize(vv, minimal=minimal, **kwargs) for vv in v] 

79 if isinstance(v, tuple): 

80 return (sanitize(vv, minimal=minimal, **kwargs) for vv in v) 

81 if isinstance(v, dict): 

82 if top_level_keys_only: 

83 return [sanitize(k, minimal, **kwargs) for k in v] 

84 return {k: sanitize(vv, minimal=minimal, **kwargs) for k, vv in v.items()} 

85 if isinstance(v, Enum): 

86 return str(v) 

87 if isinstance(v, object): 

88 if hasattr(v, "contents"): 

89 return sanitize(v.contents(**kwargs), minimal=minimal, **kwargs) 

90 if hasattr(v, "as_dict"): 

91 return v.as_dict(**kwargs) 

92 return None 

93 

94 

95@dataclass 

96class CallRecord: 

97 elapsed: float = field() 

98 domain: str | None = field(default=None) 

99 action: str | None = field(default=None) 

100 action_data: dict[str, Any] | None = field(default=None) 

101 target_data: dict[str, Any] | None = field(default=None) 

102 exception: str | None = field(default=None) 

103 debug: bool = field(default=False) 

104 service_response: dict[str, Any] | None = field(default=None) 

105 

106 def contents(self, **_kwargs: Any) -> dict[str, Any]: 

107 result = { 

108 "domain": self.domain, 

109 "action": self.action, 

110 "action_data": self.action_data, 

111 "elapsed": self.elapsed, 

112 "debug": self.debug, 

113 } 

114 if self.target_data is not None: 

115 result["target_data"] = self.target_data 

116 if self.exception is not None: 

117 result["exception"] = self.exception 

118 if self.service_response is not None: 

119 result["service_response"] = self.service_response 

120 return result 

121 

122 

class DupeCheckable:
    """Interface for objects that ``DupeChecker.check`` can examine.

    ``DupeChecker.check`` reads ``id`` and ``priority`` and calls ``hash()``.
    """

    # Identifier written to the debug log and stored as the cache value
    id: str
    # Priority name, translated to a numeric rank by DupeChecker.check
    priority: str

    # NOTE: this class does not use ABCMeta, so @abstractmethod does not block
    # instantiation; the NotImplementedError below is what an un-overridden
    # call actually produces.
    @abstractmethod
    def hash(self) -> int:
        """Return an integer hash of the content used for duplicate matching.

        Raises:
            NotImplementedError: Always, until a subclass overrides it.
        """
        raise NotImplementedError()

130 

131 

class DupeChecker:
    """Detect repeated candidates using a size- and time-bounded cache."""

    def __init__(self, dupe_check_config: ConfigType) -> None:
        """Read the policy and cache limits from configuration.

        Args:
            dupe_check_config: Mapping that may contain ``CONF_DUPE_POLICY``
                (default ``ATTR_DUPE_POLICY_MTSLP``), ``CONF_SIZE`` (default
                100, passed as the cache ``maxsize``) and ``CONF_TTL``
                (default 120, passed as the cache ``ttl``).
        """
        self.policy = dupe_check_config.get(CONF_DUPE_POLICY, ATTR_DUPE_POLICY_MTSLP)
        # dupe check cache, key is (message hash, ranked priority), value is the candidate id
        self.cache: TTLCache[tuple[int, int], str] = TTLCache(
            maxsize=dupe_check_config.get(CONF_SIZE, 100), ttl=dupe_check_config.get(CONF_TTL, 120)
        )

    def check(self, dupe_candidate: DupeCheckable) -> bool:
        """Report whether ``dupe_candidate`` repeats a cached entry, then cache it.

        A candidate is a dupe when the cache holds an entry with the same hash
        whose ranked priority is greater than or equal to the candidate's.
        The candidate is always written to the cache afterwards, unless the
        policy is ``ATTR_DUPE_POLICY_NONE``, which skips the check and the
        cache write entirely.

        Args:
            dupe_candidate: Object providing ``hash()``, ``priority`` and ``id``.

        Returns:
            ``True`` when a matching cached entry exists, otherwise ``False``.
        """
        if self.policy == ATTR_DUPE_POLICY_NONE:
            return False
        hashed: int = dupe_candidate.hash()
        # Priorities missing from PRIORITY_VALUES fall back to rank 3
        ranked_priority: int = PRIORITY_VALUES.get(dupe_candidate.priority, 3)
        # any() stops at the first match, so the dupe is logged once rather
        # than once per matching cache entry
        dupe = any(
            prev_hash == hashed and prev_prior >= ranked_priority for prev_hash, prev_prior in self.cache
        )
        if dupe:
            _LOGGER.debug("SUPERNOTIFY Detected dupe: %s", dupe_candidate.id)
        self.cache[hashed, ranked_priority] = dupe_candidate.id
        return dupe