From 0668985969324e96a8b7579fe0dd021f3ca6d899 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Tue, 31 Dec 2024 19:36:15 +0800 Subject: [PATCH] fix access violation --- .../xiaomi_home/miot/miot_mips.py | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index d2bebde..898c82d 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -243,6 +243,7 @@ class _MipsClient(ABC): _mips_reconnect_interval: float _mips_reconnect_timer: Optional[asyncio.TimerHandle] _mips_state_sub_map: dict[str, _MipsState] + _mips_state_sub_map_lock: threading.Lock _mips_sub_pending_map: dict[str, int] _mips_sub_pending_timer: Optional[asyncio.TimerHandle] @@ -286,6 +287,7 @@ class _MipsClient(ABC): self._mips_reconnect_interval = 0 self._mips_reconnect_timer = None self._mips_state_sub_map = {} + self._mips_state_sub_map_lock = threading.Lock() self._mips_sub_pending_map = {} self._mips_sub_pending_timer = None # DO NOT start the thread yet. Do that on connect @@ -339,7 +341,8 @@ class _MipsClient(ABC): self._cert_file = None self._key_file = None self._mqtt_logger = None - self._mips_state_sub_map.clear() + with self._mips_state_sub_map_lock: + self._mips_state_sub_map.clear() self._mips_sub_pending_map.clear() self._mips_sub_pending_timer = None @@ -391,11 +394,15 @@ class _MipsClient(ABC): ) -> bool: """Subscribe mips state. NOTICE: callback to main loop thread + This will be called before the client is connected. + So use mutex instead of IPC. """ if isinstance(key, str) is False or handler is None: raise MIoTMipsError('invalid params') - self._internal_loop.call_soon_threadsafe( - self.__sub_mips_state, key, handler) + state = _MipsState(key=key, handler=handler) + with self._mips_state_sub_map_lock: + self._mips_state_sub_map[key] = state + self.log_debug(f'mips register mips state, {key}') return True @final @@ -403,7 +410,9 @@ class _MipsClient(ABC): """Unsubscribe mips state.""" if isinstance(key, str) is False: raise MIoTMipsError('invalid params') - self._internal_loop.call_soon_threadsafe(self.__unsub_mips_state, key) + with self._mips_state_sub_map_lock: + del self._mips_state_sub_map[key] + self.log_debug(f'mips unregister mips state, {key}') return True @abstractmethod @@ -606,12 +615,13 @@ class _MipsClient(ABC): self._mqtt_state = True self._internal_loop.call_soon( self._on_mips_connect, rc, props) - for item in self._mips_state_sub_map.values(): - if item.handler is None: - continue - self.main_loop.call_soon_threadsafe( - self.main_loop.create_task, - item.handler(item.key, True)) + with self._mips_state_sub_map_lock: + for item in self._mips_state_sub_map.values(): + if item.handler is None: + continue + self.main_loop.call_soon_threadsafe( + self.main_loop.create_task, + item.handler(item.key, True)) # Resolve future self._event_connect.set() self._event_disconnect.clear() @@ -640,12 +650,13 @@ class _MipsClient(ABC): self._internal_loop.call_soon( self._on_mips_disconnect, rc, props) # Call state sub handler - for item in self._mips_state_sub_map.values(): - if item.handler is None: - continue - self.main_loop.call_soon_threadsafe( - self.main_loop.create_task, - item.handler(item.key, False)) + with self._mips_state_sub_map_lock: + for item in self._mips_state_sub_map.values(): + if item.handler is None: + continue + self.main_loop.call_soon_threadsafe( + self.main_loop.create_task, + item.handler(item.key, False)) # Try to reconnect self.__mips_try_reconnect() @@ -766,17 +777,6 @@ class _MipsClient(ABC): self.__mips_disconnect() self._internal_loop.stop() - def __sub_mips_state( - self, key: str, handler: Callable[[str, bool], Coroutine] - ) -> None: - state = _MipsState(key=key, handler=handler) - self._mips_state_sub_map[key] = state - self.log_debug(f'mips register mips state, {key}') - - def __unsub_mips_state(self, key: str) -> None: - del self._mips_state_sub_map[key] - self.log_debug(f'mips unregister mips state, {key}') - def __get_next_reconnect_time(self) -> float: if self._mips_reconnect_interval == 0: self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN