This commit is contained in:
Feng Wang 2024-12-23 16:22:44 +08:00
parent 8778321b50
commit 8fcb4d895c

View File

@ -605,9 +605,8 @@ class MipsClient(ABC):
return return
self.log_info(f'mips connect, {flags}, {rc}, {props}') self.log_info(f'mips connect, {flags}, {rc}, {props}')
self._mqtt_state = True self._mqtt_state = True
if self._on_mips_connect: self._internal_loop.call_soon(
self._internal_loop.call_soon( self._on_mips_connect, rc, props)
self._on_mips_connect, rc, props)
for item in self._mips_state_sub_map.values(): for item in self._mips_state_sub_map.values():
if item.handler is None: if item.handler is None:
continue continue
@ -619,7 +618,7 @@ class MipsClient(ABC):
self._event_disconnect.clear() self._event_disconnect.clear()
def __on_connect_failed(self, client:Client, user_data:Any) -> None: def __on_connect_failed(self, client:Client, user_data:Any) -> None:
self.log_error(f'mips connect failed') self.log_error('mips connect failed')
# Try to reconnect # Try to reconnect
self.__mips_try_reconnect() self.__mips_try_reconnect()
@ -639,9 +638,8 @@ class MipsClient(ABC):
self._mips_sub_pending_timer.cancel() self._mips_sub_pending_timer.cancel()
self._mips_sub_pending_timer = None self._mips_sub_pending_timer = None
self._mips_sub_pending_map = {} self._mips_sub_pending_map = {}
if self._on_mips_disconnect: self._internal_loop.call_soon(
self._internal_loop.call_soon( self._on_mips_disconnect, rc, props)
self._on_mips_disconnect, rc, props)
# Call state sub handler # Call state sub handler
for item in self._mips_state_sub_map.values(): for item in self._mips_state_sub_map.values():
if item.handler is None: if item.handler is None:
@ -656,7 +654,12 @@ class MipsClient(ABC):
self._event_disconnect.set() self._event_disconnect.set()
self._event_connect.clear() self._event_connect.clear()
def __on_message(self, client:Client, user_data:Any , msg:MQTTMessage) -> None: def __on_message(
self,
client: Client,
user_data: Any,
msg: MQTTMessage
) -> None:
self._on_mips_message(topic=msg.topic, payload=msg.payload) self._on_mips_message(topic=msg.topic, payload=msg.payload)
def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: def __mips_sub_internal_pending_handler(self, ctx: Any) -> None:
@ -710,7 +713,8 @@ class MipsClient(ABC):
if result == MQTT_ERR_SUCCESS: if result == MQTT_ERR_SUCCESS:
socket = self._mqtt.socket() socket = self._mqtt.socket()
if socket is None: if socket is None:
self.log_error('__mips_connect, connect success, but socket is None') self.log_error(
'__mips_connect, connect success, but socket is None')
self.__mips_try_reconnect() self.__mips_try_reconnect()
return return
self._mqtt_fd = socket.fileno() self._mqtt_fd = socket.fileno()
@ -725,7 +729,7 @@ class MipsClient(ABC):
else: else:
self.log_error(f'__mips_connect error result, {result}') self.log_error(f'__mips_connect error result, {result}')
self.__mips_try_reconnect() self.__mips_try_reconnect()
def __mips_try_reconnect(self, immediately: bool = False) -> None: def __mips_try_reconnect(self, immediately: bool = False) -> None:
if self._mips_reconnect_timer: if self._mips_reconnect_timer:
self._mips_reconnect_timer.cancel() self._mips_reconnect_timer.cancel()
@ -739,7 +743,7 @@ class MipsClient(ABC):
'mips try reconnect after %ss', interval) 'mips try reconnect after %ss', interval)
self._mips_reconnect_timer = self._internal_loop.call_later( self._mips_reconnect_timer = self._internal_loop.call_later(
interval, self.__mips_connect) interval, self.__mips_connect)
def __mips_start_connect_tries(self) -> None: def __mips_start_connect_tries(self) -> None:
self._mips_reconnect_tag = True self._mips_reconnect_tag = True
self.__mips_try_reconnect(immediately=True) self.__mips_try_reconnect(immediately=True)
@ -769,7 +773,7 @@ class MipsClient(ABC):
state = MipsState(key=key, handler=handler) state = MipsState(key=key, handler=handler)
self._mips_state_sub_map[key] = state self._mips_state_sub_map[key] = state
self.log_debug(f'mips register mips state, {key}') self.log_debug(f'mips register mips state, {key}')
def __unsub_mips_state(self, key: str) -> None: def __unsub_mips_state(self, key: str) -> None:
del self._mips_state_sub_map[key] del self._mips_state_sub_map[key]
self.log_debug(f'mips unregister mips state, {key}') self.log_debug(f'mips unregister mips state, {key}')
@ -1009,7 +1013,7 @@ class MipsCloudClient(MipsClient):
self._mips_sub_internal(topic=topic) self._mips_sub_internal(topic=topic)
else: else:
self.log_debug(f'mips cloud re-reg broadcast, {topic}') self.log_debug(f'mips cloud re-reg broadcast, {topic}')
def __unreg_broadcast(self, topic: str) -> None: def __unreg_broadcast(self, topic: str) -> None:
if self._msg_matcher.get(topic=topic): if self._msg_matcher.get(topic=topic):
del self._msg_matcher[topic] del self._msg_matcher[topic]
@ -1376,7 +1380,9 @@ class MipsLocalClient(MipsClient):
@final @final
@property @property
def on_dev_list_changed(self) -> Callable[[Any, list[str]], Coroutine] | None: def on_dev_list_changed(
self
) -> Callable[[Any, list[str]], Coroutine] | None:
return self._on_dev_list_changed return self._on_dev_list_changed
@final @final
@ -1430,7 +1436,7 @@ class MipsLocalClient(MipsClient):
self._mips_sub_internal(topic=f'master/{topic}') self._mips_sub_internal(topic=f'master/{topic}')
else: else:
self.log_debug(f'mips re-reg broadcast, {sub_topic}') self.log_debug(f'mips re-reg broadcast, {sub_topic}')
def __unreg_broadcast(self, topic) -> None: def __unreg_broadcast(self, topic) -> None:
# Central hub gateway needs to add prefix # Central hub gateway needs to add prefix
unsub_topic: str = f'{self._did}/{topic}' unsub_topic: str = f'{self._did}/{topic}'
@ -1451,7 +1457,11 @@ class MipsLocalClient(MipsClient):
# Sub broadcast topic # Sub broadcast topic
for topic, _ in list(self._msg_matcher.iter_all_nodes()): for topic, _ in list(self._msg_matcher.iter_all_nodes()):
self._mips_sub_internal( self._mips_sub_internal(
topic=re.sub(f'^{self._did}', 'master', topic)) topic=re.sub(f'^{self._did}', 'master', topic))\
@final
def _on_mips_disconnect(self, rc: int, props: dict) -> None:
pass
@final @final
def _on_mips_message(self, topic: str, payload: bytes) -> None: def _on_mips_message(self, topic: str, payload: bytes) -> None:
@ -1461,7 +1471,8 @@ class MipsLocalClient(MipsClient):
# Reply # Reply
if topic == self._reply_topic: if topic == self._reply_topic:
self.log_debug(f'on request reply, {mips_msg}') self.log_debug(f'on request reply, {mips_msg}')
req: MipsRequest | None = self._request_map.pop(str(mips_msg.mid), None) req: MipsRequest | None = self._request_map.pop(
str(mips_msg.mid), None)
if req: if req:
# Cancel timer # Cancel timer
if req.timer: if req.timer: