fix access violation

This commit is contained in:
Feng Wang 2024-12-31 19:36:15 +08:00
parent 48a931b9f6
commit 0668985969

View File

@ -243,6 +243,7 @@ class _MipsClient(ABC):
_mips_reconnect_interval: float _mips_reconnect_interval: float
_mips_reconnect_timer: Optional[asyncio.TimerHandle] _mips_reconnect_timer: Optional[asyncio.TimerHandle]
_mips_state_sub_map: dict[str, _MipsState] _mips_state_sub_map: dict[str, _MipsState]
_mips_state_sub_map_lock: threading.Lock
_mips_sub_pending_map: dict[str, int] _mips_sub_pending_map: dict[str, int]
_mips_sub_pending_timer: Optional[asyncio.TimerHandle] _mips_sub_pending_timer: Optional[asyncio.TimerHandle]
@ -286,6 +287,7 @@ class _MipsClient(ABC):
self._mips_reconnect_interval = 0 self._mips_reconnect_interval = 0
self._mips_reconnect_timer = None self._mips_reconnect_timer = None
self._mips_state_sub_map = {} self._mips_state_sub_map = {}
self._mips_state_sub_map_lock = threading.Lock()
self._mips_sub_pending_map = {} self._mips_sub_pending_map = {}
self._mips_sub_pending_timer = None self._mips_sub_pending_timer = None
# DO NOT start the thread yet. Do that on connect # DO NOT start the thread yet. Do that on connect
@ -339,7 +341,8 @@ class _MipsClient(ABC):
self._cert_file = None self._cert_file = None
self._key_file = None self._key_file = None
self._mqtt_logger = None self._mqtt_logger = None
self._mips_state_sub_map.clear() with self._mips_state_sub_map_lock:
self._mips_state_sub_map.clear()
self._mips_sub_pending_map.clear() self._mips_sub_pending_map.clear()
self._mips_sub_pending_timer = None self._mips_sub_pending_timer = None
@ -391,11 +394,15 @@ class _MipsClient(ABC):
) -> bool: ) -> bool:
"""Subscribe mips state. """Subscribe mips state.
NOTICE: callback to main loop thread NOTICE: callback to main loop thread
This will be called before the client is connected.
So use mutex instead of IPC.
""" """
if isinstance(key, str) is False or handler is None: if isinstance(key, str) is False or handler is None:
raise MIoTMipsError('invalid params') raise MIoTMipsError('invalid params')
self._internal_loop.call_soon_threadsafe( state = _MipsState(key=key, handler=handler)
self.__sub_mips_state, key, handler) with self._mips_state_sub_map_lock:
self._mips_state_sub_map[key] = state
self.log_debug(f'mips register mips state, {key}')
return True return True
@final @final
@ -403,7 +410,9 @@ class _MipsClient(ABC):
"""Unsubscribe mips state.""" """Unsubscribe mips state."""
if isinstance(key, str) is False: if isinstance(key, str) is False:
raise MIoTMipsError('invalid params') raise MIoTMipsError('invalid params')
self._internal_loop.call_soon_threadsafe(self.__unsub_mips_state, key) with self._mips_state_sub_map_lock:
del self._mips_state_sub_map[key]
self.log_debug(f'mips unregister mips state, {key}')
return True return True
@abstractmethod @abstractmethod
@ -606,12 +615,13 @@ class _MipsClient(ABC):
self._mqtt_state = True self._mqtt_state = True
self._internal_loop.call_soon( self._internal_loop.call_soon(
self._on_mips_connect, rc, props) self._on_mips_connect, rc, props)
for item in self._mips_state_sub_map.values(): with self._mips_state_sub_map_lock:
if item.handler is None: for item in self._mips_state_sub_map.values():
continue if item.handler is None:
self.main_loop.call_soon_threadsafe( continue
self.main_loop.create_task, self.main_loop.call_soon_threadsafe(
item.handler(item.key, True)) self.main_loop.create_task,
item.handler(item.key, True))
# Resolve future # Resolve future
self._event_connect.set() self._event_connect.set()
self._event_disconnect.clear() self._event_disconnect.clear()
@ -640,12 +650,13 @@ class _MipsClient(ABC):
self._internal_loop.call_soon( self._internal_loop.call_soon(
self._on_mips_disconnect, rc, props) self._on_mips_disconnect, rc, props)
# Call state sub handler # Call state sub handler
for item in self._mips_state_sub_map.values(): with self._mips_state_sub_map_lock:
if item.handler is None: for item in self._mips_state_sub_map.values():
continue if item.handler is None:
self.main_loop.call_soon_threadsafe( continue
self.main_loop.create_task, self.main_loop.call_soon_threadsafe(
item.handler(item.key, False)) self.main_loop.create_task,
item.handler(item.key, False))
# Try to reconnect # Try to reconnect
self.__mips_try_reconnect() self.__mips_try_reconnect()
@ -766,17 +777,6 @@ class _MipsClient(ABC):
self.__mips_disconnect() self.__mips_disconnect()
self._internal_loop.stop() self._internal_loop.stop()
def __sub_mips_state(
self, key: str, handler: Callable[[str, bool], Coroutine]
) -> None:
state = _MipsState(key=key, handler=handler)
self._mips_state_sub_map[key] = state
self.log_debug(f'mips register mips state, {key}')
def __unsub_mips_state(self, key: str) -> None:
del self._mips_state_sub_map[key]
self.log_debug(f'mips unregister mips state, {key}')
def __get_next_reconnect_time(self) -> float: def __get_next_reconnect_time(self) -> float:
if self._mips_reconnect_interval == 0: if self._mips_reconnect_interval == 0:
self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN