diff --git a/custom_components/xiaomi_home/miot/common.py b/custom_components/xiaomi_home/miot/common.py index 0b03f96..223dbd0 100644 --- a/custom_components/xiaomi_home/miot/common.py +++ b/custom_components/xiaomi_home/miot/common.py @@ -83,6 +83,9 @@ def randomize_int(value: int, ratio: float) -> int: """Randomize an integer value.""" return int(value * (1 - ratio + random.random()*2*ratio)) +def randomize_float(value: float, ratio: float) -> float: + """Randomize a float value.""" + return value * (1 - ratio + random.random()*2*ratio) class MIoTMatcher(MQTTMatcher): """MIoT Pub/Sub topic matcher.""" diff --git a/custom_components/xiaomi_home/miot/miot_ev.py b/custom_components/xiaomi_home/miot/miot_ev.py deleted file mode 100644 index be4e684..0000000 --- a/custom_components/xiaomi_home/miot/miot_ev.py +++ /dev/null @@ -1,324 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Copyright (C) 2024 Xiaomi Corporation. - -The ownership and intellectual property rights of Xiaomi Home Assistant -Integration and related Xiaomi cloud service API interface provided under this -license, including source code and object code (collectively, "Licensed Work"), -are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi -hereby grants you a personal, limited, non-exclusive, non-transferable, -non-sublicensable, and royalty-free license to reproduce, use, modify, and -distribute the Licensed Work only for your use of Home Assistant for -non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize -you to use the Licensed Work for any other purpose, including but not limited -to use Licensed Work to develop applications (APP), Web services, and other -forms of software. - -You may reproduce and distribute copies of the Licensed Work, with or without -modifications, whether in source or object form, provided that you must give -any other recipients of the Licensed Work a copy of this License and retain all -copyright and disclaimers. - -Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR -CONDITIONS OF ANY KIND, either express or implied, including, without -limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR -OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or -FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible -for any direct, indirect, special, incidental, or consequential damages or -losses arising from the use or inability to use the Licensed Work. - -Xiaomi reserves all rights not expressly granted to you in this License. -Except for the rights expressly granted by Xiaomi under this License, Xiaomi -does not authorize you in any form to use the trademarks, copyrights, or other -forms of intellectual property rights of Xiaomi and its affiliates, including, -without limitation, without obtaining other written permission from Xiaomi, you -shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that -may make the public associate with Xiaomi in any form to publicize or promote -the software or hardware devices that use the Licensed Work. - -Xiaomi has the right to immediately terminate all your authorization under this -License in the event: -1. You assert patent invalidation, litigation, or other claims against patents -or other intellectual property rights of Xiaomi or its affiliates; or, -2. You make, have made, manufacture, sell, or offer to sell products that knock -off Xiaomi or its affiliates' products. - -MIoT event loop. -""" -import selectors -import heapq -import time -import traceback -from typing import Callable, TypeVar -import logging -import threading - -# pylint: disable=relative-beyond-top-level -from .miot_error import MIoTEvError - -_LOGGER = logging.getLogger(__name__) - -TimeoutHandle = TypeVar('TimeoutHandle') - - -class MIoTFdHandler: - """File descriptor handler.""" - fd: int - read_handler: Callable[[any], None] - read_handler_ctx: any - write_handler: Callable[[any], None] - write_handler_ctx: any - - def __init__( - self, fd: int, - read_handler: Callable[[any], None] = None, - read_handler_ctx: any = None, - write_handler: Callable[[any], None] = None, - write_handler_ctx: any = None - ) -> None: - self.fd = fd - self.read_handler = read_handler - self.read_handler_ctx = read_handler_ctx - self.write_handler = write_handler - self.write_handler_ctx = write_handler_ctx - - -class MIoTTimeout: - """Timeout handler.""" - key: TimeoutHandle - target: int - handler: Callable[[any], None] - handler_ctx: any - - def __init__( - self, key: str = None, target: int = None, - handler: Callable[[any], None] = None, - handler_ctx: any = None - ) -> None: - self.key = key - self.target = target - self.handler = handler - self.handler_ctx = handler_ctx - - def __lt__(self, other): - return self.target < other.target - - -class MIoTEventLoop: - """MIoT event loop.""" - _poll_fd: selectors.DefaultSelector - - _fd_handlers: dict[str, MIoTFdHandler] - - _timer_heap: list[MIoTTimeout] - _timer_handlers: dict[str, MIoTTimeout] - _timer_handle_seed: int - - # Label if the current fd handler is freed inside a read handler to - # avoid invalid reading. - _fd_handler_freed_in_read_handler: bool - - def __init__(self) -> None: - self._poll_fd = selectors.DefaultSelector() - self._timer_heap = [] - self._timer_handlers = {} - self._timer_handle_seed = 1 - self._fd_handlers = {} - self._fd_handler_freed_in_read_handler = False - - def loop_forever(self) -> None: - """Run an event loop in current thread.""" - next_timeout: int - while True: - next_timeout = 0 - # Handle timer - now_ms: int = self.__get_monotonic_ms - while len(self._timer_heap) > 0: - timer: MIoTTimeout = self._timer_heap[0] - if timer is None: - break - if timer.target <= now_ms: - heapq.heappop(self._timer_heap) - del self._timer_handlers[timer.key] - if timer.handler: - timer.handler(timer.handler_ctx) - else: - next_timeout = timer.target-now_ms - break - # Are there any files to listen to - if next_timeout == 0 and self._fd_handlers: - next_timeout = None # None == infinite - # Wait for timers & fds - if next_timeout == 0: - # Neither timer nor fds exist, exit loop - break - # Handle fd event - events = self._poll_fd.select( - timeout=next_timeout/1000.0 if next_timeout else next_timeout) - for key, mask in events: - fd_handler: MIoTFdHandler = key.data - if fd_handler is None: - continue - self._fd_handler_freed_in_read_handler = False - fd_key = str(id(fd_handler.fd)) - if fd_key not in self._fd_handlers: - continue - if ( - mask & selectors.EVENT_READ > 0 - and fd_handler.read_handler - ): - fd_handler.read_handler(fd_handler.read_handler_ctx) - if ( - mask & selectors.EVENT_WRITE > 0 - and self._fd_handler_freed_in_read_handler is False - and fd_handler.write_handler - ): - fd_handler.write_handler(fd_handler.write_handler_ctx) - - def loop_stop(self) -> None: - """Stop the event loop.""" - if self._poll_fd: - self._poll_fd.close() - self._poll_fd = None - self._fd_handlers = {} - self._timer_heap = [] - self._timer_handlers = {} - - def set_timeout( - self, timeout_ms: int, handler: Callable[[any], None], - handler_ctx: any = None - ) -> TimeoutHandle: - """Set a timer.""" - if timeout_ms is None or handler is None: - raise MIoTEvError('invalid params') - new_timeout: MIoTTimeout = MIoTTimeout() - new_timeout.key = self.__get_next_timeout_handle - new_timeout.target = self.__get_monotonic_ms + timeout_ms - new_timeout.handler = handler - new_timeout.handler_ctx = handler_ctx - heapq.heappush(self._timer_heap, new_timeout) - self._timer_handlers[new_timeout.key] = new_timeout - return new_timeout.key - - def clear_timeout(self, timer_key: TimeoutHandle) -> None: - """Stop and remove the timer.""" - if timer_key is None: - return - timer: MIoTTimeout = self._timer_handlers.pop(timer_key, None) - if timer: - self._timer_heap = list(self._timer_heap) - self._timer_heap.remove(timer) - heapq.heapify(self._timer_heap) - - def set_read_handler( - self, fd: int, handler: Callable[[any], None], handler_ctx: any = None - ) -> bool: - """Set a read handler for a file descriptor. - - Returns: - bool: True, success. False, failed. - """ - self.__set_handler( - fd, is_read=True, handler=handler, handler_ctx=handler_ctx) - - def set_write_handler( - self, fd: int, handler: Callable[[any], None], handler_ctx: any = None - ) -> bool: - """Set a write handler for a file descriptor. - - Returns: - bool: True, success. False, failed. - """ - self.__set_handler( - fd, is_read=False, handler=handler, handler_ctx=handler_ctx) - - def __set_handler( - self, fd, is_read: bool, handler: Callable[[any], None], - handler_ctx: any = None - ) -> bool: - """Set a handler.""" - if fd is None: - raise MIoTEvError('invalid params') - - if not self._poll_fd: - raise MIoTEvError('event loop not started') - - fd_key: str = str(id(fd)) - fd_handler = self._fd_handlers.get(fd_key, None) - - if fd_handler is None: - fd_handler = MIoTFdHandler(fd=fd) - fd_handler.fd = fd - self._fd_handlers[fd_key] = fd_handler - - read_handler_existed = fd_handler.read_handler is not None - write_handler_existed = fd_handler.write_handler is not None - if is_read is True: - fd_handler.read_handler = handler - fd_handler.read_handler_ctx = handler_ctx - else: - fd_handler.write_handler = handler - fd_handler.write_handler_ctx = handler_ctx - - if fd_handler.read_handler is None and fd_handler.write_handler is None: - # Remove from epoll and map - try: - self._poll_fd.unregister(fd) - except (KeyError, ValueError, OSError) as e: - del e - self._fd_handlers.pop(fd_key, None) - # May be inside a read handler, if not, this has no effect - self._fd_handler_freed_in_read_handler = True - elif read_handler_existed is False and write_handler_existed is False: - # Add to epoll - events = 0x0 - if fd_handler.read_handler: - events |= selectors.EVENT_READ - if fd_handler.write_handler: - events |= selectors.EVENT_WRITE - try: - self._poll_fd.register(fd, events=events, data=fd_handler) - except (KeyError, ValueError, OSError) as e: - _LOGGER.error( - '%s, register fd, error, %s, %s, %s, %s, %s', - threading.current_thread().name, - 'read' if is_read else 'write', - fd_key, handler, e, traceback.format_exc()) - self._fd_handlers.pop(fd_key, None) - return False - elif ( - read_handler_existed != (fd_handler.read_handler is not None) - or write_handler_existed != (fd_handler.write_handler is not None) - ): - # Modify epoll - events = 0x0 - if fd_handler.read_handler: - events |= selectors.EVENT_READ - if fd_handler.write_handler: - events |= selectors.EVENT_WRITE - try: - self._poll_fd.modify(fd, events=events, data=fd_handler) - except (KeyError, ValueError, OSError) as e: - _LOGGER.error( - '%s, modify fd, error, %s, %s, %s, %s, %s', - threading.current_thread().name, - 'read' if is_read else 'write', - fd_key, handler, e, traceback.format_exc()) - self._fd_handlers.pop(fd_key, None) - return False - - return True - - @property - def __get_next_timeout_handle(self) -> str: - # Get next timeout handle, that is not larger than the maximum - # value of UINT64 type. - self._timer_handle_seed += 1 - # uint64 max - self._timer_handle_seed %= 0xFFFFFFFFFFFFFFFF - return str(self._timer_handle_seed) - - @property - def __get_monotonic_ms(self) -> int: - """Get monotonic ms timestamp.""" - return int(time.monotonic()*1000) diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 6679328..290a10c 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -10,30 +10,30 @@ hereby grants you a personal, limited, non-exclusive, non-transferable, non-sublicensable, and royalty-free license to reproduce, use, modify, and distribute the Licensed Work only for your use of Home Assistant for non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize -you to use the Licensed Work for any other purpose, including but not limited +you to use the Licensed Work for Any other purpose, including but not limited to use Licensed Work to develop applications (APP), Web services, and other forms of software. You may reproduce and distribute copies of the Licensed Work, with or without modifications, whether in source or object form, provided that you must give -any other recipients of the Licensed Work a copy of this License and retain all +Any other recipients of the Licensed Work a copy of this License and retain all copyright and disclaimers. Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without -limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR +limitation, Any warranties, undertakes, or conditions of TITLE, NO ERROR OR OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or -FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible -for any direct, indirect, special, incidental, or consequential damages or +FITNESS FOR A PARTICULAR PURPOSE. In Any event, you are solely responsible +for Any direct, indirect, special, incidental, or consequential damages or losses arising from the use or inability to use the Licensed Work. Xiaomi reserves all rights not expressly granted to you in this License. Except for the rights expressly granted by Xiaomi under this License, Xiaomi -does not authorize you in any form to use the trademarks, copyrights, or other +does not authorize you in Any form to use the trademarks, copyrights, or other forms of intellectual property rights of Xiaomi and its affiliates, including, without limitation, without obtaining other written permission from Xiaomi, you shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that -may make the public associate with Xiaomi in any form to publicize or promote +may make the public associate with Xiaomi in Any form to publicize or promote the software or hardware devices that use the Licensed Work. Xiaomi has the right to immediately terminate all your authorization under this @@ -60,7 +60,7 @@ import secrets import socket import struct import threading -from typing import Callable, Optional, final +from typing import Callable, Optional, final, Coroutine, Any from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives import padding from cryptography.hazmat.backends import default_backend @@ -68,11 +68,10 @@ from cryptography.hazmat.primitives import hashes # pylint: disable=relative-beyond-top-level from .miot_error import MIoTErrorCode -from .miot_ev import MIoTEventLoop, TimeoutHandle from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo from .miot_mdns import MipsService, MipsServiceState from .common import ( - randomize_int, load_yaml_file, gen_absolute_path, MIoTMatcher) + randomize_float, load_yaml_file, gen_absolute_path, MIoTMatcher) _LOGGER = logging.getLogger(__name__) @@ -98,13 +97,13 @@ class MIoTLanCmdType(Enum): class MIoTLanCmd: """MIoT lan command.""" type_: MIoTLanCmdType - data: any + data: Any @dataclass class MIoTLanCmdData: - handler: Callable[[dict, any], None] - handler_ctx: any + handler: Callable[[dict, Any], None] + handler_ctx: Any timeout_ms: int @@ -131,8 +130,8 @@ class MIoTLanUnregisterBroadcastData: @dataclass class MIoTLanRegisterBroadcastData: key: str - handler: Callable[[dict, any], None] - handler_ctx: any + handler: Callable[[dict, Any], None] + handler_ctx: Any @dataclass @@ -143,8 +142,8 @@ class MIoTLanUnsubDeviceState: @dataclass class MIoTLanSubDeviceState: key: str - handler: Callable[[str, dict, any], None] - handler_ctx: any + handler: Callable[[str, dict, Any], None] + handler_ctx: Any @dataclass @@ -156,9 +155,9 @@ class MIoTLanNetworkUpdateData: @dataclass class MIoTLanRequestData: msg_id: int - handler: Callable[[dict, any], None] - handler_ctx: any - timeout: TimeoutHandle + handler: Optional[Callable[[dict, Any], None]] + handler_ctx: Any + timeout: asyncio.TimerHandle class MIoTLanDeviceState(Enum): @@ -175,12 +174,12 @@ class MIoTLanDevice: OT_HEADER: int = 0x2131 OT_HEADER_LEN: int = 32 NETWORK_UNSTABLE_CNT_TH: int = 10 - NETWORK_UNSTABLE_TIME_TH: int = 120000 - NETWORK_UNSTABLE_RESUME_TH: int = 300000 - FAST_PING_INTERVAL: int = 5000 - CONSTRUCT_STATE_PENDING: int = 15000 - KA_INTERVAL_MIN = 10000 - KA_INTERVAL_MAX = 50000 + NETWORK_UNSTABLE_TIME_TH: float = 120 + NETWORK_UNSTABLE_RESUME_TH: float = 300 + FAST_PING_INTERVAL: float = 5 + CONSTRUCT_STATE_PENDING: float = 15 + KA_INTERVAL_MIN: float = 10 + KA_INTERVAL_MAX: float = 50 did: str token: bytes @@ -192,19 +191,19 @@ class MIoTLanDevice: sub_ts: int supported_wildcard_sub: bool - _manager: any + _manager: 'MIoTLan' _if_name: Optional[str] _sub_locked: bool _state: MIoTLanDeviceState _online: bool - _online_offline_history: list[dict[str, any]] - _online_offline_timer: Optional[TimeoutHandle] + _online_offline_history: list[dict[str, Any]] + _online_offline_timer: Optional[asyncio.TimerHandle] - _ka_timer: TimeoutHandle - _ka_internal: int + _ka_timer: Optional[asyncio.TimerHandle] + _ka_internal: float def __init__( - self, manager: any, did: str, token: str, ip: Optional[str] = None + self, manager: 'MIoTLan', did: str, token: str, ip: Optional[str] = None ) -> None: self._manager: MIoTLan = manager self.did = did @@ -225,12 +224,12 @@ class MIoTLanDevice: self._online_offline_history = [] self._online_offline_timer = None - def ka_init_handler(ctx: any) -> None: + def ka_init_handler() -> None: self._ka_internal = self.KA_INTERVAL_MIN self.__update_keep_alive(state=MIoTLanDeviceState.DEAD) - self._ka_timer = self._manager.mev.set_timeout( - randomize_int(self.CONSTRUCT_STATE_PENDING, 0.5), - ka_init_handler, None) + self._ka_timer = self._manager.internal_loop.call_later( + randomize_float(self.CONSTRUCT_STATE_PENDING, 0.5), + ka_init_handler,) _LOGGER.debug('miot lan device add, %s', self.did) def keep_alive(self, ip: str, if_name: str) -> None: @@ -342,11 +341,9 @@ class MIoTLanDevice: def on_delete(self) -> None: if self._ka_timer: - self._manager.mev.clear_timeout(self._ka_timer) + self._ka_timer.cancel() if self._online_offline_timer: - self._manager.mev.clear_timeout(self._online_offline_timer) - self._manager = None - self.cipher = None + self._online_offline_timer.cancel() _LOGGER.debug('miot lan device delete, %s', self.did) def update_info(self, info: dict) -> None: @@ -379,7 +376,7 @@ class MIoTLanDevice: 'online': self._online, 'push_available': self.subscribed}) _LOGGER.info('subscribe success, %s, %s', self._if_name, self.did) - def __unsubscribe_handler(self, msg: dict, ctx: any) -> None: + def __unsubscribe_handler(self, msg: dict, ctx: Any) -> None: if ( 'result' not in msg or 'code' not in msg['result'] @@ -395,14 +392,14 @@ class MIoTLanDevice: if self._state != MIoTLanDeviceState.FRESH: _LOGGER.debug('device status, %s, %s', self.did, self._state) if self._ka_timer: - self._manager.mev.clear_timeout(self._ka_timer) + self._ka_timer.cancel() self._ka_timer = None match state: case MIoTLanDeviceState.FRESH: if last_state == MIoTLanDeviceState.DEAD: self._ka_internal = self.KA_INTERVAL_MIN self.__change_online(True) - self._ka_timer = self._manager.mev.set_timeout( + self._ka_timer = self._manager.internal_loop.call_later( self.__get_next_ka_timeout(), self.__update_keep_alive, MIoTLanDeviceState.PING1) case ( @@ -410,11 +407,18 @@ class MIoTLanDevice: | MIoTLanDeviceState.PING2 | MIoTLanDeviceState.PING3 ): - self._manager.ping(if_name=self._if_name, target_ip=self.ip) - # Fast ping - self._ka_timer = self._manager.mev.set_timeout( + # Set the timer first to avoid Any early returns + self._ka_timer = self._manager.internal_loop.call_later( self.FAST_PING_INTERVAL, self.__update_keep_alive, MIoTLanDeviceState(state.value+1)) + # Fast ping + if self._if_name is None: + _LOGGER.error('if_name is Not set for device, %s', self.did) + return + if self.ip is None: + _LOGGER.error('ip is Not set for device, %s', self.did) + return + self._manager.ping(if_name=self._if_name, target_ip=self.ip) case MIoTLanDeviceState.DEAD: if last_state == MIoTLanDeviceState.PING3: self._ka_internal = self.KA_INTERVAL_MIN @@ -422,9 +426,9 @@ class MIoTLanDevice: case _: _LOGGER.error('invalid state, %s', state) - def __get_next_ka_timeout(self) -> int: + def __get_next_ka_timeout(self) -> float: self._ka_internal = min(self._ka_internal*2, self.KA_INTERVAL_MAX) - return randomize_int(self._ka_internal, 0.1) + return randomize_float(self._ka_internal, 0.1) def __change_online(self, online: bool) -> None: _LOGGER.info('change online, %s, %s', self.did, online) @@ -433,7 +437,7 @@ class MIoTLanDevice: if len(self._online_offline_history) > self.NETWORK_UNSTABLE_CNT_TH: self._online_offline_history.pop(0) if self._online_offline_timer: - self._manager.mev.clear_timeout(self._online_offline_timer) + self._online_offline_timer.cancel() if not online: self.online = False else: @@ -446,11 +450,11 @@ class MIoTLanDevice: self.online = True else: _LOGGER.info('unstable device detected, %s', self.did) - self._online_offline_timer = self._manager.mev.set_timeout( + self._online_offline_timer = self._manager.internal_loop.call_later( self.NETWORK_UNSTABLE_RESUME_TH, - self.__online_resume_handler, None) + self.__online_resume_handler) - def __online_resume_handler(self, ctx: any) -> None: + def __online_resume_handler(self) -> None: _LOGGER.info('unstable resume threshold past, %s', self.did) self.online = True @@ -470,8 +474,8 @@ class MIoTLan: OT_MSG_LEN: int = 1400 OT_SUPPORT_WILDCARD_SUB: int = 0xFE - OT_PROBE_INTERVAL_MIN: int = 5000 - OT_PROBE_INTERVAL_MAX: int = 45000 + OT_PROBE_INTERVAL_MIN: float = 5 + OT_PROBE_INTERVAL_MAX: float = 45 PROFILE_MODELS_FILE: str = 'lan/profile_models.yaml' @@ -486,23 +490,21 @@ class MIoTLan: _write_buffer: bytearray _read_buffer: bytearray - _mev: MIoTEventLoop + _internal_loop: asyncio.AbstractEventLoop _thread: threading.Thread - _queue: queue.Queue - _cmd_event_fd: os.eventfd _available_net_ifs: set[str] _broadcast_socks: dict[str, socket.socket] _local_port: Optional[int] - _scan_timer: TimeoutHandle - _last_scan_interval: Optional[int] + _scan_timer: Optional[asyncio.TimerHandle] + _last_scan_interval: Optional[float] _msg_id_counter: int _pending_requests: dict[int, MIoTLanRequestData] _device_msg_matcher: MIoTMatcher _device_state_sub_map: dict[str, MIoTLanSubDeviceState] - _reply_msg_buffer: dict[str, TimeoutHandle] + _reply_msg_buffer: dict[str, asyncio.TimerHandle] - _lan_state_sub_map: dict[str, Callable[[bool], asyncio.Future]] + _lan_state_sub_map: dict[str, Callable[[bool], Coroutine]] _lan_ctrl_vote_map: dict[str, bool] _profile_models: dict[str, dict] @@ -532,7 +534,7 @@ class MIoTLan: key='miot_lan', group_id='*', handler=self.__on_mips_service_change) self._enable_subscribe = enable_subscribe - self._virtual_did = virtual_did or str(secrets.randbits(64)) + self._virtual_did = str(virtual_did) if (virtual_did is not None) else str(secrets.randbits(64)) # Init socket probe message probe_bytes = bytearray(self.OT_PROBE_LEN) probe_bytes[:20] = ( @@ -574,8 +576,8 @@ class MIoTLan: return self._virtual_did @property - def mev(self) -> MIoTEventLoop: - return self._mev + def internal_loop(self) -> asyncio.AbstractEventLoop: + return self._internal_loop @property def init_done(self) -> bool: @@ -609,12 +611,10 @@ class MIoTLan: except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('load profile models error, %s', err) self._profile_models = {} - self._mev = MIoTEventLoop() self._queue = queue.Queue() self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) - self._mev.set_read_handler( - self._cmd_event_fd, self.__cmd_read_handler, None) - self._thread = threading.Thread(target=self.__lan_thread_handler) + # All tasks meant for the internal loop should happen in this thread + self._thread = threading.Thread(target=self.__internal_loop_thread) self._thread.name = 'miot_lan' self._thread.daemon = True self._thread.start() @@ -623,6 +623,19 @@ class MIoTLan: self._main_loop.create_task(handler(True)) _LOGGER.info( 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs) + + def __internal_loop_thread(self) -> None: + _LOGGER.info('miot lan thread start') + self._internal_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._internal_loop) + self.__init_socket() + self._internal_loop.add_reader( + self._cmd_event_fd, self.__cmd_read_handler) + self._scan_timer = self._internal_loop.call_later( + int(3*random.random()), self.__scan_devices) + self._internal_loop.run_forever() + self._internal_loop.close() + _LOGGER.info('miot lan thread exit') async def deinit_async(self) -> None: if not self._init_done: @@ -687,7 +700,7 @@ class MIoTLan: if not self._init_done: self._enable_subscribe = enable_subscribe return - return self.__lan_send_cmd( + self.__lan_send_cmd( cmd=MIoTLanCmdType.OPTIONS_UPDATE, data={ 'enable_subscribe': enable_subscribe, }) @@ -705,7 +718,7 @@ class MIoTLan: data=devices) def sub_lan_state( - self, key: str, handler: Callable[[bool], asyncio.Future] + self, key: str, handler: Callable[[bool], Coroutine] ) -> None: self._lan_state_sub_map[key] = handler @@ -714,8 +727,8 @@ class MIoTLan: @final def sub_device_state( - self, key: str, handler: Callable[[str, dict, any], None], - handler_ctx: any = None + self, key: str, handler: Callable[[str, dict, Any], None], + handler_ctx: Any = None ) -> bool: return self.__lan_send_cmd( cmd=MIoTLanCmdType.SUB_DEVICE_STATE, @@ -730,8 +743,8 @@ class MIoTLan: @final def sub_prop( - self, did: str, handler: Callable[[dict, any], None], - siid: int = None, piid: int = None, handler_ctx: any = None + self, did: str, handler: Callable[[dict, Any], None], + siid: Optional[int] = None, piid: Optional[int] = None, handler_ctx: Any = None ) -> bool: if not self._enable_subscribe: return False @@ -744,7 +757,7 @@ class MIoTLan: key=key, handler=handler, handler_ctx=handler_ctx)) @final - def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: + def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = None) -> bool: if not self._enable_subscribe: return False key = ( @@ -756,8 +769,8 @@ class MIoTLan: @final def sub_event( - self, did: str, handler: Callable[[dict, any], None], - siid: int = None, eiid: int = None, handler_ctx: any = None + self, did: str, handler: Callable[[dict, Any], None], + siid: Optional[int] = None, eiid: Optional[int] = None, handler_ctx: Any = None ) -> bool: if not self._enable_subscribe: return False @@ -770,7 +783,7 @@ class MIoTLan: key=key, handler=handler, handler_ctx=handler_ctx)) @final - def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: + def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None) -> bool: if not self._enable_subscribe: return False key = ( @@ -783,7 +796,7 @@ class MIoTLan: @final async def get_prop_async( self, did: str, siid: int, piid: int, timeout_ms: int = 10000 - ) -> any: + ) -> Any: result_obj = await self.__call_api_async( did=did, msg={ 'method': 'get_properties', @@ -801,7 +814,7 @@ class MIoTLan: @final async def set_prop_async( - self, did: str, siid: int, piid: int, value: any, + self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000 ) -> dict: result_obj = await self.__call_api_async( @@ -868,7 +881,7 @@ class MIoTLan: fut.set_result({}) return await fut - def ping(self, if_name: str, target_ip: str) -> None: + def ping(self, if_name: str | None, target_ip: str) -> None: if not target_ip: return self.__sendto( @@ -878,13 +891,13 @@ class MIoTLan: def send2device( self, did: str, msg: dict, - handler: Optional[Callable[[dict, any], None]] = None, - handler_ctx: any = None, + handler: Optional[Callable[[dict, Any], None]] = None, + handler_ctx: Any = None, timeout_ms: Optional[int] = None ) -> None: if timeout_ms and not handler: raise ValueError('handler is required when timeout_ms is set') - device: MIoTLanDevice = self._lan_devices.get(did) + device: MIoTLanDevice | None = self._lan_devices.get(did) if not device: raise ValueError('invalid device') if not device.cipher: @@ -900,7 +913,7 @@ class MIoTLan: did=did, offset=int(time.time())-device.offset) - return self.make_request( + return self.__make_request( msg_id=in_msg['id'], msg=self._write_buffer[0: msg_len], if_name=device.if_name, @@ -909,33 +922,33 @@ class MIoTLan: handler_ctx=handler_ctx, timeout_ms=timeout_ms) - def make_request( + def __make_request( self, msg_id: int, msg: bytearray, if_name: str, ip: str, - handler: Callable[[dict, any], None], - handler_ctx: any = None, + handler: Optional[Callable[[dict, Any], None]], + handler_ctx: Any = None, timeout_ms: Optional[int] = None ) -> None: def request_timeout_handler(req_data: MIoTLanRequestData): self._pending_requests.pop(req_data.msg_id, None) - if req_data: + if req_data and req_data.handler: req_data.handler({ 'code': MIoTErrorCode.CODE_TIMEOUT.value, 'error': 'timeout'}, req_data.handler_ctx) - timer: Optional[TimeoutHandle] = None + timer: Optional[asyncio.TimerHandle] = None request_data = MIoTLanRequestData( msg_id=msg_id, handler=handler, handler_ctx=handler_ctx, timeout=timer) if timeout_ms: - timer = self._mev.set_timeout( - timeout_ms, request_timeout_handler, request_data) + timer = self._internal_loop.call_later( + timeout_ms/1000, request_timeout_handler, request_data) request_data.timeout = timer self._pending_requests[msg_id] = request_data self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT) @@ -954,7 +967,7 @@ class MIoTLan: self._msg_id_counter = 1 return self._msg_id_counter - def __lan_send_cmd(self, cmd: MIoTLanCmd, data: any) -> bool: + def __lan_send_cmd(self, cmd: MIoTLanCmdType, data: Any) -> bool: try: self._queue.put(MIoTLanCmd(type_=cmd, data=data)) os.eventfd_write(self._cmd_event_fd, 1) @@ -986,16 +999,7 @@ class MIoTLan: 'error': 'send cmd error'}) return await fut - def __lan_thread_handler(self) -> None: - _LOGGER.info('miot lan thread start') - self.__init_socket() - # Create scan devices timer - self._scan_timer = self._mev.set_timeout( - int(3000*random.random()), self.__scan_devices, None) - self._mev.loop_forever() - _LOGGER.info('miot lan thread exit') - - def __cmd_read_handler(self, ctx: any) -> None: + def __cmd_read_handler(self) -> None: fd_value = os.eventfd_read(self._cmd_event_fd) if fd_value == 0: return @@ -1109,20 +1113,21 @@ class MIoTLan: elif mips_cmd.type_ == MIoTLanCmdType.DEINIT: # stop the thread if self._scan_timer: - self._mev.clear_timeout(self._scan_timer) + self._scan_timer.cancel() self._scan_timer = None for device in self._lan_devices.values(): device.on_delete() self._lan_devices.clear() for req_data in self._pending_requests.values(): - self._mev.clear_timeout(req_data.timeout) + req_data.timeout.cancel() self._pending_requests.clear() for timer in self._reply_msg_buffer.values(): - self._mev.clear_timeout(timer) + timer.cancel() self._reply_msg_buffer.clear() self._device_msg_matcher = MIoTMatcher() self.__deinit_socket() - self._mev.loop_stop() + # DO NOT force a event loop to stop. + # It will stop when you release all handles properly. def __init_socket(self) -> None: self.__deinit_socket() @@ -1145,7 +1150,7 @@ class MIoTLan: sock.setsockopt( socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode()) sock.bind(('', self._local_port or 0)) - self._mev.set_read_handler( + self._internal_loop.add_reader( sock.fileno(), self.__socket_read_handler, (if_name, sock)) self._broadcast_socks[if_name] = sock self._local_port = self._local_port or sock.getsockname()[1] @@ -1163,7 +1168,7 @@ class MIoTLan: sock = self._broadcast_socks.pop(if_name, None) if not sock: return - self._mev.set_read_handler(sock.fileno(), None, None) + self._internal_loop.remove_reader(sock.fileno()) sock.close() _LOGGER.info('destroyed socket, %s', if_name) @@ -1190,7 +1195,7 @@ class MIoTLan: return # Keep alive message did: str = str(struct.unpack('>Q', data[4:12])[0]) - device: MIoTLanDevice = self._lan_devices.get(did) + device: MIoTLanDevice | None = self._lan_devices.get(did) if not device: return timestamp: int = struct.unpack('>I', data[12:16])[0] @@ -1230,11 +1235,12 @@ class MIoTLan: _LOGGER.warning('invalid message, no id, %s, %s', did, msg) return # Reply - req: MIoTLanRequestData = self._pending_requests.pop(msg['id'], None) + req: MIoTLanRequestData | None = self._pending_requests.pop(msg['id'], None) if req: - self._mev.clear_timeout(req.timeout) - self._main_loop.call_soon_threadsafe( - req.handler, msg, req.handler_ctx) + req.timeout.cancel() + if req.handler is not None: + self._main_loop.call_soon_threadsafe( + req.handler, msg, req.handler_ctx) return # Handle up link message if 'method' not in msg or 'params' not in msg: @@ -1281,15 +1287,16 @@ class MIoTLan: filter_id = f'{did}.{msg_id}' if filter_id in self._reply_msg_buffer: return True - self._reply_msg_buffer[filter_id] = self._mev.set_timeout( - 5000, + self._reply_msg_buffer[filter_id] = self._internal_loop.call_later( + 5, lambda filter_id: self._reply_msg_buffer.pop(filter_id, None), filter_id) + return False def __sendto( - self, if_name: str, data: bytes, address: str, port: int + self, if_name: str | None, data: bytes, address: str, port: int ) -> None: - if address == '255.255.255.255': + if if_name is None: # Broadcast for if_n, sock in self._broadcast_socks.items(): _LOGGER.debug('send broadcast, %s', if_n) @@ -1302,17 +1309,21 @@ class MIoTLan: return sock.sendto(data, socket.MSG_DONTWAIT, (address, port)) - def __scan_devices(self, ctx: any) -> None: + def __scan_devices(self) -> None: if self._scan_timer: - self._mev.clear_timeout(self._scan_timer) - # Scan devices - self.ping(if_name=None, target_ip='255.255.255.255') + self._scan_timer.cancel() + # Ignore any exceptions to avoid blocking the loop + try: + # Scan devices + self.ping(if_name=None, target_ip='255.255.255.255') + except: + pass scan_time = self.__get_next_scan_time() - self._scan_timer = self._mev.set_timeout( - scan_time, self.__scan_devices, None) + self._scan_timer = self._internal_loop.call_later( + scan_time, self.__scan_devices) _LOGGER.debug('next scan time: %sms', scan_time) - def __get_next_scan_time(self) -> int: + def __get_next_scan_time(self) -> float: if not self._last_scan_interval: self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN self._last_scan_interval = min( diff --git a/custom_components/xiaomi_home/miot/miot_mdns.py b/custom_components/xiaomi_home/miot/miot_mdns.py index 4f13649..7d45193 100644 --- a/custom_components/xiaomi_home/miot/miot_mdns.py +++ b/custom_components/xiaomi_home/miot/miot_mdns.py @@ -50,7 +50,7 @@ import base64 import binascii import copy from enum import Enum -from typing import Callable, Optional +from typing import Callable, Optional, Coroutine import logging from zeroconf import ( @@ -151,7 +151,7 @@ class MipsService: _services: dict[str, dict] # key = (key, group_id) _sub_list: dict[(str, str), Callable[[ - str, MipsServiceState, dict], asyncio.Future]] + str, MipsServiceState, dict], Coroutine]] def __init__( self, aiozc: AsyncZeroconf, @@ -207,7 +207,7 @@ class MipsService: def sub_service_change( self, key: str, group_id: str, - handler: Callable[[str, MipsServiceState, dict], asyncio.Future] + handler: Callable[[str, MipsServiceState, dict], Coroutine] ) -> None: if key is None or group_id is None or handler is None: raise MipsServiceError('invalid params') diff --git a/custom_components/xiaomi_home/miot/miot_network.py b/custom_components/xiaomi_home/miot/miot_network.py index a4606eb..70c3dc4 100644 --- a/custom_components/xiaomi_home/miot/miot_network.py +++ b/custom_components/xiaomi_home/miot/miot_network.py @@ -52,7 +52,7 @@ import socket from dataclasses import dataclass from enum import Enum, auto import subprocess -from typing import Callable, Optional +from typing import Callable, Optional, Coroutine import psutil import ipaddress @@ -97,7 +97,7 @@ class MIoTNetwork: _sub_list_network_status: dict[str, Callable[[bool], asyncio.Future]] _sub_list_network_info: dict[str, Callable[[ - InterfaceStatus, NetworkInfo], asyncio.Future]] + InterfaceStatus, NetworkInfo], Coroutine]] _ping_address_priority: int @@ -155,7 +155,7 @@ class MIoTNetwork: def sub_network_info( self, key: str, - handler: Callable[[InterfaceStatus, NetworkInfo], asyncio.Future] + handler: Callable[[InterfaceStatus, NetworkInfo], Coroutine] ) -> None: self._sub_list_network_info[key] = handler diff --git a/test/test_ev.py b/test/test_ev.py deleted file mode 100644 index 6353fe8..0000000 --- a/test/test_ev.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- -"""Unit test for miot_ev.py.""" -import os -import pytest - -# pylint: disable=import-outside-toplevel, disable=unused-argument - - -@pytest.mark.github -def test_mev_timer_and_fd(): - from miot.miot_ev import MIoTEventLoop, TimeoutHandle - - mev = MIoTEventLoop() - assert mev - event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK) - assert event_fd - timer4: TimeoutHandle = None - - def event_handler(event_fd): - value: int = os.eventfd_read(event_fd) - if value == 1: - mev.clear_timeout(timer4) - print('cancel timer4') - elif value == 2: - print('event write twice in a row') - elif value == 3: - mev.set_read_handler(event_fd, None, None) - os.close(event_fd) - event_fd = None - print('close event fd') - - def timer1_handler(event_fd): - os.eventfd_write(event_fd, 1) - - def timer2_handler(event_fd): - os.eventfd_write(event_fd, 1) - os.eventfd_write(event_fd, 1) - - def timer3_handler(event_fd): - os.eventfd_write(event_fd, 3) - - def timer4_handler(event_fd): - raise ValueError('unreachable code') - - mev.set_read_handler( - event_fd, event_handler, event_fd) - - mev.set_timeout(500, timer1_handler, event_fd) - mev.set_timeout(1000, timer2_handler, event_fd) - mev.set_timeout(1500, timer3_handler, event_fd) - timer4 = mev.set_timeout(2000, timer4_handler, event_fd) - - mev.loop_forever() - # Loop will exit when there are no timers or fd handlers. - mev.loop_stop()