From 8778321b506ef954231c6062098b3e23e38cf681 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Sun, 22 Dec 2024 23:49:25 +0800 Subject: [PATCH 1/6] remove use of tev & fix type errors --- .../xiaomi_home/miot/miot_mips.py | 987 +++++++----------- 1 file changed, 390 insertions(+), 597 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index 6c6b358..6c5eccd 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -48,8 +48,6 @@ MIoT Pub/Sub client. import asyncio import json import logging -import os -import queue import random import re import ssl @@ -58,19 +56,19 @@ import threading from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum, auto -from typing import Any, Callable, Optional, final +from typing import Any, Callable, Optional, final, Coroutine from paho.mqtt.client import ( MQTT_ERR_SUCCESS, MQTT_ERR_UNKNOWN, Client, - MQTTv5) + MQTTv5, + MQTTMessage) # pylint: disable=relative-beyond-top-level from .common import MIoTMatcher from .const import MIHOME_MQTT_KEEPALIVE from .miot_error import MIoTErrorCode, MIoTMipsError -from .miot_ev import MIoTEventLoop, TimeoutHandle _LOGGER = logging.getLogger(__name__) @@ -87,12 +85,12 @@ class MipsMsgTypeOptions(Enum): class MipsMessage: """MIoT Pub/Sub message.""" mid: int = 0 - msg_from: str = None - ret_topic: str = None - payload: str = None + msg_from: str | None = None + ret_topic: str | None = None + payload: str | None = None @staticmethod - def unpack(data: bytes): + def unpack(data: bytes) -> 'MipsMessage': mips_msg = MipsMessage() data_len = len(data) data_start = 0 @@ -122,7 +120,10 @@ class MipsMessage: @staticmethod def pack( - mid: int, payload: str, msg_from: str = None, ret_topic: str = None + mid: int, + payload: str, + msg_from: str | None = None, + ret_topic: str | None = None ) -> bytes: if mid is None or payload is None: raise MIoTMipsError('invalid mid or payload') @@ -152,125 +153,48 @@ class MipsMessage: return f'{self.mid}, {self.msg_from}, {self.ret_topic}, {self.payload}' -class MipsCmdType(Enum): - """MIoT Pub/Sub command type.""" - CONNECT = 0 - DISCONNECT = auto() - DEINIT = auto() - SUB = auto() - UNSUB = auto() - CALL_API = auto() - REG_BROADCAST = auto() - UNREG_BROADCAST = auto() - - REG_MIPS_STATE = auto() - UNREG_MIPS_STATE = auto() - REG_DEVICE_STATE = auto() - UNREG_DEVICE_STATE = auto() - - -@dataclass -class MipsCmd: - """MIoT Pub/Sub command.""" - type_: MipsCmdType - data: Any - - def __init__(self, type_: MipsCmdType, data: Any) -> None: - self.type_ = type_ - self.data = data - - @dataclass class MipsRequest: """MIoT Pub/Sub request.""" - mid: int = None - on_reply: Callable[[str, Any], None] = None - on_reply_ctx: Any = None - timer: TimeoutHandle = None - - -@dataclass -class MipsRequestData: - """MIoT Pub/Sub request data.""" - topic: str = None - payload: str = None - on_reply: Callable[[str, Any], None] = None - on_reply_ctx: Any = None - timeout_ms: int = None - - -@dataclass -class MipsSendBroadcastData: - """MIoT Pub/Sub send broadcast data.""" - topic: str = None - payload: str = None + mid: int + on_reply: Callable[[str, Any], None] + on_reply_ctx: Any + timer: asyncio.TimerHandle | None @dataclass class MipsIncomingApiCall: """MIoT Pub/Sub incoming API call.""" - mid: int = None - ret_topic: str = None - timer: TimeoutHandle = None - - -@dataclass -class MipsApi: - """MIoT Pub/Sub API.""" - topic: str = None - """ - param1: session - param2: payload - param3: handler_ctx - """ - handler: Callable[[MipsIncomingApiCall, str, Any], None] = None - handler_ctx: Any = None - - -class MipsRegApi(MipsApi): - """.MIoT Pub/Sub register API.""" - - -@dataclass -class MipsReplyData: - """MIoT Pub/Sub reply data.""" - session: MipsIncomingApiCall = None - payload: str = None + mid: int | None = None + ret_topic: str | None = None + timer: asyncio.TimerHandle | None = None @dataclass class MipsBroadcast: """MIoT Pub/Sub broadcast.""" - topic: str = None + topic: str """ param 1: msg topic param 2: msg payload param 3: handle_ctx """ - handler: Callable[[str, str, Any], None] = None - handler_ctx: Any = None + handler: Callable[[str, str, Any], None] + handler_ctx: Any def __str__(self) -> str: return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}' -class MipsRegBroadcast(MipsBroadcast): - """MIoT Pub/Sub register broadcast.""" - - @dataclass class MipsState: """MIoT Pub/Sub state.""" - key: str = None + key: str """ str: key bool: mips connect state """ - handler: Callable[[str, bool], asyncio.Future] = None - - -class MipsRegState(MipsState): - """MIoT Pub/Sub register state.""" + handler: Callable[[str, bool], Coroutine] class MIoTDeviceState(Enum): @@ -280,72 +204,54 @@ class MIoTDeviceState(Enum): ONLINE = auto() -@dataclass -class MipsDeviceState: - """MIoT Pub/Sub device state.""" - did: str = None - """handler - str: did - MIoTDeviceState: online/offline/disable - Any: ctx - """ - handler: Callable[[str, MIoTDeviceState, Any], None] = None - handler_ctx: Any = None - - -class MipsRegDeviceState(MipsDeviceState): - """MIoT Pub/Sub register device state.""" - - class MipsClient(ABC): """MIoT Pub/Sub client.""" # pylint: disable=unused-argument - MQTT_INTERVAL_MS = 1000 + MQTT_INTERVAL_S = 1 MIPS_QOS: int = 2 UINT32_MAX: int = 0xFFFFFFFF - MIPS_RECONNECT_INTERVAL_MIN: int = 30000 - MIPS_RECONNECT_INTERVAL_MAX: int = 600000 + MIPS_RECONNECT_INTERVAL_MIN: float = 30 + MIPS_RECONNECT_INTERVAL_MAX: float = 600 MIPS_SUB_PATCH: int = 300 - MIPS_SUB_INTERVAL: int = 1000 + MIPS_SUB_INTERVAL: float = 1 main_loop: asyncio.AbstractEventLoop - _logger: logging.Logger + _logger: logging.Logger | None _client_id: str _host: str _port: int - _username: str - _password: str - _ca_file: str - _cert_file: str - _key_file: str + _username: str | None + _password: str | None + _ca_file: str | None + _cert_file: str | None + _key_file: str | None - _mqtt_logger: logging.Logger + _mqtt_logger: logging.Logger | None _mqtt: Client _mqtt_fd: int - _mqtt_timer: TimeoutHandle + _mqtt_timer: asyncio.TimerHandle | None _mqtt_state: bool _event_connect: asyncio.Event _event_disconnect: asyncio.Event - _mev: MIoTEventLoop + _internal_loop: asyncio.AbstractEventLoop _mips_thread: threading.Thread - _mips_queue: queue.Queue - _cmd_event_fd: os.eventfd _mips_reconnect_tag: bool - _mips_reconnect_interval: int - _mips_reconnect_timer: Optional[TimeoutHandle] + _mips_reconnect_interval: float + _mips_reconnect_timer: Optional[asyncio.TimerHandle] _mips_state_sub_map: dict[str, MipsState] _mips_sub_pending_map: dict[str, int] - _mips_sub_pending_timer: Optional[TimeoutHandle] - - _on_mips_cmd: Callable[[MipsCmd], None] - _on_mips_message: Callable[[str, bytes], None] - _on_mips_connect: Callable[[int, dict], None] - _on_mips_disconnect: Callable[[int, dict], None] + _mips_sub_pending_timer: Optional[asyncio.TimerHandle] def __init__( - self, client_id: str, host: str, port: int, - username: str = None, password: str = None, - ca_file: str = None, cert_file: str = None, key_file: str = None, + self, + client_id: + str, host: str, + port: int, + username: Optional[str] = None, + password: Optional[str] = None, + ca_file: Optional[str] = None, + cert_file: Optional[str] = None, + key_file: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: # MUST run with running loop @@ -378,20 +284,8 @@ class MipsClient(ABC): self._mips_state_sub_map = {} self._mips_sub_pending_map = {} self._mips_sub_pending_timer = None - self._mev = MIoTEventLoop() - self._mips_queue = queue.Queue() - self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) - self.mev_set_read_handler( - self._cmd_event_fd, self.__mips_cmd_read_handler, None) - self._mips_thread = threading.Thread(target=self.__mips_loop_thread) - self._mips_thread.daemon = True - self._mips_thread.name = self._client_id - self._mips_thread.start() + # DO NOT start the thread yet. Do that on connect - self._on_mips_cmd = None - self._on_mips_message = None - self._on_mips_connect = None - self._on_mips_disconnect = None @property def client_id(self) -> str: @@ -416,28 +310,37 @@ class MipsClient(ABC): return self._mqtt and self._mqtt.is_connected() @final - def mips_deinit(self) -> None: - self._mips_send_cmd(type_=MipsCmdType.DEINIT, data=None) + def connect(self) -> None: + """mips connect.""" + # TODO: make this more precise + # Mark as not closed, though also not connected yet + self._is_closed = False + # Start mips thread + self._internal_loop = asyncio.new_event_loop() + self._mips_thread = threading.Thread(target=self.__mips_loop_thread) + self._mips_thread.daemon = True + self._mips_thread.name = self._client_id + self._mips_thread.start() + + @final + def close(self) -> None: + self._is_connected = False + + self._internal_loop.call_soon_threadsafe(self.__mips_close) self._mips_thread.join() - self._mips_thread = None + self._internal_loop.close() self._logger = None - self._client_id = None - self._host = None - self._port = None self._username = None self._password = None self._ca_file = None self._cert_file = None self._key_file = None self._mqtt_logger = None - self._mips_state_sub_map = None - self._mips_sub_pending_map = None + self._mips_state_sub_map.clear() + self._mips_sub_pending_map.clear() self._mips_sub_pending_timer = None - self._event_connect = None - self._event_disconnect = None - def update_mqtt_password(self, password: str) -> None: self._password = password self._mqtt.username_pw_set( @@ -466,166 +369,85 @@ class MipsClient(ABC): else: self._mqtt.disable_logger() - @final - def mips_connect(self) -> None: - """mips connect.""" - return self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) - @final async def mips_connect_async(self) -> None: """mips connect async.""" - self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None) - return await self._event_connect.wait() + self.connect() + await self._event_connect.wait() @final def mips_disconnect(self) -> None: """mips disconnect.""" - return self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) + self._internal_loop.call_soon_threadsafe(self.__mips_disconnect) @final async def mips_disconnect_async(self) -> None: """mips disconnect async.""" - self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None) - return await self._event_disconnect.wait() + self.mips_disconnect() + await self._event_disconnect.wait() @final def sub_mips_state( - self, key: str, handler: Callable[[str, bool], asyncio.Future] + self, key: str, handler: Callable[[str, bool], Coroutine] ) -> bool: """Subscribe mips state. NOTICE: callback to main loop thread """ if isinstance(key, str) is False or handler is None: raise MIoTMipsError('invalid params') - return self._mips_send_cmd( - type_=MipsCmdType.REG_MIPS_STATE, - data=MipsRegState(key=key, handler=handler)) + self._internal_loop.call_soon_threadsafe( + self.__sub_mips_state, key, handler) + return True @final def unsub_mips_state(self, key: str) -> bool: """Unsubscribe mips state.""" if isinstance(key, str) is False: raise MIoTMipsError('invalid params') - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_MIPS_STATE, data=MipsRegState(key=key)) - - @final - def mev_set_timeout( - self, timeout_ms: int, handler: Callable[[Any], None], - handler_ctx: Any = None - ) -> Optional[TimeoutHandle]: - """set timeout. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return None - return self._mev.set_timeout( - timeout_ms=timeout_ms, handler=handler, handler_ctx=handler_ctx) - - @final - def mev_clear_timeout(self, handle: TimeoutHandle) -> None: - """clear timeout. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return - self._mev.clear_timeout(handle) - - @final - def mev_set_read_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any - ) -> bool: - """set read handler. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return False - return self._mev.set_read_handler( - fd=fd, handler=handler, handler_ctx=handler_ctx) - - @final - def mev_set_write_handler( - self, fd: int, handler: Callable[[Any], None], handler_ctx: Any - ) -> bool: - """set write handler. - NOTICE: Internal function, only mips threads are allowed to call - """ - if self._mev is None: - return False - return self._mev.set_write_handler( - fd=fd, handler=handler, handler_ctx=handler_ctx) - - @property - def on_mips_cmd(self) -> Callable[[MipsCmd], None]: - return self._on_mips_cmd - - @on_mips_cmd.setter - def on_mips_cmd(self, handler: Callable[[MipsCmd], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the **mips** thread - """ - self._on_mips_cmd = handler - - @property - def on_mips_message(self) -> Callable[[str, bytes], None]: - return self._on_mips_message - - @on_mips_message.setter - def on_mips_message(self, handler: Callable[[str, bytes], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the **mips** thread - """ - self._on_mips_message = handler - - @property - def on_mips_connect(self) -> Callable[[int, dict], None]: - return self._on_mips_connect - - @on_mips_connect.setter - def on_mips_connect(self, handler: Callable[[int, dict], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the - **main loop** thread - """ - self._on_mips_connect = handler - - @property - def on_mips_disconnect(self) -> Callable[[int, dict], None]: - return self._on_mips_disconnect - - @on_mips_disconnect.setter - def on_mips_disconnect(self, handler: Callable[[int, dict], None]) -> None: - """MUST set after __init__ done. - NOTICE thread safe, this function will be called at the - **main loop** thread - """ - self._on_mips_disconnect = handler + self._internal_loop.call_soon_threadsafe(self.__unsub_mips_state, key) + return True @abstractmethod def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_prop( - self, did: str, siid: int = None, piid: int = None + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None ) -> bool: ... @abstractmethod def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: ... @abstractmethod def unsub_event( - self, did: str, siid: int = None, eiid: int = None + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None ) -> bool: ... @abstractmethod async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, + payload: Optional[str] = None, + timeout_ms: int = 10000 ) -> dict[str, dict]: ... @abstractmethod @@ -637,13 +459,22 @@ class MipsClient(ABC): async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 - ) -> bool: ... + ) -> dict: ... @abstractmethod async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 - ) -> tuple[bool, list]: ... + ) -> dict: ... + + @abstractmethod + def _on_mips_message(self, topic: str, payload: bytes) -> None:... + + @abstractmethod + def _on_mips_connect(self, rc: int, props: dict) -> None:... + + @abstractmethod + def _on_mips_disconnect(self, rc: int, props: dict) -> None:... @final def _mips_sub_internal(self, topic: str) -> None: @@ -657,8 +488,8 @@ class MipsClient(ABC): if topic not in self._mips_sub_pending_map: self._mips_sub_pending_map[topic] = 0 if not self._mips_sub_pending_timer: - self._mips_sub_pending_timer = self.mev_set_timeout( - 10, self.__mips_sub_internal_pending_handler, topic) + self._mips_sub_pending_timer = self._internal_loop.call_later( + 0.01, self.__mips_sub_internal_pending_handler, topic) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'mips sub internal error, {topic}. {err}') @@ -707,75 +538,24 @@ class MipsClient(ABC): self.log_error(f'mips publish internal error, {err}') return False - @final - def _mips_send_cmd(self, type_: MipsCmdType, data: Any) -> bool: - if self._mips_queue is None or self._cmd_event_fd is None: - raise MIoTMipsError('send mips cmd disable') - # Put data to queue - self._mips_queue.put(MipsCmd(type_=type_, data=data)) - # Write event fd - os.eventfd_write(self._cmd_event_fd, 1) - # self.log_debug(f'send mips cmd, {type}, {data}') - return True - def __thread_check(self) -> None: if threading.current_thread() is not self._mips_thread: raise MIoTMipsError('illegal call') - def __mips_cmd_read_handler(self, ctx: Any) -> None: - fd_value = os.eventfd_read(self._cmd_event_fd) - if fd_value == 0: - return - while self._mips_queue.empty() is False: - mips_cmd: MipsCmd = self._mips_queue.get(block=False) - if mips_cmd.type_ == MipsCmdType.CONNECT: - self._mips_reconnect_tag = True - self.__mips_try_reconnect(immediately=True) - elif mips_cmd.type_ == MipsCmdType.DISCONNECT: - self._mips_reconnect_tag = False - self.__mips_disconnect() - elif mips_cmd.type_ == MipsCmdType.DEINIT: - self.log_info('mips client recv deinit cmd') - self.__mips_disconnect() - # Close cmd event fd - if self._cmd_event_fd: - self.mev_set_read_handler( - self._cmd_event_fd, None, None) - os.close(self._cmd_event_fd) - self._cmd_event_fd = None - if self._mips_queue: - self._mips_queue = None - # ev loop stop - if self._mev: - self._mev.loop_stop() - self._mev = None - break - elif mips_cmd.type_ == MipsCmdType.REG_MIPS_STATE: - state: MipsState = mips_cmd.data - self._mips_state_sub_map[state.key] = state - self.log_debug(f'mips register mips state, {state.key}') - elif mips_cmd.type_ == MipsCmdType.UNREG_MIPS_STATE: - state: MipsState = mips_cmd.data - del self._mips_state_sub_map[state.key] - self.log_debug(f'mips unregister mips state, {state.key}') - else: - if self._on_mips_cmd: - self._on_mips_cmd(mips_cmd=mips_cmd) + def __mqtt_read_handler(self) -> None: + self.__mqtt_loop_handler() - def __mqtt_read_handler(self, ctx: Any) -> None: - self.__mqtt_loop_handler(ctx=ctx) + def __mqtt_write_handler(self) -> None: + self._internal_loop.remove_writer(self._mqtt_fd) + self.__mqtt_loop_handler() - def __mqtt_write_handler(self, ctx: Any) -> None: - self.mev_set_write_handler(self._mqtt_fd, None, None) - self.__mqtt_loop_handler(ctx=ctx) - - def __mqtt_timer_handler(self, ctx: Any) -> None: - self.__mqtt_loop_handler(ctx=ctx) + def __mqtt_timer_handler(self) -> None: + self.__mqtt_loop_handler() if self._mqtt: - self._mqtt_timer = self.mev_set_timeout( - self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) + self._mqtt_timer = self._internal_loop.call_later( + self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) - def __mqtt_loop_handler(self, ctx: Any) -> None: + def __mqtt_loop_handler(self) -> None: try: if self._mqtt: self._mqtt.loop_read() @@ -784,8 +564,8 @@ class MipsClient(ABC): if self._mqtt: self._mqtt.loop_misc() if self._mqtt and self._mqtt.want_write(): - self.mev_set_write_handler( - self._mqtt_fd, self.__mqtt_write_handler, None) + self._internal_loop.add_writer( + self._mqtt_fd, self.__mqtt_write_handler) except Exception as err: # pylint: disable=broad-exception-caught # Catch all exception self.log_error(f'__mqtt_loop_handler, {err}') @@ -814,8 +594,10 @@ class MipsClient(ABC): self._mqtt.on_connect_fail = self.__on_connect_failed self._mqtt.on_disconnect = self.__on_disconnect self._mqtt.on_message = self.__on_message + # Connect to mips + self.__mips_start_connect_tries() # Run event loop - self._mev.loop_forever() + self._internal_loop.run_forever() self.log_info('mips_loop_thread exit!') def __on_connect(self, client, user_data, flags, rc, props) -> None: @@ -824,10 +606,8 @@ class MipsClient(ABC): self.log_info(f'mips connect, {flags}, {rc}, {props}') self._mqtt_state = True if self._on_mips_connect: - self.mev_set_timeout( - timeout_ms=0, - handler=lambda ctx: - self._on_mips_connect(rc, props)) + self._internal_loop.call_soon( + self._on_mips_connect, rc, props) for item in self._mips_state_sub_map.values(): if item.handler is None: continue @@ -838,8 +618,8 @@ class MipsClient(ABC): self._event_connect.set() self._event_disconnect.clear() - def __on_connect_failed(self, client, user_data, flags, rc) -> None: - self.log_error(f'mips connect failed, {flags}, {rc}') + def __on_connect_failed(self, client:Client, user_data:Any) -> None: + self.log_error(f'mips connect failed') # Try to reconnect self.__mips_try_reconnect() @@ -848,22 +628,20 @@ class MipsClient(ABC): self.log_error(f'mips disconnect, {rc}, {props}') self._mqtt_state = False if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 # Clear retry sub if self._mips_sub_pending_timer: - self.mev_clear_timeout(self._mips_sub_pending_timer) + self._mips_sub_pending_timer.cancel() self._mips_sub_pending_timer = None self._mips_sub_pending_map = {} if self._on_mips_disconnect: - self.mev_set_timeout( - timeout_ms=0, - handler=lambda ctx: - self._on_mips_disconnect(rc, props)) + self._internal_loop.call_soon( + self._on_mips_disconnect, rc, props) # Call state sub handler for item in self._mips_state_sub_map.values(): if item.handler is None: @@ -878,23 +656,9 @@ class MipsClient(ABC): self._event_disconnect.set() self._event_connect.clear() - def __on_message(self, client, user_data, msg) -> None: + def __on_message(self, client:Client, user_data:Any , msg:MQTTMessage) -> None: self._on_mips_message(topic=msg.topic, payload=msg.payload) - def __mips_try_reconnect(self, immediately: bool = False) -> None: - if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) - self._mips_reconnect_timer = None - if not self._mips_reconnect_tag: - return - interval: int = 0 - if not immediately: - interval = self.__get_next_reconnect_time() - self.log_error( - 'mips try reconnect after %sms', interval) - self._mips_reconnect_timer = self.mev_set_timeout( - interval, self.__mips_connect, None) - def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: subbed_count = 1 for topic in list(self._mips_sub_pending_map.keys()): @@ -916,25 +680,25 @@ class MipsClient(ABC): f'retry mips sub internal, {count}, {topic}, {result}, {mid}') if len(self._mips_sub_pending_map): - self._mips_sub_pending_timer = self.mev_set_timeout( + self._mips_sub_pending_timer = self._internal_loop.call_later( self.MIPS_SUB_INTERVAL, self.__mips_sub_internal_pending_handler, None) else: self._mips_sub_pending_timer = None - def __mips_connect(self, ctx: Any = None) -> None: + def __mips_connect(self) -> None: result = MQTT_ERR_UNKNOWN if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) + self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None try: # Try clean mqtt fd before mqtt connect if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 result = self._mqtt.connect( host=self._host, port=self._port, @@ -944,33 +708,73 @@ class MipsClient(ABC): self.log_error('__mips_connect, connect error, %s', error) if result == MQTT_ERR_SUCCESS: - self._mqtt_fd = self._mqtt.socket() + socket = self._mqtt.socket() + if socket is None: + self.log_error('__mips_connect, connect success, but socket is None') + self.__mips_try_reconnect() + return + self._mqtt_fd = socket.fileno() self.log_debug(f'__mips_connect, _mqtt_fd, {self._mqtt_fd}') - self.mev_set_read_handler( - self._mqtt_fd, self.__mqtt_read_handler, None) + self._internal_loop.add_reader( + self._mqtt_fd, self.__mqtt_read_handler) if self._mqtt.want_write(): - self.mev_set_write_handler( - self._mqtt_fd, self.__mqtt_write_handler, None) - self._mqtt_timer = self.mev_set_timeout( - self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None) + self._internal_loop.add_writer( + self._mqtt_fd, self.__mqtt_write_handler) + self._mqtt_timer = self._internal_loop.call_later( + self.MQTT_INTERVAL_S, self.__mqtt_timer_handler) else: self.log_error(f'__mips_connect error result, {result}') self.__mips_try_reconnect() + + def __mips_try_reconnect(self, immediately: bool = False) -> None: + if self._mips_reconnect_timer: + self._mips_reconnect_timer.cancel() + self._mips_reconnect_timer = None + if not self._mips_reconnect_tag: + return + interval: float = 0 + if not immediately: + interval = self.__get_next_reconnect_time() + self.log_error( + 'mips try reconnect after %ss', interval) + self._mips_reconnect_timer = self._internal_loop.call_later( + interval, self.__mips_connect) + + def __mips_start_connect_tries(self) -> None: + self._mips_reconnect_tag = True + self.__mips_try_reconnect(immediately=True) def __mips_disconnect(self) -> None: + self._mips_reconnect_tag = False if self._mips_reconnect_timer: - self.mev_clear_timeout(self._mips_reconnect_timer) + self._mips_reconnect_timer.cancel() self._mips_reconnect_timer = None if self._mqtt_timer: - self.mev_clear_timeout(self._mqtt_timer) + self._mqtt_timer.cancel() self._mqtt_timer = None if self._mqtt_fd != -1: - self.mev_set_read_handler(self._mqtt_fd, None, None) - self.mev_set_write_handler(self._mqtt_fd, None, None) + self._internal_loop.remove_reader(self._mqtt_fd) + self._internal_loop.remove_writer(self._mqtt_fd) self._mqtt_fd = -1 self._mqtt.disconnect() - def __get_next_reconnect_time(self) -> int: + def __mips_close(self) -> None: + self.log_info('mips client closing') + self.__mips_disconnect() + self._internal_loop.stop() + + def __sub_mips_state( + self, key: str, handler: Callable[[str, bool], Coroutine] + ) -> None: + state = MipsState(key=key, handler=handler) + self._mips_state_sub_map[key] = state + self.log_debug(f'mips register mips state, {key}') + + def __unsub_mips_state(self, key: str) -> None: + del self._mips_state_sub_map[key] + self.log_debug(f'mips unregister mips state, {key}') + + def __get_next_reconnect_time(self) -> float: if self._mips_reconnect_interval == 0: self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN else: @@ -996,22 +800,6 @@ class MipsCloudClient(MipsClient): client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com', port=port, username=app_id, password=token, loop=loop) - self.on_mips_cmd = self.__on_mips_cmd_handler - self.on_mips_message = self.__on_mips_message_handler - self.on_mips_connect = self.__on_mips_connect_handler - self.on_mips_disconnect = self.__on_mips_disconnect_handler - - def deinit(self) -> None: - self.mips_deinit() - self._msg_matcher = None - self.on_mips_cmd = None - self.on_mips_message = None - self.on_mips_connect = None - - @final - def connect(self) -> None: - self.mips_connect() - @final async def connect_async(self) -> None: await self.mips_connect_async() @@ -1029,12 +817,17 @@ class MipsCloudClient(MipsClient): def update_access_token(self, access_token: str) -> bool: if not isinstance(access_token, str): raise MIoTMipsError('invalid token') - return self.update_mqtt_password(password=access_token) + self.update_mqtt_password(password=access_token) + return True @final def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') @@ -1043,7 +836,7 @@ class MipsCloudClient(MipsClient): f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - def on_prop_msg(topic: str, payload: str, ctx: Any) -> bool: + def on_prop_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: @@ -1062,22 +855,31 @@ class MipsCloudClient(MipsClient): if handler: self.log_debug('on properties_changed, %s', payload) handler(msg['params'], ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop( + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None + ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = ( f'device/{did}/up/properties_changed/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: if not isinstance(did, str) or handler is None: raise MIoTMipsError('invalid params') @@ -1086,7 +888,7 @@ class MipsCloudClient(MipsClient): f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - def on_event_msg(topic: str, payload: str, ctx: Any) -> bool: + def on_event_msg(topic: str, payload: str, ctx: Any) -> None: try: msg: dict = json.loads(payload) except json.JSONDecodeError: @@ -1106,18 +908,23 @@ class MipsCloudClient(MipsClient): self.log_debug('on on_event_msg, %s', payload) msg['params']['from'] = 'cloud' handler(msg['params'], ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None + ) -> bool: if not isinstance(did, str): raise MIoTMipsError('invalid params') # Spelling error: event_occured topic: str = ( f'device/{did}/up/event_occured/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_device_state( @@ -1145,7 +952,7 @@ class MipsCloudClient(MipsClient): handler( did, MIoTDeviceState.ONLINE if msg['event'] == 'online' else MIoTDeviceState.OFFLINE, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_state_msg, handler_ctx=handler_ctx) @final @@ -1153,10 +960,10 @@ class MipsCloudClient(MipsClient): if not isinstance(did, str): raise MIoTMipsError('invalid params') topic: str = f'device/{did}/state/#' - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: raise NotImplementedError('please call in http client') @@ -1168,60 +975,57 @@ class MipsCloudClient(MipsClient): async def set_prop_async( self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 - ) -> bool: + ) -> dict: raise NotImplementedError('please call in http client') async def action_async( self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000 - ) -> tuple[bool, list]: + ) -> dict: raise NotImplementedError('please call in http client') - def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: - """ - NOTICE thread safe, this function will be called at the **mips** thread - """ - if mips_cmd.type_ == MipsCmdType.REG_BROADCAST: - reg_bc: MipsRegBroadcast = mips_cmd.data - if not self._msg_matcher.get(topic=reg_bc.topic): - sub_bc: MipsBroadcast = MipsBroadcast( - topic=reg_bc.topic, handler=reg_bc.handler, - handler_ctx=reg_bc.handler_ctx) - self._msg_matcher[reg_bc.topic] = sub_bc - self._mips_sub_internal(topic=reg_bc.topic) - else: - self.log_debug(f'mips cloud re-reg broadcast, {reg_bc.topic}') - elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: - unreg_bc: MipsRegBroadcast = mips_cmd.data - if self._msg_matcher.get(topic=unreg_bc.topic): - del self._msg_matcher[unreg_bc.topic] - self._mips_unsub_internal(topic=unreg_bc.topic) + def __reg_broadcast_external( + self, topic: str, handler: Callable[[str, str, Any], None], + handler_ctx: Any = None + ) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__reg_broadcast, topic, handler, handler_ctx) + return True + + def __unreg_broadcast_external(self, topic: str) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__unreg_broadcast, topic) + return True def __reg_broadcast( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any = None - ) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.REG_BROADCAST, - data=MipsRegBroadcast( - topic=topic, handler=handler, handler_ctx=handler_ctx)) + ) -> None: + if not self._msg_matcher.get(topic=topic): + sub_bc: MipsBroadcast = MipsBroadcast( + topic=topic, handler=handler, + handler_ctx=handler_ctx) + self._msg_matcher[topic] = sub_bc + self._mips_sub_internal(topic=topic) + else: + self.log_debug(f'mips cloud re-reg broadcast, {topic}') + + def __unreg_broadcast(self, topic: str) -> None: + if self._msg_matcher.get(topic=topic): + del self._msg_matcher[topic] + self._mips_unsub_internal(topic=topic) - def __unreg_broadcast(self, topic: str) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_BROADCAST, - data=MipsRegBroadcast(topic=topic)) - - def __on_mips_connect_handler(self, rc, props) -> None: + def _on_mips_connect(self, rc: int, props: dict) -> None: """sub topic.""" for topic, _ in list( self._msg_matcher.iter_all_nodes()): self._mips_sub_internal(topic=topic) - def __on_mips_disconnect_handler(self, rc, props) -> None: + def _on_mips_disconnect(self, rc: int, props: dict) -> None: """unsub topic.""" pass - def __on_mips_message_handler(self, topic: str, payload) -> None: + def _on_mips_message(self, topic: str, payload: bytes) -> None: """ NOTICE thread safe, this function will be called at the **mips** thread """ @@ -1230,23 +1034,25 @@ class MipsCloudClient(MipsClient): self._msg_matcher.iter_match(topic)) if not bc_list: return + # The message from the cloud is not packed. + payload_str:str = payload.decode('utf-8') # self.log_debug(f"on broadcast, {topic}, {payload}") for item in bc_list or []: if item.handler is None: continue # NOTICE: call threadsafe self.main_loop.call_soon_threadsafe( - item.handler, topic, payload, item.handler_ctx) + item.handler, topic, payload_str, item.handler_ctx) class MipsLocalClient(MipsClient): """MIoT Pub/Sub Local Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes - MIPS_RECONNECT_INTERVAL_MIN: int = 6000 - MIPS_RECONNECT_INTERVAL_MAX: int = 60000 + MIPS_RECONNECT_INTERVAL_MIN: float = 6 + MIPS_RECONNECT_INTERVAL_MAX: float = 60 MIPS_SUB_PATCH: int = 1000 - MIPS_SUB_INTERVAL: int = 100 + MIPS_SUB_INTERVAL: float = 0.1 _did: str _group_id: str _home_name: str @@ -1255,10 +1061,9 @@ class MipsLocalClient(MipsClient): _dev_list_change_topic: str _request_map: dict[str, MipsRequest] _msg_matcher: MIoTMatcher - _device_state_sub_map: dict[str, MipsDeviceState] _get_prop_queue: dict[str, list] - _get_prop_timer: asyncio.TimerHandle - _on_dev_list_changed: Callable[[Any, list[str]], asyncio.Future] + _get_prop_timer: asyncio.TimerHandle | None + _on_dev_list_changed: Callable[[Any, list[str]], Coroutine] | None def __init__( self, did: str, host: str, group_id: str, @@ -1274,7 +1079,6 @@ class MipsLocalClient(MipsClient): self._dev_list_change_topic = f'{did}/appMsg/devListChange' self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} self._get_prop_queue = {} self._get_prop_timer = None self._on_dev_list_changed = None @@ -1285,31 +1089,10 @@ class MipsLocalClient(MipsClient): # MIPS local thread name use group_id self._mips_thread.name = self._group_id - self.on_mips_cmd = self.__on_mips_cmd_handler - self.on_mips_message = self.__on_mips_message_handler - self.on_mips_connect = self.__on_mips_connect_handler - @property def group_id(self) -> str: return self._group_id - def deinit(self) -> None: - self.mips_deinit() - self._did = None - self._mips_seed_id = None - self._reply_topic = None - self._dev_list_change_topic = None - self._request_map = None - self._msg_matcher = None - self._device_state_sub_map = None - self._get_prop_queue = None - self._get_prop_timer = None - self._on_dev_list_changed = None - - self.on_mips_cmd = None - self.on_mips_message = None - self.on_mips_connect = None - def log_debug(self, msg, *args, **kwargs) -> None: if self._logger: self._logger.debug(f'{self._home_name}, '+msg, *args, **kwargs) @@ -1322,10 +1105,6 @@ class MipsLocalClient(MipsClient): if self._logger: self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs) - @final - def connect(self) -> None: - self.mips_connect() - @final async def connect_async(self) -> None: await self.mips_connect_async() @@ -1335,19 +1114,21 @@ class MipsLocalClient(MipsClient): self.mips_disconnect() self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} @final async def disconnect_async(self) -> None: await self.mips_disconnect_async() self._request_map = {} self._msg_matcher = MIoTMatcher() - self._device_state_sub_map = {} @final def sub_prop( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, piid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + piid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' @@ -1367,20 +1148,29 @@ class MipsLocalClient(MipsClient): if handler: self.log_debug('local, on properties_changed, %s', payload) handler(msg, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop( + self, + did: str, + siid: Optional[int] = None, + piid: Optional[int] = None + ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/property/' f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final def sub_event( - self, did: str, handler: Callable[[dict, Any], None], - siid: int = None, eiid: int = None, handler_ctx: Any = None + self, + did: str, + handler: Callable[[dict, Any], None], + siid: Optional[int] = None, + eiid: Optional[int] = None, + handler_ctx: Any = None ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' @@ -1400,15 +1190,20 @@ class MipsLocalClient(MipsClient): if handler: self.log_debug('local, on event_occurred, %s', payload) handler(msg, ctx) - return self.__reg_broadcast( + return self.__reg_broadcast_external( topic=topic, handler=on_event_msg, handler_ctx=handler_ctx) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, + did: str, + siid: Optional[int] = None, + eiid: Optional[int] = None + ) -> bool: topic: str = ( f'appMsg/notify/iot/{did}/event/' f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}') - return self.__unreg_broadcast(topic=topic) + return self.__unreg_broadcast_external(topic=topic) @final async def get_prop_safe_async( @@ -1426,7 +1221,9 @@ class MipsLocalClient(MipsClient): 'timeout_ms': timeout_ms }) if self._get_prop_timer is None: - self._get_prop_timer = self.main_loop.create_task( + self._get_prop_timer = self.main_loop.call_later( + 0.1, + self.main_loop.create_task, self.__get_prop_timer_handle()) return await fut @@ -1515,13 +1312,13 @@ class MipsLocalClient(MipsClient): @final async def get_dev_list_async( - self, payload: str = None, timeout_ms: int = 10000 + self, payload: Optional[str] = None, timeout_ms: int = 10000 ) -> dict[str, dict]: result_obj = await self.__request_async( topic='proxy/getDevList', payload=payload or '{}', timeout_ms=timeout_ms) if not result_obj or 'devList' not in result_obj: - return None + raise MIoTMipsError('invalid result') device_list = {} for did, info in result_obj['devList'].items(): name: str = info.get('name', None) @@ -1557,7 +1354,7 @@ class MipsLocalClient(MipsClient): payload='{}', timeout_ms=timeout_ms) if not result_obj or 'result' not in result_obj: - return None + raise MIoTMipsError('invalid result') return result_obj['result'] @final @@ -1579,79 +1376,71 @@ class MipsLocalClient(MipsClient): @final @property - def on_dev_list_changed(self) -> Callable[[Any, list[str]], asyncio.Future]: + def on_dev_list_changed(self) -> Callable[[Any, list[str]], Coroutine] | None: return self._on_dev_list_changed @final @on_dev_list_changed.setter def on_dev_list_changed( - self, func: Callable[[Any, list[str]], asyncio.Future] + self, func: Callable[[Any, list[str]], Coroutine] ) -> None: """run in main loop.""" self._on_dev_list_changed = func - @final - def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None: - if mips_cmd.type_ == MipsCmdType.CALL_API: - req_data: MipsRequestData = mips_cmd.data - req = MipsRequest() - req.mid = self.__gen_mips_id - req.on_reply = req_data.on_reply - req.on_reply_ctx = req_data.on_reply_ctx - pub_topic: str = f'master/{req_data.topic}' - result = self.__mips_publish( - topic=pub_topic, payload=req_data.payload, mid=req.mid, - ret_topic=self._reply_topic) - self.log_debug( - f'mips local call api, {result}, {req.mid}, {pub_topic}, ' - f'{req_data.payload}') + def __request( + self, topic: str, payload: str, + on_reply: Callable[[str, Any], None], + on_reply_ctx: Any = None, timeout_ms: int = 10000 + ) -> None: + req = MipsRequest( + mid=self.__gen_mips_id, + on_reply=on_reply, + on_reply_ctx=on_reply_ctx, + timer=None) + pub_topic: str = f'master/{topic}' + result = self.__mips_publish( + topic=pub_topic, payload=payload, mid=req.mid, + ret_topic=self._reply_topic) + self.log_debug( + f'mips local call api, {result}, {req.mid}, {pub_topic}, ' + f'{payload}') - def on_request_timeout(req: MipsRequest): - self.log_error( - f'on mips request timeout, {req.mid}, {pub_topic}' - f', {req_data.payload}') - self._request_map.pop(str(req.mid), None) - req.on_reply( - '{"error":{"code":-10006, "message":"timeout"}}', - req.on_reply_ctx) - req.timer = self.mev_set_timeout( - req_data.timeout_ms, on_request_timeout, req) - self._request_map[str(req.mid)] = req - elif mips_cmd.type_ == MipsCmdType.REG_BROADCAST: - reg_bc: MipsRegBroadcast = mips_cmd.data - sub_topic: str = f'{self._did}/{reg_bc.topic}' - if not self._msg_matcher.get(sub_topic): - sub_bc: MipsBroadcast = MipsBroadcast( - topic=sub_topic, handler=reg_bc.handler, - handler_ctx=reg_bc.handler_ctx) - self._msg_matcher[sub_topic] = sub_bc - self._mips_sub_internal(topic=f'master/{reg_bc.topic}') - else: - self.log_debug(f'mips re-reg broadcast, {sub_topic}') - elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST: - unreg_bc: MipsRegBroadcast = mips_cmd.data - # Central hub gateway needs to add prefix - unsub_topic: str = f'{self._did}/{unreg_bc.topic}' - if self._msg_matcher.get(unsub_topic): - del self._msg_matcher[unsub_topic] - self._mips_unsub_internal( - topic=re.sub(f'^{self._did}', 'master', unsub_topic)) - elif mips_cmd.type_ == MipsCmdType.REG_DEVICE_STATE: - reg_dev_state: MipsRegDeviceState = mips_cmd.data - self._device_state_sub_map[reg_dev_state.did] = reg_dev_state - self.log_debug( - f'mips local reg device state, {reg_dev_state.did}') - elif mips_cmd.type_ == MipsCmdType.UNREG_DEVICE_STATE: - unreg_dev_state: MipsRegDeviceState = mips_cmd.data - del self._device_state_sub_map[unreg_dev_state.did] - self.log_debug( - f'mips local unreg device state, {unreg_dev_state.did}') - else: + def on_request_timeout(req: MipsRequest): self.log_error( - f'mips local recv unknown cmd, {mips_cmd.type_}, ' - f'{mips_cmd.data}') + f'on mips request timeout, {req.mid}, {pub_topic}' + f', {payload}') + self._request_map.pop(str(req.mid), None) + req.on_reply( + '{"error":{"code":-10006, "message":"timeout"}}', + req.on_reply_ctx) + req.timer = self._internal_loop.call_later( + timeout_ms/1000, on_request_timeout, req) + self._request_map[str(req.mid)] = req - def __on_mips_connect_handler(self, rc, props) -> None: + def __reg_broadcast( + self, topic: str, handler: Callable[[str, str, Any], None], + handler_ctx: Any + ) -> None: + sub_topic: str = f'{self._did}/{topic}' + if not self._msg_matcher.get(sub_topic): + sub_bc: MipsBroadcast = MipsBroadcast( + topic=sub_topic, handler=handler, + handler_ctx=handler_ctx) + self._msg_matcher[sub_topic] = sub_bc + self._mips_sub_internal(topic=f'master/{topic}') + else: + self.log_debug(f'mips re-reg broadcast, {sub_topic}') + + def __unreg_broadcast(self, topic) -> None: + # Central hub gateway needs to add prefix + unsub_topic: str = f'{self._did}/{topic}' + if self._msg_matcher.get(unsub_topic): + del self._msg_matcher[unsub_topic] + self._mips_unsub_internal( + topic=re.sub(f'^{self._did}', 'master', unsub_topic)) + + @final + def _on_mips_connect(self, rc: int, props: dict) -> None: self.log_debug('__on_mips_connect_handler') # Sub did/#, include reply topic self._mips_sub_internal(f'{self._did}/#') @@ -1665,17 +1454,18 @@ class MipsLocalClient(MipsClient): topic=re.sub(f'^{self._did}', 'master', topic)) @final - def __on_mips_message_handler(self, topic: str, payload: bytes) -> None: + def _on_mips_message(self, topic: str, payload: bytes) -> None: mips_msg: MipsMessage = MipsMessage.unpack(payload) # self.log_debug( # f"mips local client, on_message, {topic} -> {mips_msg}") # Reply if topic == self._reply_topic: self.log_debug(f'on request reply, {mips_msg}') - req: MipsRequest = self._request_map.pop(str(mips_msg.mid), None) + req: MipsRequest | None = self._request_map.pop(str(mips_msg.mid), None) if req: # Cancel timer - self.mev_clear_timeout(req.timer) + if req.timer: + req.timer.cancel() if req.on_reply: self.main_loop.call_soon_threadsafe( req.on_reply, mips_msg.payload or '{}', @@ -1695,6 +1485,9 @@ class MipsLocalClient(MipsClient): return # Device list change if topic == self._dev_list_change_topic: + if mips_msg.payload is None: + self.log_error('devListChange msg is None') + return payload_obj: dict = json.loads(mips_msg.payload) dev_list = payload_obj.get('devList', None) if not isinstance(dev_list, list) or not dev_list: @@ -1704,7 +1497,7 @@ class MipsLocalClient(MipsClient): if self._on_dev_list_changed: self.main_loop.call_soon_threadsafe( self.main_loop.create_task, - self._on_dev_list_changed(self, payload_obj['devList'])) + self._on_dev_list_changed(self, dev_list)) return self.log_debug( @@ -1717,9 +1510,13 @@ class MipsLocalClient(MipsClient): return mips_id def __mips_publish( - self, topic: str, payload: str | bytes, mid: int = None, - ret_topic: str = None, wait_for_publish: bool = False, - timeout_ms: int = 10000 + self, + topic: str, + payload: str, + mid: Optional[int] = None, + ret_topic: Optional[str] = None, + wait_for_publish: bool = False, + timeout_ms: int = 10000 ) -> bool: mips_msg: bytes = MipsMessage.pack( mid=mid or self.__gen_mips_id, payload=payload, @@ -1728,34 +1525,30 @@ class MipsLocalClient(MipsClient): topic=topic.strip(), payload=mips_msg, wait_for_publish=wait_for_publish, timeout_ms=timeout_ms) - def __request( + def __request_external( self, topic: str, payload: str, on_reply: Callable[[str, Any], None], on_reply_ctx: Any = None, timeout_ms: int = 10000 ) -> bool: if topic is None or payload is None or on_reply is None: raise MIoTMipsError('invalid params') - req_data: MipsRequestData = MipsRequestData() - req_data.topic = topic - req_data.payload = payload - req_data.on_reply = on_reply - req_data.on_reply_ctx = on_reply_ctx - req_data.timeout_ms = timeout_ms - return self._mips_send_cmd(type_=MipsCmdType.CALL_API, data=req_data) + self._internal_loop.call_soon_threadsafe( + self.__request, topic, payload, on_reply, on_reply_ctx, timeout_ms) + return True - def __reg_broadcast( + def __reg_broadcast_external( self, topic: str, handler: Callable[[str, str, Any], None], handler_ctx: Any ) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.REG_BROADCAST, - data=MipsRegBroadcast( - topic=topic, handler=handler, handler_ctx=handler_ctx)) + self._internal_loop.call_soon_threadsafe( + self.__reg_broadcast, + topic, handler, handler_ctx) + return True - def __unreg_broadcast(self, topic) -> bool: - return self._mips_send_cmd( - type_=MipsCmdType.UNREG_BROADCAST, - data=MipsRegBroadcast(topic=topic)) + def __unreg_broadcast_external(self, topic) -> bool: + self._internal_loop.call_soon_threadsafe( + self.__unreg_broadcast, topic) + return True @final async def __request_async( @@ -1767,7 +1560,7 @@ class MipsLocalClient(MipsClient): fut: asyncio.Future = ctx if fut: self.main_loop.call_soon_threadsafe(fut.set_result, payload) - if not self.__request( + if not self.__request_external( topic=topic, payload=payload, on_reply=on_msg_reply, From 8fcb4d895cf945f0d0eed740edfbafcb2cdf6c67 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Mon, 23 Dec 2024 16:22:44 +0800 Subject: [PATCH 2/6] lint fix --- .../xiaomi_home/miot/miot_mips.py | 45 ++++++++++++------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index 6c5eccd..a5e28fb 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -605,9 +605,8 @@ class MipsClient(ABC): return self.log_info(f'mips connect, {flags}, {rc}, {props}') self._mqtt_state = True - if self._on_mips_connect: - self._internal_loop.call_soon( - self._on_mips_connect, rc, props) + self._internal_loop.call_soon( + self._on_mips_connect, rc, props) for item in self._mips_state_sub_map.values(): if item.handler is None: continue @@ -619,7 +618,7 @@ class MipsClient(ABC): self._event_disconnect.clear() def __on_connect_failed(self, client:Client, user_data:Any) -> None: - self.log_error(f'mips connect failed') + self.log_error('mips connect failed') # Try to reconnect self.__mips_try_reconnect() @@ -639,9 +638,8 @@ class MipsClient(ABC): self._mips_sub_pending_timer.cancel() self._mips_sub_pending_timer = None self._mips_sub_pending_map = {} - if self._on_mips_disconnect: - self._internal_loop.call_soon( - self._on_mips_disconnect, rc, props) + self._internal_loop.call_soon( + self._on_mips_disconnect, rc, props) # Call state sub handler for item in self._mips_state_sub_map.values(): if item.handler is None: @@ -656,7 +654,12 @@ class MipsClient(ABC): self._event_disconnect.set() self._event_connect.clear() - def __on_message(self, client:Client, user_data:Any , msg:MQTTMessage) -> None: + def __on_message( + self, + client: Client, + user_data: Any, + msg: MQTTMessage + ) -> None: self._on_mips_message(topic=msg.topic, payload=msg.payload) def __mips_sub_internal_pending_handler(self, ctx: Any) -> None: @@ -710,7 +713,8 @@ class MipsClient(ABC): if result == MQTT_ERR_SUCCESS: socket = self._mqtt.socket() if socket is None: - self.log_error('__mips_connect, connect success, but socket is None') + self.log_error( + '__mips_connect, connect success, but socket is None') self.__mips_try_reconnect() return self._mqtt_fd = socket.fileno() @@ -725,7 +729,7 @@ class MipsClient(ABC): else: self.log_error(f'__mips_connect error result, {result}') self.__mips_try_reconnect() - + def __mips_try_reconnect(self, immediately: bool = False) -> None: if self._mips_reconnect_timer: self._mips_reconnect_timer.cancel() @@ -739,7 +743,7 @@ class MipsClient(ABC): 'mips try reconnect after %ss', interval) self._mips_reconnect_timer = self._internal_loop.call_later( interval, self.__mips_connect) - + def __mips_start_connect_tries(self) -> None: self._mips_reconnect_tag = True self.__mips_try_reconnect(immediately=True) @@ -769,7 +773,7 @@ class MipsClient(ABC): state = MipsState(key=key, handler=handler) self._mips_state_sub_map[key] = state self.log_debug(f'mips register mips state, {key}') - + def __unsub_mips_state(self, key: str) -> None: del self._mips_state_sub_map[key] self.log_debug(f'mips unregister mips state, {key}') @@ -1009,7 +1013,7 @@ class MipsCloudClient(MipsClient): self._mips_sub_internal(topic=topic) else: self.log_debug(f'mips cloud re-reg broadcast, {topic}') - + def __unreg_broadcast(self, topic: str) -> None: if self._msg_matcher.get(topic=topic): del self._msg_matcher[topic] @@ -1376,7 +1380,9 @@ class MipsLocalClient(MipsClient): @final @property - def on_dev_list_changed(self) -> Callable[[Any, list[str]], Coroutine] | None: + def on_dev_list_changed( + self + ) -> Callable[[Any, list[str]], Coroutine] | None: return self._on_dev_list_changed @final @@ -1430,7 +1436,7 @@ class MipsLocalClient(MipsClient): self._mips_sub_internal(topic=f'master/{topic}') else: self.log_debug(f'mips re-reg broadcast, {sub_topic}') - + def __unreg_broadcast(self, topic) -> None: # Central hub gateway needs to add prefix unsub_topic: str = f'{self._did}/{topic}' @@ -1451,7 +1457,11 @@ class MipsLocalClient(MipsClient): # Sub broadcast topic for topic, _ in list(self._msg_matcher.iter_all_nodes()): self._mips_sub_internal( - topic=re.sub(f'^{self._did}', 'master', topic)) + topic=re.sub(f'^{self._did}', 'master', topic))\ + + @final + def _on_mips_disconnect(self, rc: int, props: dict) -> None: + pass @final def _on_mips_message(self, topic: str, payload: bytes) -> None: @@ -1461,7 +1471,8 @@ class MipsLocalClient(MipsClient): # Reply if topic == self._reply_topic: self.log_debug(f'on request reply, {mips_msg}') - req: MipsRequest | None = self._request_map.pop(str(mips_msg.mid), None) + req: MipsRequest | None = self._request_map.pop( + str(mips_msg.mid), None) if req: # Cancel timer if req.timer: From 43fd2bf42e6379735e0f4fa5eaf10c66d1070721 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Mon, 23 Dec 2024 16:26:25 +0800 Subject: [PATCH 3/6] make private classes private --- .../xiaomi_home/miot/miot_mips.py | 67 +++++++++---------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index a5e28fb..e4eb093 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -73,7 +73,7 @@ from .miot_error import MIoTErrorCode, MIoTMipsError _LOGGER = logging.getLogger(__name__) -class MipsMsgTypeOptions(Enum): +class _MipsMsgTypeOptions(Enum): """MIoT Pub/Sub message type.""" ID = 0 RET_TOPIC = auto() @@ -82,7 +82,7 @@ class MipsMsgTypeOptions(Enum): MAX = auto() -class MipsMessage: +class _MipsMessage: """MIoT Pub/Sub message.""" mid: int = 0 msg_from: str | None = None @@ -90,8 +90,8 @@ class MipsMessage: payload: str | None = None @staticmethod - def unpack(data: bytes) -> 'MipsMessage': - mips_msg = MipsMessage() + def unpack(data: bytes) -> '_MipsMessage': + mips_msg = _MipsMessage() data_len = len(data) data_start = 0 data_end = 0 @@ -102,15 +102,15 @@ class MipsMessage: unpack_data = data[data_end:data_end+unpack_len] # string end with \x00 match unpack_type: - case MipsMsgTypeOptions.ID.value: + case _MipsMsgTypeOptions.ID.value: mips_msg.mid = int.from_bytes( unpack_data, byteorder='little') - case MipsMsgTypeOptions.RET_TOPIC.value: + case _MipsMsgTypeOptions.RET_TOPIC.value: mips_msg.ret_topic = str( unpack_data.strip(b'\x00'), 'utf-8') - case MipsMsgTypeOptions.PAYLOAD.value: + case _MipsMsgTypeOptions.PAYLOAD.value: mips_msg.payload = str(unpack_data.strip(b'\x00'), 'utf-8') - case MipsMsgTypeOptions.FROM.value: + case _MipsMsgTypeOptions.FROM.value: mips_msg.msg_from = str( unpack_data.strip(b'\x00'), 'utf-8') case _: @@ -129,24 +129,24 @@ class MipsMessage: raise MIoTMipsError('invalid mid or payload') pack_msg: bytes = b'' # mid - pack_msg += struct.pack(' str: @@ -154,7 +154,7 @@ class MipsMessage: @dataclass -class MipsRequest: +class _MipsRequest: """MIoT Pub/Sub request.""" mid: int on_reply: Callable[[str, Any], None] @@ -162,16 +162,9 @@ class MipsRequest: timer: asyncio.TimerHandle | None -@dataclass -class MipsIncomingApiCall: - """MIoT Pub/Sub incoming API call.""" - mid: int | None = None - ret_topic: str | None = None - timer: asyncio.TimerHandle | None = None - @dataclass -class MipsBroadcast: +class _MipsBroadcast: """MIoT Pub/Sub broadcast.""" topic: str """ @@ -187,7 +180,7 @@ class MipsBroadcast: @dataclass -class MipsState: +class _MipsState: """MIoT Pub/Sub state.""" key: str """ @@ -204,7 +197,7 @@ class MIoTDeviceState(Enum): ONLINE = auto() -class MipsClient(ABC): +class _MipsClient(ABC): """MIoT Pub/Sub client.""" # pylint: disable=unused-argument MQTT_INTERVAL_S = 1 @@ -238,7 +231,7 @@ class MipsClient(ABC): _mips_reconnect_tag: bool _mips_reconnect_interval: float _mips_reconnect_timer: Optional[asyncio.TimerHandle] - _mips_state_sub_map: dict[str, MipsState] + _mips_state_sub_map: dict[str, _MipsState] _mips_sub_pending_map: dict[str, int] _mips_sub_pending_timer: Optional[asyncio.TimerHandle] @@ -770,7 +763,7 @@ class MipsClient(ABC): def __sub_mips_state( self, key: str, handler: Callable[[str, bool], Coroutine] ) -> None: - state = MipsState(key=key, handler=handler) + state = _MipsState(key=key, handler=handler) self._mips_state_sub_map[key] = state self.log_debug(f'mips register mips state, {key}') @@ -788,7 +781,7 @@ class MipsClient(ABC): return self._mips_reconnect_interval -class MipsCloudClient(MipsClient): +class MipsCloudClient(_MipsClient): """MIoT Pub/Sub Cloud Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes @@ -1006,7 +999,7 @@ class MipsCloudClient(MipsClient): handler_ctx: Any = None ) -> None: if not self._msg_matcher.get(topic=topic): - sub_bc: MipsBroadcast = MipsBroadcast( + sub_bc: _MipsBroadcast = _MipsBroadcast( topic=topic, handler=handler, handler_ctx=handler_ctx) self._msg_matcher[topic] = sub_bc @@ -1034,7 +1027,7 @@ class MipsCloudClient(MipsClient): NOTICE thread safe, this function will be called at the **mips** thread """ # broadcast - bc_list: list[MipsBroadcast] = list( + bc_list: list[_MipsBroadcast] = list( self._msg_matcher.iter_match(topic)) if not bc_list: return @@ -1049,7 +1042,7 @@ class MipsCloudClient(MipsClient): item.handler, topic, payload_str, item.handler_ctx) -class MipsLocalClient(MipsClient): +class MipsLocalClient(_MipsClient): """MIoT Pub/Sub Local Client.""" # pylint: disable=unused-argument # pylint: disable=inconsistent-quotes @@ -1063,7 +1056,7 @@ class MipsLocalClient(MipsClient): _mips_seed_id: int _reply_topic: str _dev_list_change_topic: str - _request_map: dict[str, MipsRequest] + _request_map: dict[str, _MipsRequest] _msg_matcher: MIoTMatcher _get_prop_queue: dict[str, list] _get_prop_timer: asyncio.TimerHandle | None @@ -1398,7 +1391,7 @@ class MipsLocalClient(MipsClient): on_reply: Callable[[str, Any], None], on_reply_ctx: Any = None, timeout_ms: int = 10000 ) -> None: - req = MipsRequest( + req = _MipsRequest( mid=self.__gen_mips_id, on_reply=on_reply, on_reply_ctx=on_reply_ctx, @@ -1411,7 +1404,7 @@ class MipsLocalClient(MipsClient): f'mips local call api, {result}, {req.mid}, {pub_topic}, ' f'{payload}') - def on_request_timeout(req: MipsRequest): + def on_request_timeout(req: _MipsRequest): self.log_error( f'on mips request timeout, {req.mid}, {pub_topic}' f', {payload}') @@ -1429,7 +1422,7 @@ class MipsLocalClient(MipsClient): ) -> None: sub_topic: str = f'{self._did}/{topic}' if not self._msg_matcher.get(sub_topic): - sub_bc: MipsBroadcast = MipsBroadcast( + sub_bc: _MipsBroadcast = _MipsBroadcast( topic=sub_topic, handler=handler, handler_ctx=handler_ctx) self._msg_matcher[sub_topic] = sub_bc @@ -1465,13 +1458,13 @@ class MipsLocalClient(MipsClient): @final def _on_mips_message(self, topic: str, payload: bytes) -> None: - mips_msg: MipsMessage = MipsMessage.unpack(payload) + mips_msg: _MipsMessage = _MipsMessage.unpack(payload) # self.log_debug( # f"mips local client, on_message, {topic} -> {mips_msg}") # Reply if topic == self._reply_topic: self.log_debug(f'on request reply, {mips_msg}') - req: MipsRequest | None = self._request_map.pop( + req: _MipsRequest | None = self._request_map.pop( str(mips_msg.mid), None) if req: # Cancel timer @@ -1483,7 +1476,7 @@ class MipsLocalClient(MipsClient): req.on_reply_ctx) return # Broadcast - bc_list: list[MipsBroadcast] = list(self._msg_matcher.iter_match( + bc_list: list[_MipsBroadcast] = list(self._msg_matcher.iter_match( topic=topic)) if bc_list: self.log_debug(f'on broadcast, {topic}, {mips_msg}') @@ -1529,7 +1522,7 @@ class MipsLocalClient(MipsClient): wait_for_publish: bool = False, timeout_ms: int = 10000 ) -> bool: - mips_msg: bytes = MipsMessage.pack( + mips_msg: bytes = _MipsMessage.pack( mid=mid or self.__gen_mips_id, payload=payload, msg_from='local', ret_topic=ret_topic) return self._mips_publish_internal( From 23bf6817b9f0663cf1dd3f89d0148d6d80733b90 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Mon, 23 Dec 2024 16:43:34 +0800 Subject: [PATCH 4/6] simplify inheritance --- .../xiaomi_home/miot/miot_mips.py | 39 +++---------------- 1 file changed, 6 insertions(+), 33 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index e4eb093..98d19dc 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -305,9 +305,6 @@ class _MipsClient(ABC): @final def connect(self) -> None: """mips connect.""" - # TODO: make this more precise - # Mark as not closed, though also not connected yet - self._is_closed = False # Start mips thread self._internal_loop = asyncio.new_event_loop() self._mips_thread = threading.Thread(target=self.__mips_loop_thread) @@ -317,8 +314,6 @@ class _MipsClient(ABC): @final def close(self) -> None: - self._is_connected = False - self._internal_loop.call_soon_threadsafe(self.__mips_close) self._mips_thread.join() self._internal_loop.close() @@ -362,21 +357,18 @@ class _MipsClient(ABC): else: self._mqtt.disable_logger() - @final - async def mips_connect_async(self) -> None: + async def connect_async(self) -> None: """mips connect async.""" self.connect() await self._event_connect.wait() - @final - def mips_disconnect(self) -> None: + def disconnect(self) -> None: """mips disconnect.""" self._internal_loop.call_soon_threadsafe(self.__mips_disconnect) - @final - async def mips_disconnect_async(self) -> None: + async def disconnect_async(self) -> None: """mips disconnect async.""" - self.mips_disconnect() + self.disconnect() await self._event_disconnect.wait() @final @@ -797,18 +789,9 @@ class MipsCloudClient(_MipsClient): client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com', port=port, username=app_id, password=token, loop=loop) - @final - async def connect_async(self) -> None: - await self.mips_connect_async() - @final def disconnect(self) -> None: - self.mips_disconnect() - self._msg_matcher = MIoTMatcher() - - @final - async def disconnect_async(self) -> None: - await self.mips_disconnect_async() + super().disconnect() self._msg_matcher = MIoTMatcher() def update_access_token(self, access_token: str) -> bool: @@ -1102,19 +1085,9 @@ class MipsLocalClient(_MipsClient): if self._logger: self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs) - @final - async def connect_async(self) -> None: - await self.mips_connect_async() - @final def disconnect(self) -> None: - self.mips_disconnect() - self._request_map = {} - self._msg_matcher = MIoTMatcher() - - @final - async def disconnect_async(self) -> None: - await self.mips_disconnect_async() + super().disconnect() self._request_map = {} self._msg_matcher = MIoTMatcher() From d1c49bdf7b0b8d8956d47e3206d02c4594fb3661 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Mon, 23 Dec 2024 16:59:45 +0800 Subject: [PATCH 5/6] fix thread naming --- .../xiaomi_home/miot/miot_mips.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index 98d19dc..626b020 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -227,7 +227,7 @@ class _MipsClient(ABC): _event_connect: asyncio.Event _event_disconnect: asyncio.Event _internal_loop: asyncio.AbstractEventLoop - _mips_thread: threading.Thread + _mips_thread: threading.Thread | None = None _mips_reconnect_tag: bool _mips_reconnect_interval: float _mips_reconnect_timer: Optional[asyncio.TimerHandle] @@ -302,20 +302,23 @@ class _MipsClient(ABC): """ return self._mqtt and self._mqtt.is_connected() - @final - def connect(self) -> None: + def connect(self, thread_name: Optional[str] = None) -> None: """mips connect.""" # Start mips thread + if self._mips_thread: + return self._internal_loop = asyncio.new_event_loop() self._mips_thread = threading.Thread(target=self.__mips_loop_thread) self._mips_thread.daemon = True - self._mips_thread.name = self._client_id + self._mips_thread.name = \ + self._client_id if thread_name is None else thread_name self._mips_thread.start() @final def close(self) -> None: self._internal_loop.call_soon_threadsafe(self.__mips_close) - self._mips_thread.join() + if self._mips_thread: + self._mips_thread.join() self._internal_loop.close() self._logger = None @@ -1066,8 +1069,6 @@ class MipsLocalClient(_MipsClient): super().__init__( client_id=did, host=host, port=port, ca_file=ca_file, cert_file=cert_file, key_file=key_file, loop=loop) - # MIPS local thread name use group_id - self._mips_thread.name = self._group_id @property def group_id(self) -> str: @@ -1085,6 +1086,11 @@ class MipsLocalClient(_MipsClient): if self._logger: self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs) + @final + def connect(self, thread_name: Optional[str] = None) -> None: + # MIPS local thread name use group_id + super().connect(self._group_id) + @final def disconnect(self) -> None: super().disconnect() From 55d27409b736928c04eb0ca4bea905a5422ed1b7 Mon Sep 17 00:00:00 2001 From: Feng Wang Date: Mon, 23 Dec 2024 17:03:09 +0800 Subject: [PATCH 6/6] fix the deleted public data class --- custom_components/xiaomi_home/miot/miot_mips.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/custom_components/xiaomi_home/miot/miot_mips.py b/custom_components/xiaomi_home/miot/miot_mips.py index 626b020..d2bebde 100644 --- a/custom_components/xiaomi_home/miot/miot_mips.py +++ b/custom_components/xiaomi_home/miot/miot_mips.py @@ -196,6 +196,17 @@ class MIoTDeviceState(Enum): OFFLINE = auto() ONLINE = auto() +@dataclass +class MipsDeviceState: + """MIoT Pub/Sub device state.""" + did: str | None = None + """handler + str: did + MIoTDeviceState: online/offline/disable + Any: ctx + """ + handler: Callable[[str, MIoTDeviceState, Any], None] | None = None + handler_ctx: Any = None class _MipsClient(ABC): """MIoT Pub/Sub client."""