diff --git a/custom_components/xiaomi_home/miot/miot_client.py b/custom_components/xiaomi_home/miot/miot_client.py index 7423e20..f184ec5 100644 --- a/custom_components/xiaomi_home/miot/miot_client.py +++ b/custom_components/xiaomi_home/miot/miot_client.py @@ -1022,7 +1022,7 @@ class MIoTClient: handler=self.__on_lan_device_state_changed) for did, info in ( await self._miot_lan.get_dev_list_async()).items(): - self.__on_lan_device_state_changed( + await self.__on_lan_device_state_changed( did=did, state=info, ctx=None) _LOGGER.info('lan device list, %s', self._device_list_lan) self._miot_lan.update_devices(devices={ diff --git a/custom_components/xiaomi_home/miot/miot_lan.py b/custom_components/xiaomi_home/miot/miot_lan.py index 290a10c..ff32859 100644 --- a/custom_components/xiaomi_home/miot/miot_lan.py +++ b/custom_components/xiaomi_home/miot/miot_lan.py @@ -53,8 +53,6 @@ import asyncio from dataclasses import dataclass from enum import Enum, auto import logging -import os -import queue import random import secrets import socket @@ -77,29 +75,6 @@ from .common import ( _LOGGER = logging.getLogger(__name__) -class MIoTLanCmdType(Enum): - """MIoT lan command.""" - DEINIT = 0 - CALL_API = auto() - SUB_DEVICE_STATE = auto() - UNSUB_DEVICE_STATE = auto() - REG_BROADCAST = auto() - UNREG_BROADCAST = auto() - GET_DEV_LIST = auto() - DEVICE_UPDATE = auto() - DEVICE_DELETE = auto() - NET_INFO_UPDATE = auto() - NET_IFS_UPDATE = auto() - OPTIONS_UPDATE = auto() - - -@dataclass -class MIoTLanCmd: - """MIoT lan command.""" - type_: MIoTLanCmdType - data: Any - - @dataclass class MIoTLanCmdData: handler: Callable[[dict, Any], None] @@ -142,7 +117,7 @@ class MIoTLanUnsubDeviceState: @dataclass class MIoTLanSubDeviceState: key: str - handler: Callable[[str, dict, Any], None] + handler: Callable[[str, dict, Any], Coroutine] handler_ctx: Any @@ -157,7 +132,7 @@ class MIoTLanRequestData: msg_id: int handler: Optional[Callable[[dict, Any], None]] handler_ctx: Any - timeout: asyncio.TimerHandle + timeout: Optional[asyncio.TimerHandle] class MIoTLanDeviceState(Enum): @@ -202,6 +177,8 @@ class MIoTLanDevice: _ka_timer: Optional[asyncio.TimerHandle] _ka_internal: float +# All functions SHOULD be called from the internal loop + def __init__( self, manager: 'MIoTLan', did: str, token: str, ip: Optional[str] = None ) -> None: @@ -511,6 +488,8 @@ class MIoTLan: _init_done: bool +# The following should be called from the main loop + def __init__( self, net_ifs: list[str], @@ -528,7 +507,7 @@ class MIoTLan: self._net_ifs = set(net_ifs) self._network = network self._network.sub_network_info( - key='miot_lan', handler=self.__on_network_info_change) + key='miot_lan', handler=self.__on_network_info_change_external_async) self._mips_service = mips_service self._mips_service.sub_service_change( key='miot_lan', group_id='*', @@ -611,8 +590,6 @@ class MIoTLan: except Exception as err: # pylint: disable=broad-exception-caught _LOGGER.error('load profile models error, %s', err) self._profile_models = {} - self._queue = queue.Queue() - self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) # All tasks meant for the internal loop should happen in this thread self._thread = threading.Thread(target=self.__internal_loop_thread) self._thread.name = 'miot_lan' @@ -629,8 +606,6 @@ class MIoTLan: self._internal_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._internal_loop) self.__init_socket() - self._internal_loop.add_reader( - self._cmd_event_fd, self.__cmd_read_handler) self._scan_timer = self._internal_loop.call_later( int(3*random.random()), self.__scan_devices) self._internal_loop.run_forever() @@ -642,7 +617,7 @@ class MIoTLan: _LOGGER.info('miot lan not init') return self._init_done = False - self.__lan_send_cmd(MIoTLanCmdType.DEINIT, None) + self._internal_loop.call_soon_threadsafe(self.__deinit) self._thread.join() self._profile_models = {} @@ -683,9 +658,9 @@ class MIoTLan: self._net_ifs = set(net_ifs) await self.init_async() return - self.__lan_send_cmd( - cmd=MIoTLanCmdType.NET_IFS_UPDATE, - data=net_ifs) + self._internal_loop.call_soon_threadsafe( + self.__update_net_ifs, + net_ifs) async def vote_for_lan_ctrl_async(self, key: str, vote: bool) -> None: _LOGGER.info('vote for lan ctrl, %s, %s', key, vote) @@ -700,22 +675,21 @@ class MIoTLan: if not self._init_done: self._enable_subscribe = enable_subscribe return - self.__lan_send_cmd( - cmd=MIoTLanCmdType.OPTIONS_UPDATE, - data={ - 'enable_subscribe': enable_subscribe, }) + self._internal_loop.call_soon_threadsafe( + self.__update_subscribe_option, + {'enable_subscribe': enable_subscribe}) def update_devices(self, devices: dict[str, dict]) -> bool: _LOGGER.info('update devices, %s', devices) - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.DEVICE_UPDATE, - data=devices) + self._internal_loop.call_soon_threadsafe( + self.__update_devices, devices) + return True def delete_devices(self, devices: list[str]) -> bool: _LOGGER.info('delete devices, %s', devices) - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.DEVICE_DELETE, - data=devices) + self._internal_loop.call_soon_threadsafe( + self.__delete_devices, devices) + return True def sub_lan_state( self, key: str, handler: Callable[[bool], Coroutine] @@ -727,19 +701,20 @@ class MIoTLan: @final def sub_device_state( - self, key: str, handler: Callable[[str, dict, Any], None], + self, key: str, handler: Callable[[str, dict, Any], Coroutine], handler_ctx: Any = None ) -> bool: - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.SUB_DEVICE_STATE, - data=MIoTLanSubDeviceState( + self._internal_loop.call_soon_threadsafe( + self.__sub_device_state, + MIoTLanSubDeviceState( key=key, handler=handler, handler_ctx=handler_ctx)) + return True @final def unsub_device_state(self, key: str) -> bool: - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.UNSUB_DEVICE_STATE, - data=MIoTLanUnsubDeviceState(key=key)) + self._internal_loop.call_soon_threadsafe( + self.__unsub_device_state, MIoTLanUnsubDeviceState(key=key)) + return True @final def sub_prop( @@ -751,10 +726,11 @@ class MIoTLan: key = ( f'{did}/p/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.REG_BROADCAST, - data=MIoTLanRegisterBroadcastData( + self._internal_loop.call_soon_threadsafe( + self.__sub_broadcast, + MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) + return True @final def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = None) -> bool: @@ -763,9 +739,10 @@ class MIoTLan: key = ( f'{did}/p/' f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.UNREG_BROADCAST, - data=MIoTLanUnregisterBroadcastData(key=key)) + self._internal_loop.call_soon_threadsafe( + self.__unsub_broadcast, + MIoTLanUnregisterBroadcastData(key=key)) + return True @final def sub_event( @@ -777,10 +754,11 @@ class MIoTLan: key = ( f'{did}/e/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.REG_BROADCAST, - data=MIoTLanRegisterBroadcastData( + self._internal_loop.call_soon_threadsafe( + self.__sub_broadcast, + MIoTLanRegisterBroadcastData( key=key, handler=handler, handler_ctx=handler_ctx)) + return True @final def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None) -> bool: @@ -789,9 +767,10 @@ class MIoTLan: key = ( f'{did}/e/' f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}') - return self.__lan_send_cmd( - cmd=MIoTLanCmdType.UNREG_BROADCAST, - data=MIoTLanUnregisterBroadcastData(key=key)) + self._internal_loop.call_soon_threadsafe( + self.__unsub_broadcast, + MIoTLanUnregisterBroadcastData(key=key)) + return True @final async def get_prop_async( @@ -870,17 +849,67 @@ class MIoTLan: fut.set_result, msg) fut: asyncio.Future = self._main_loop.create_future() - if self.__lan_send_cmd( - MIoTLanCmdType.GET_DEV_LIST, + self._internal_loop.call_soon_threadsafe( + self.__get_dev_list, MIoTLanGetDevListData( handler=get_device_list_handler, handler_ctx=fut, - timeout_ms=timeout_ms)): - return await fut - _LOGGER.error('get_dev_list_async error, send cmd failed') - fut.set_result({}) + timeout_ms=timeout_ms)) return await fut + async def __call_api_async( + self, did: str, msg: dict, timeout_ms: int = 10000 + ) -> dict: + def call_api_handler(msg: dict, fut: asyncio.Future): + self._main_loop.call_soon_threadsafe( + fut.set_result, msg) + + fut: asyncio.Future = self._main_loop.create_future() + self._internal_loop.call_soon_threadsafe( + self.__call_api, did, msg, call_api_handler, fut, timeout_ms) + return await fut + + async def __on_network_info_change_external_async( + self, + status: InterfaceStatus, + info: NetworkInfo + ) -> None: + _LOGGER.info( + 'on network info change, status: %s, info: %s', status, info) + available_net_ifs = set() + for if_name in list(self._network.network_info.keys()): + available_net_ifs.add(if_name) + if len(available_net_ifs) == 0: + await self.deinit_async() + self._available_net_ifs = available_net_ifs + return + if self._net_ifs.isdisjoint(available_net_ifs): + _LOGGER.info('no valid net_ifs') + await self.deinit_async() + self._available_net_ifs = available_net_ifs + return + if not self._init_done: + self._available_net_ifs = available_net_ifs + await self.init_async() + return + self._internal_loop.call_soon_threadsafe( + self.__on_network_info_chnage, + MIoTLanNetworkUpdateData(status=status, if_name=info.name)) + + async def __on_mips_service_change( + self, group_id: str, state: MipsServiceState, data: dict + ) -> None: + _LOGGER.info( + 'on mips service change, %s, %s, %s', group_id, state, data) + if len(self._mips_service.get_services()) > 0: + _LOGGER.info('find central service, deinit miot lan') + await self.deinit_async() + else: + _LOGGER.info('no central service, init miot lan') + await self.init_async() + +# The folowing methods SHOULD ONLY be called in the internal loop + def ping(self, if_name: str | None, target_ip: str) -> None: if not target_ip: return @@ -956,8 +985,8 @@ class MIoTLan: def broadcast_device_state(self, did: str, state: dict) -> None: for handler in self._device_state_sub_map.values(): self._main_loop.call_soon_threadsafe( - self._main_loop.create_task, - handler.handler(did, state, handler.handler_ctx)) + lambda: self._main_loop.create_task( + handler.handler(did, state, handler.handler_ctx))) def __gen_msg_id(self) -> int: if not self._msg_id_counter: @@ -967,167 +996,129 @@ class MIoTLan: self._msg_id_counter = 1 return self._msg_id_counter - def __lan_send_cmd(self, cmd: MIoTLanCmdType, data: Any) -> bool: + def __call_api(self, did: str, msg: dict, handler: Callable, handler_ctx: Any, timeout_ms: int = 10000) -> None: try: - self._queue.put(MIoTLanCmd(type_=cmd, data=data)) - os.eventfd_write(self._cmd_event_fd, 1) - return True + self.send2device( + did=did, + msg={'from': 'ha.xiaomi_home', **msg}, + handler=handler, + handler_ctx=handler_ctx, + timeout_ms=timeout_ms) except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error('send cmd error, %s, %s', cmd, err) - return False + _LOGGER.error('send2device error, %s', err) + handler({ + 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, + 'error': str(err)}, + handler_ctx) - async def __call_api_async( - self, did: str, msg: dict, timeout_ms: int = 10000 - ) -> dict: - def call_api_handler(msg: dict, fut: asyncio.Future): - self._main_loop.call_soon_threadsafe( - fut.set_result, msg) + def __sub_device_state(self, data: MIoTLanSubDeviceState) -> None: + self._device_state_sub_map[data.key] = data - fut: asyncio.Future = self._main_loop.create_future() - if self.__lan_send_cmd( - cmd=MIoTLanCmdType.CALL_API, - data=MIoTLanCallApiData( - did=did, - msg=msg, - handler=call_api_handler, - handler_ctx=fut, - timeout_ms=timeout_ms)): - return await fut + def __unsub_device_state(self, data: MIoTLanUnsubDeviceState) -> None: + self._device_state_sub_map.pop(data.key, None) - fut.set_result({ - 'code': MIoTErrorCode.CODE_UNAVAILABLE.value, - 'error': 'send cmd error'}) - return await fut + def __sub_broadcast(self, data: MIoTLanRegisterBroadcastData) -> None: + self._device_msg_matcher[data.key] = data + _LOGGER.debug('lan register broadcast, %s', data.key) - def __cmd_read_handler(self) -> None: - fd_value = os.eventfd_read(self._cmd_event_fd) - if fd_value == 0: - return - while not self._queue.empty(): - mips_cmd: MIoTLanCmd = self._queue.get(block=False) - if mips_cmd.type_ == MIoTLanCmdType.CALL_API: - call_api_data: MIoTLanCallApiData = mips_cmd.data - try: - self.send2device( - did=call_api_data.did, - msg={'from': 'ha.xiaomi_home', **call_api_data.msg}, - handler=call_api_data.handler, - handler_ctx=call_api_data.handler_ctx, - timeout_ms=call_api_data.timeout_ms) - except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error('send2device error, %s', err) - call_api_data.handler({ - 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value, - 'error': str(err)}, - call_api_data.handler_ctx) - elif mips_cmd.type_ == MIoTLanCmdType.SUB_DEVICE_STATE: - sub_data: MIoTLanSubDeviceState = mips_cmd.data - self._device_state_sub_map[sub_data.key] = sub_data - elif mips_cmd.type_ == MIoTLanCmdType.UNSUB_DEVICE_STATE: - sub_data: MIoTLanUnsubDeviceState = mips_cmd.data - self._device_state_sub_map.pop(sub_data.key, None) - elif mips_cmd.type_ == MIoTLanCmdType.REG_BROADCAST: - reg_data: MIoTLanRegisterBroadcastData = mips_cmd.data - self._device_msg_matcher[reg_data.key] = reg_data - _LOGGER.debug('lan register broadcast, %s', reg_data.key) - elif mips_cmd.type_ == MIoTLanCmdType.UNREG_BROADCAST: - unreg_data: MIoTLanUnregisterBroadcastData = mips_cmd.data - if self._device_msg_matcher.get(topic=unreg_data.key): - del self._device_msg_matcher[unreg_data.key] - _LOGGER.debug('lan unregister broadcast, %s', unreg_data.key) - elif mips_cmd.type_ == MIoTLanCmdType.GET_DEV_LIST: - get_dev_list_data: MIoTLanGetDevListData = mips_cmd.data - dev_list = { - device.did: { - 'online': device.online, - 'push_available': device.subscribed - } - for device in self._lan_devices.values() - if device.online} - get_dev_list_data.handler( - dev_list, get_dev_list_data.handler_ctx) - elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_UPDATE: - devices: dict[str, dict] = mips_cmd.data - for did, info in devices.items(): - # did MUST be digit(UINT64) - if not did.isdigit(): - _LOGGER.info('invalid did, %s', did) - continue - if ( - 'model' not in info - or info['model'] in self._profile_models): - # Do not support the local control of - # Profile device for the time being - _LOGGER.info( - 'model not support local ctrl, %s, %s', - did, info.get('model')) - continue - if did not in self._lan_devices: - if 'token' not in info: - _LOGGER.error( - 'token not found, %s, %s', did, info) - continue - if len(info['token']) != 32: - _LOGGER.error( - 'invalid device token, %s, %s', did, info) - continue - self._lan_devices[did] = MIoTLanDevice( - manager=self, did=did, token=info['token'], - ip=info.get('ip', None)) - else: - self._lan_devices[did].update_info(info) - elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_DELETE: - device_dids: list[str] = mips_cmd.data - for did in device_dids: - lan_device = self._lan_devices.pop(did, None) - if not lan_device: - continue - lan_device.on_delete() - elif mips_cmd.type_ == MIoTLanCmdType.NET_INFO_UPDATE: - net_data: MIoTLanNetworkUpdateData = mips_cmd.data - if net_data.status == InterfaceStatus.ADD: - self._available_net_ifs.add(net_data.if_name) - if net_data.if_name in self._net_ifs: - self.__create_socket(if_name=net_data.if_name) - elif net_data.status == InterfaceStatus.REMOVE: - self._available_net_ifs.remove(net_data.if_name) - self.__destroy_socket(if_name=net_data.if_name) - elif mips_cmd.type_ == MIoTLanCmdType.NET_IFS_UPDATE: - net_ifs: list[str] = mips_cmd.data - if self._net_ifs != set(net_ifs): - self._net_ifs = set(net_ifs) - for if_name in self._net_ifs: - self.__create_socket(if_name=if_name) - for if_name in list(self._broadcast_socks.keys()): - if if_name not in self._net_ifs: - self.__destroy_socket(if_name=if_name) - elif mips_cmd.type_ == MIoTLanCmdType.OPTIONS_UPDATE: - options: dict = mips_cmd.data - if 'enable_subscribe' in options: - if options['enable_subscribe'] != self._enable_subscribe: - self._enable_subscribe = options['enable_subscribe'] - if not self._enable_subscribe: - # Unsubscribe all - for device in self._lan_devices.values(): - device.unsubscribe() - elif mips_cmd.type_ == MIoTLanCmdType.DEINIT: - # stop the thread - if self._scan_timer: - self._scan_timer.cancel() - self._scan_timer = None - for device in self._lan_devices.values(): - device.on_delete() - self._lan_devices.clear() - for req_data in self._pending_requests.values(): - req_data.timeout.cancel() - self._pending_requests.clear() - for timer in self._reply_msg_buffer.values(): - timer.cancel() - self._reply_msg_buffer.clear() - self._device_msg_matcher = MIoTMatcher() - self.__deinit_socket() - # DO NOT force a event loop to stop. - # It will stop when you release all handles properly. + def __unsub_broadcast(self, data: MIoTLanUnregisterBroadcastData) -> None: + if self._device_msg_matcher.get(topic=data.key): + del self._device_msg_matcher[data.key] + _LOGGER.debug('lan unregister broadcast, %s', data.key) + + def __get_dev_list(self, data: MIoTLanGetDevListData) -> None: + dev_list = { + device.did: { + 'online': device.online, + 'push_available': device.subscribed + } + for device in self._lan_devices.values() + if device.online} + data.handler( + dev_list, data.handler_ctx) + + def __update_devices(self, devices: dict[str, dict]) -> None: + for did, info in devices.items(): + # did MUST be digit(UINT64) + if not did.isdigit(): + _LOGGER.info('invalid did, %s', did) + continue + if ( + 'model' not in info + or info['model'] in self._profile_models): + # Do not support the local control of + # Profile device for the time being + _LOGGER.info( + 'model not support local ctrl, %s, %s', + did, info.get('model')) + continue + if did not in self._lan_devices: + if 'token' not in info: + _LOGGER.error( + 'token not found, %s, %s', did, info) + continue + if len(info['token']) != 32: + _LOGGER.error( + 'invalid device token, %s, %s', did, info) + continue + self._lan_devices[did] = MIoTLanDevice( + manager=self, did=did, token=info['token'], + ip=info.get('ip', None)) + else: + self._lan_devices[did].update_info(info) + + def __delete_devices(self, devices: list[str]) -> None: + for did in devices: + lan_device = self._lan_devices.pop(did, None) + if not lan_device: + continue + lan_device.on_delete() + + def __on_network_info_chnage(self, data: MIoTLanNetworkUpdateData) -> None: + if data.status == InterfaceStatus.ADD: + self._available_net_ifs.add(data.if_name) + if data.if_name in self._net_ifs: + self.__create_socket(if_name=data.if_name) + elif data.status == InterfaceStatus.REMOVE: + self._available_net_ifs.remove(data.if_name) + self.__destroy_socket(if_name=data.if_name) + + def __update_net_ifs(self, net_ifs: list[str]) -> None: + if self._net_ifs != set(net_ifs): + self._net_ifs = set(net_ifs) + for if_name in self._net_ifs: + self.__create_socket(if_name=if_name) + for if_name in list(self._broadcast_socks.keys()): + if if_name not in self._net_ifs: + self.__destroy_socket(if_name=if_name) + + def __update_subscribe_option(self, options: dict) -> None: + if 'enable_subscribe' in options: + if options['enable_subscribe'] != self._enable_subscribe: + self._enable_subscribe = options['enable_subscribe'] + if not self._enable_subscribe: + # Unsubscribe all + for device in self._lan_devices.values(): + device.unsubscribe() + + def __deinit(self) -> None: + # Release all resources + if self._scan_timer: + self._scan_timer.cancel() + self._scan_timer = None + for device in self._lan_devices.values(): + device.on_delete() + self._lan_devices.clear() + for req_data in self._pending_requests.values(): + if req_data.timeout: + req_data.timeout.cancel() + self._pending_requests.clear() + for timer in self._reply_msg_buffer.values(): + timer.cancel() + self._reply_msg_buffer.clear() + self._device_msg_matcher = MIoTMatcher() + self.__deinit_socket() + self._internal_loop.stop() def __init_socket(self) -> None: self.__deinit_socket() @@ -1237,7 +1228,8 @@ class MIoTLan: # Reply req: MIoTLanRequestData | None = self._pending_requests.pop(msg['id'], None) if req: - req.timeout.cancel() + if req.timeout: + req.timeout.cancel() if req.handler is not None: self._main_loop.call_soon_threadsafe( req.handler, msg, req.handler_ctx) @@ -1329,42 +1321,3 @@ class MIoTLan: self._last_scan_interval = min( self._last_scan_interval*2, self.OT_PROBE_INTERVAL_MAX) return self._last_scan_interval - - async def __on_network_info_change( - self, - status: InterfaceStatus, - info: NetworkInfo - ) -> None: - _LOGGER.info( - 'on network info change, status: %s, info: %s', status, info) - available_net_ifs = set() - for if_name in list(self._network.network_info.keys()): - available_net_ifs.add(if_name) - if len(available_net_ifs) == 0: - await self.deinit_async() - self._available_net_ifs = available_net_ifs - return - if self._net_ifs.isdisjoint(available_net_ifs): - _LOGGER.info('no valid net_ifs') - await self.deinit_async() - self._available_net_ifs = available_net_ifs - return - if not self._init_done: - self._available_net_ifs = available_net_ifs - await self.init_async() - return - self.__lan_send_cmd( - MIoTLanCmdType.NET_INFO_UPDATE, MIoTLanNetworkUpdateData( - status=status, if_name=info.name)) - - async def __on_mips_service_change( - self, group_id: str, state: MipsServiceState, data: dict - ) -> None: - _LOGGER.info( - 'on mips service change, %s, %s, %s', group_id, state, data) - if len(self._mips_service.get_services()) > 0: - _LOGGER.info('find central service, deinit miot lan') - await self.deinit_async() - else: - _LOGGER.info('no central service, init miot lan') - await self.init_async()