Use call_soon_threadsafe instead of event fd for ipc

This commit is contained in:
Feng Wang 2024-12-21 21:39:59 +08:00
parent faa13c53e8
commit 794505f082
2 changed files with 222 additions and 269 deletions

View File

@ -1022,7 +1022,7 @@ class MIoTClient:
handler=self.__on_lan_device_state_changed)
for did, info in (
await self._miot_lan.get_dev_list_async()).items():
self.__on_lan_device_state_changed(
await self.__on_lan_device_state_changed(
did=did, state=info, ctx=None)
_LOGGER.info('lan device list, %s', self._device_list_lan)
self._miot_lan.update_devices(devices={

View File

@ -53,8 +53,6 @@ import asyncio
from dataclasses import dataclass
from enum import Enum, auto
import logging
import os
import queue
import random
import secrets
import socket
@ -77,29 +75,6 @@ from .common import (
_LOGGER = logging.getLogger(__name__)
class MIoTLanCmdType(Enum):
"""MIoT lan command."""
DEINIT = 0
CALL_API = auto()
SUB_DEVICE_STATE = auto()
UNSUB_DEVICE_STATE = auto()
REG_BROADCAST = auto()
UNREG_BROADCAST = auto()
GET_DEV_LIST = auto()
DEVICE_UPDATE = auto()
DEVICE_DELETE = auto()
NET_INFO_UPDATE = auto()
NET_IFS_UPDATE = auto()
OPTIONS_UPDATE = auto()
@dataclass
class MIoTLanCmd:
"""MIoT lan command."""
type_: MIoTLanCmdType
data: Any
@dataclass
class MIoTLanCmdData:
handler: Callable[[dict, Any], None]
@ -142,7 +117,7 @@ class MIoTLanUnsubDeviceState:
@dataclass
class MIoTLanSubDeviceState:
key: str
handler: Callable[[str, dict, Any], None]
handler: Callable[[str, dict, Any], Coroutine]
handler_ctx: Any
@ -157,7 +132,7 @@ class MIoTLanRequestData:
msg_id: int
handler: Optional[Callable[[dict, Any], None]]
handler_ctx: Any
timeout: asyncio.TimerHandle
timeout: Optional[asyncio.TimerHandle]
class MIoTLanDeviceState(Enum):
@ -202,6 +177,8 @@ class MIoTLanDevice:
_ka_timer: Optional[asyncio.TimerHandle]
_ka_internal: float
# All functions SHOULD be called from the internal loop
def __init__(
self, manager: 'MIoTLan', did: str, token: str, ip: Optional[str] = None
) -> None:
@ -511,6 +488,8 @@ class MIoTLan:
_init_done: bool
# The following should be called from the main loop
def __init__(
self,
net_ifs: list[str],
@ -528,7 +507,7 @@ class MIoTLan:
self._net_ifs = set(net_ifs)
self._network = network
self._network.sub_network_info(
key='miot_lan', handler=self.__on_network_info_change)
key='miot_lan', handler=self.__on_network_info_change_external_async)
self._mips_service = mips_service
self._mips_service.sub_service_change(
key='miot_lan', group_id='*',
@ -611,8 +590,6 @@ class MIoTLan:
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('load profile models error, %s', err)
self._profile_models = {}
self._queue = queue.Queue()
self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK)
# All tasks meant for the internal loop should happen in this thread
self._thread = threading.Thread(target=self.__internal_loop_thread)
self._thread.name = 'miot_lan'
@ -629,8 +606,6 @@ class MIoTLan:
self._internal_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._internal_loop)
self.__init_socket()
self._internal_loop.add_reader(
self._cmd_event_fd, self.__cmd_read_handler)
self._scan_timer = self._internal_loop.call_later(
int(3*random.random()), self.__scan_devices)
self._internal_loop.run_forever()
@ -642,7 +617,7 @@ class MIoTLan:
_LOGGER.info('miot lan not init')
return
self._init_done = False
self.__lan_send_cmd(MIoTLanCmdType.DEINIT, None)
self._internal_loop.call_soon_threadsafe(self.__deinit)
self._thread.join()
self._profile_models = {}
@ -683,9 +658,9 @@ class MIoTLan:
self._net_ifs = set(net_ifs)
await self.init_async()
return
self.__lan_send_cmd(
cmd=MIoTLanCmdType.NET_IFS_UPDATE,
data=net_ifs)
self._internal_loop.call_soon_threadsafe(
self.__update_net_ifs,
net_ifs)
async def vote_for_lan_ctrl_async(self, key: str, vote: bool) -> None:
_LOGGER.info('vote for lan ctrl, %s, %s', key, vote)
@ -700,22 +675,21 @@ class MIoTLan:
if not self._init_done:
self._enable_subscribe = enable_subscribe
return
self.__lan_send_cmd(
cmd=MIoTLanCmdType.OPTIONS_UPDATE,
data={
'enable_subscribe': enable_subscribe, })
self._internal_loop.call_soon_threadsafe(
self.__update_subscribe_option,
{'enable_subscribe': enable_subscribe})
def update_devices(self, devices: dict[str, dict]) -> bool:
_LOGGER.info('update devices, %s', devices)
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.DEVICE_UPDATE,
data=devices)
self._internal_loop.call_soon_threadsafe(
self.__update_devices, devices)
return True
def delete_devices(self, devices: list[str]) -> bool:
_LOGGER.info('delete devices, %s', devices)
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.DEVICE_DELETE,
data=devices)
self._internal_loop.call_soon_threadsafe(
self.__delete_devices, devices)
return True
def sub_lan_state(
self, key: str, handler: Callable[[bool], Coroutine]
@ -727,19 +701,20 @@ class MIoTLan:
@final
def sub_device_state(
self, key: str, handler: Callable[[str, dict, Any], None],
self, key: str, handler: Callable[[str, dict, Any], Coroutine],
handler_ctx: Any = None
) -> bool:
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.SUB_DEVICE_STATE,
data=MIoTLanSubDeviceState(
self._internal_loop.call_soon_threadsafe(
self.__sub_device_state,
MIoTLanSubDeviceState(
key=key, handler=handler, handler_ctx=handler_ctx))
return True
@final
def unsub_device_state(self, key: str) -> bool:
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.UNSUB_DEVICE_STATE,
data=MIoTLanUnsubDeviceState(key=key))
self._internal_loop.call_soon_threadsafe(
self.__unsub_device_state, MIoTLanUnsubDeviceState(key=key))
return True
@final
def sub_prop(
@ -751,10 +726,11 @@ class MIoTLan:
key = (
f'{did}/p/'
f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.REG_BROADCAST,
data=MIoTLanRegisterBroadcastData(
self._internal_loop.call_soon_threadsafe(
self.__sub_broadcast,
MIoTLanRegisterBroadcastData(
key=key, handler=handler, handler_ctx=handler_ctx))
return True
@final
def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = None) -> bool:
@ -763,9 +739,10 @@ class MIoTLan:
key = (
f'{did}/p/'
f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.UNREG_BROADCAST,
data=MIoTLanUnregisterBroadcastData(key=key))
self._internal_loop.call_soon_threadsafe(
self.__unsub_broadcast,
MIoTLanUnregisterBroadcastData(key=key))
return True
@final
def sub_event(
@ -777,10 +754,11 @@ class MIoTLan:
key = (
f'{did}/e/'
f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.REG_BROADCAST,
data=MIoTLanRegisterBroadcastData(
self._internal_loop.call_soon_threadsafe(
self.__sub_broadcast,
MIoTLanRegisterBroadcastData(
key=key, handler=handler, handler_ctx=handler_ctx))
return True
@final
def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None) -> bool:
@ -789,9 +767,10 @@ class MIoTLan:
key = (
f'{did}/e/'
f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
return self.__lan_send_cmd(
cmd=MIoTLanCmdType.UNREG_BROADCAST,
data=MIoTLanUnregisterBroadcastData(key=key))
self._internal_loop.call_soon_threadsafe(
self.__unsub_broadcast,
MIoTLanUnregisterBroadcastData(key=key))
return True
@final
async def get_prop_async(
@ -870,17 +849,67 @@ class MIoTLan:
fut.set_result, msg)
fut: asyncio.Future = self._main_loop.create_future()
if self.__lan_send_cmd(
MIoTLanCmdType.GET_DEV_LIST,
self._internal_loop.call_soon_threadsafe(
self.__get_dev_list,
MIoTLanGetDevListData(
handler=get_device_list_handler,
handler_ctx=fut,
timeout_ms=timeout_ms)):
return await fut
_LOGGER.error('get_dev_list_async error, send cmd failed')
fut.set_result({})
timeout_ms=timeout_ms))
return await fut
async def __call_api_async(
self, did: str, msg: dict, timeout_ms: int = 10000
) -> dict:
def call_api_handler(msg: dict, fut: asyncio.Future):
self._main_loop.call_soon_threadsafe(
fut.set_result, msg)
fut: asyncio.Future = self._main_loop.create_future()
self._internal_loop.call_soon_threadsafe(
self.__call_api, did, msg, call_api_handler, fut, timeout_ms)
return await fut
async def __on_network_info_change_external_async(
self,
status: InterfaceStatus,
info: NetworkInfo
) -> None:
_LOGGER.info(
'on network info change, status: %s, info: %s', status, info)
available_net_ifs = set()
for if_name in list(self._network.network_info.keys()):
available_net_ifs.add(if_name)
if len(available_net_ifs) == 0:
await self.deinit_async()
self._available_net_ifs = available_net_ifs
return
if self._net_ifs.isdisjoint(available_net_ifs):
_LOGGER.info('no valid net_ifs')
await self.deinit_async()
self._available_net_ifs = available_net_ifs
return
if not self._init_done:
self._available_net_ifs = available_net_ifs
await self.init_async()
return
self._internal_loop.call_soon_threadsafe(
self.__on_network_info_chnage,
MIoTLanNetworkUpdateData(status=status, if_name=info.name))
async def __on_mips_service_change(
self, group_id: str, state: MipsServiceState, data: dict
) -> None:
_LOGGER.info(
'on mips service change, %s, %s, %s', group_id, state, data)
if len(self._mips_service.get_services()) > 0:
_LOGGER.info('find central service, deinit miot lan')
await self.deinit_async()
else:
_LOGGER.info('no central service, init miot lan')
await self.init_async()
# The folowing methods SHOULD ONLY be called in the internal loop
def ping(self, if_name: str | None, target_ip: str) -> None:
if not target_ip:
return
@ -956,8 +985,8 @@ class MIoTLan:
def broadcast_device_state(self, did: str, state: dict) -> None:
for handler in self._device_state_sub_map.values():
self._main_loop.call_soon_threadsafe(
self._main_loop.create_task,
handler.handler(did, state, handler.handler_ctx))
lambda: self._main_loop.create_task(
handler.handler(did, state, handler.handler_ctx)))
def __gen_msg_id(self) -> int:
if not self._msg_id_counter:
@ -967,167 +996,129 @@ class MIoTLan:
self._msg_id_counter = 1
return self._msg_id_counter
def __lan_send_cmd(self, cmd: MIoTLanCmdType, data: Any) -> bool:
def __call_api(self, did: str, msg: dict, handler: Callable, handler_ctx: Any, timeout_ms: int = 10000) -> None:
try:
self._queue.put(MIoTLanCmd(type_=cmd, data=data))
os.eventfd_write(self._cmd_event_fd, 1)
return True
self.send2device(
did=did,
msg={'from': 'ha.xiaomi_home', **msg},
handler=handler,
handler_ctx=handler_ctx,
timeout_ms=timeout_ms)
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('send cmd error, %s, %s', cmd, err)
return False
_LOGGER.error('send2device error, %s', err)
handler({
'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
'error': str(err)},
handler_ctx)
async def __call_api_async(
self, did: str, msg: dict, timeout_ms: int = 10000
) -> dict:
def call_api_handler(msg: dict, fut: asyncio.Future):
self._main_loop.call_soon_threadsafe(
fut.set_result, msg)
def __sub_device_state(self, data: MIoTLanSubDeviceState) -> None:
self._device_state_sub_map[data.key] = data
fut: asyncio.Future = self._main_loop.create_future()
if self.__lan_send_cmd(
cmd=MIoTLanCmdType.CALL_API,
data=MIoTLanCallApiData(
did=did,
msg=msg,
handler=call_api_handler,
handler_ctx=fut,
timeout_ms=timeout_ms)):
return await fut
def __unsub_device_state(self, data: MIoTLanUnsubDeviceState) -> None:
self._device_state_sub_map.pop(data.key, None)
fut.set_result({
'code': MIoTErrorCode.CODE_UNAVAILABLE.value,
'error': 'send cmd error'})
return await fut
def __sub_broadcast(self, data: MIoTLanRegisterBroadcastData) -> None:
self._device_msg_matcher[data.key] = data
_LOGGER.debug('lan register broadcast, %s', data.key)
def __cmd_read_handler(self) -> None:
fd_value = os.eventfd_read(self._cmd_event_fd)
if fd_value == 0:
return
while not self._queue.empty():
mips_cmd: MIoTLanCmd = self._queue.get(block=False)
if mips_cmd.type_ == MIoTLanCmdType.CALL_API:
call_api_data: MIoTLanCallApiData = mips_cmd.data
try:
self.send2device(
did=call_api_data.did,
msg={'from': 'ha.xiaomi_home', **call_api_data.msg},
handler=call_api_data.handler,
handler_ctx=call_api_data.handler_ctx,
timeout_ms=call_api_data.timeout_ms)
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('send2device error, %s', err)
call_api_data.handler({
'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
'error': str(err)},
call_api_data.handler_ctx)
elif mips_cmd.type_ == MIoTLanCmdType.SUB_DEVICE_STATE:
sub_data: MIoTLanSubDeviceState = mips_cmd.data
self._device_state_sub_map[sub_data.key] = sub_data
elif mips_cmd.type_ == MIoTLanCmdType.UNSUB_DEVICE_STATE:
sub_data: MIoTLanUnsubDeviceState = mips_cmd.data
self._device_state_sub_map.pop(sub_data.key, None)
elif mips_cmd.type_ == MIoTLanCmdType.REG_BROADCAST:
reg_data: MIoTLanRegisterBroadcastData = mips_cmd.data
self._device_msg_matcher[reg_data.key] = reg_data
_LOGGER.debug('lan register broadcast, %s', reg_data.key)
elif mips_cmd.type_ == MIoTLanCmdType.UNREG_BROADCAST:
unreg_data: MIoTLanUnregisterBroadcastData = mips_cmd.data
if self._device_msg_matcher.get(topic=unreg_data.key):
del self._device_msg_matcher[unreg_data.key]
_LOGGER.debug('lan unregister broadcast, %s', unreg_data.key)
elif mips_cmd.type_ == MIoTLanCmdType.GET_DEV_LIST:
get_dev_list_data: MIoTLanGetDevListData = mips_cmd.data
dev_list = {
device.did: {
'online': device.online,
'push_available': device.subscribed
}
for device in self._lan_devices.values()
if device.online}
get_dev_list_data.handler(
dev_list, get_dev_list_data.handler_ctx)
elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_UPDATE:
devices: dict[str, dict] = mips_cmd.data
for did, info in devices.items():
# did MUST be digit(UINT64)
if not did.isdigit():
_LOGGER.info('invalid did, %s', did)
continue
if (
'model' not in info
or info['model'] in self._profile_models):
# Do not support the local control of
# Profile device for the time being
_LOGGER.info(
'model not support local ctrl, %s, %s',
did, info.get('model'))
continue
if did not in self._lan_devices:
if 'token' not in info:
_LOGGER.error(
'token not found, %s, %s', did, info)
continue
if len(info['token']) != 32:
_LOGGER.error(
'invalid device token, %s, %s', did, info)
continue
self._lan_devices[did] = MIoTLanDevice(
manager=self, did=did, token=info['token'],
ip=info.get('ip', None))
else:
self._lan_devices[did].update_info(info)
elif mips_cmd.type_ == MIoTLanCmdType.DEVICE_DELETE:
device_dids: list[str] = mips_cmd.data
for did in device_dids:
lan_device = self._lan_devices.pop(did, None)
if not lan_device:
continue
lan_device.on_delete()
elif mips_cmd.type_ == MIoTLanCmdType.NET_INFO_UPDATE:
net_data: MIoTLanNetworkUpdateData = mips_cmd.data
if net_data.status == InterfaceStatus.ADD:
self._available_net_ifs.add(net_data.if_name)
if net_data.if_name in self._net_ifs:
self.__create_socket(if_name=net_data.if_name)
elif net_data.status == InterfaceStatus.REMOVE:
self._available_net_ifs.remove(net_data.if_name)
self.__destroy_socket(if_name=net_data.if_name)
elif mips_cmd.type_ == MIoTLanCmdType.NET_IFS_UPDATE:
net_ifs: list[str] = mips_cmd.data
if self._net_ifs != set(net_ifs):
self._net_ifs = set(net_ifs)
for if_name in self._net_ifs:
self.__create_socket(if_name=if_name)
for if_name in list(self._broadcast_socks.keys()):
if if_name not in self._net_ifs:
self.__destroy_socket(if_name=if_name)
elif mips_cmd.type_ == MIoTLanCmdType.OPTIONS_UPDATE:
options: dict = mips_cmd.data
if 'enable_subscribe' in options:
if options['enable_subscribe'] != self._enable_subscribe:
self._enable_subscribe = options['enable_subscribe']
if not self._enable_subscribe:
# Unsubscribe all
for device in self._lan_devices.values():
device.unsubscribe()
elif mips_cmd.type_ == MIoTLanCmdType.DEINIT:
# stop the thread
if self._scan_timer:
self._scan_timer.cancel()
self._scan_timer = None
for device in self._lan_devices.values():
device.on_delete()
self._lan_devices.clear()
for req_data in self._pending_requests.values():
req_data.timeout.cancel()
self._pending_requests.clear()
for timer in self._reply_msg_buffer.values():
timer.cancel()
self._reply_msg_buffer.clear()
self._device_msg_matcher = MIoTMatcher()
self.__deinit_socket()
# DO NOT force a event loop to stop.
# It will stop when you release all handles properly.
def __unsub_broadcast(self, data: MIoTLanUnregisterBroadcastData) -> None:
if self._device_msg_matcher.get(topic=data.key):
del self._device_msg_matcher[data.key]
_LOGGER.debug('lan unregister broadcast, %s', data.key)
def __get_dev_list(self, data: MIoTLanGetDevListData) -> None:
dev_list = {
device.did: {
'online': device.online,
'push_available': device.subscribed
}
for device in self._lan_devices.values()
if device.online}
data.handler(
dev_list, data.handler_ctx)
def __update_devices(self, devices: dict[str, dict]) -> None:
for did, info in devices.items():
# did MUST be digit(UINT64)
if not did.isdigit():
_LOGGER.info('invalid did, %s', did)
continue
if (
'model' not in info
or info['model'] in self._profile_models):
# Do not support the local control of
# Profile device for the time being
_LOGGER.info(
'model not support local ctrl, %s, %s',
did, info.get('model'))
continue
if did not in self._lan_devices:
if 'token' not in info:
_LOGGER.error(
'token not found, %s, %s', did, info)
continue
if len(info['token']) != 32:
_LOGGER.error(
'invalid device token, %s, %s', did, info)
continue
self._lan_devices[did] = MIoTLanDevice(
manager=self, did=did, token=info['token'],
ip=info.get('ip', None))
else:
self._lan_devices[did].update_info(info)
def __delete_devices(self, devices: list[str]) -> None:
for did in devices:
lan_device = self._lan_devices.pop(did, None)
if not lan_device:
continue
lan_device.on_delete()
def __on_network_info_chnage(self, data: MIoTLanNetworkUpdateData) -> None:
if data.status == InterfaceStatus.ADD:
self._available_net_ifs.add(data.if_name)
if data.if_name in self._net_ifs:
self.__create_socket(if_name=data.if_name)
elif data.status == InterfaceStatus.REMOVE:
self._available_net_ifs.remove(data.if_name)
self.__destroy_socket(if_name=data.if_name)
def __update_net_ifs(self, net_ifs: list[str]) -> None:
if self._net_ifs != set(net_ifs):
self._net_ifs = set(net_ifs)
for if_name in self._net_ifs:
self.__create_socket(if_name=if_name)
for if_name in list(self._broadcast_socks.keys()):
if if_name not in self._net_ifs:
self.__destroy_socket(if_name=if_name)
def __update_subscribe_option(self, options: dict) -> None:
if 'enable_subscribe' in options:
if options['enable_subscribe'] != self._enable_subscribe:
self._enable_subscribe = options['enable_subscribe']
if not self._enable_subscribe:
# Unsubscribe all
for device in self._lan_devices.values():
device.unsubscribe()
def __deinit(self) -> None:
# Release all resources
if self._scan_timer:
self._scan_timer.cancel()
self._scan_timer = None
for device in self._lan_devices.values():
device.on_delete()
self._lan_devices.clear()
for req_data in self._pending_requests.values():
if req_data.timeout:
req_data.timeout.cancel()
self._pending_requests.clear()
for timer in self._reply_msg_buffer.values():
timer.cancel()
self._reply_msg_buffer.clear()
self._device_msg_matcher = MIoTMatcher()
self.__deinit_socket()
self._internal_loop.stop()
def __init_socket(self) -> None:
self.__deinit_socket()
@ -1237,7 +1228,8 @@ class MIoTLan:
# Reply
req: MIoTLanRequestData | None = self._pending_requests.pop(msg['id'], None)
if req:
req.timeout.cancel()
if req.timeout:
req.timeout.cancel()
if req.handler is not None:
self._main_loop.call_soon_threadsafe(
req.handler, msg, req.handler_ctx)
@ -1329,42 +1321,3 @@ class MIoTLan:
self._last_scan_interval = min(
self._last_scan_interval*2, self.OT_PROBE_INTERVAL_MAX)
return self._last_scan_interval
async def __on_network_info_change(
self,
status: InterfaceStatus,
info: NetworkInfo
) -> None:
_LOGGER.info(
'on network info change, status: %s, info: %s', status, info)
available_net_ifs = set()
for if_name in list(self._network.network_info.keys()):
available_net_ifs.add(if_name)
if len(available_net_ifs) == 0:
await self.deinit_async()
self._available_net_ifs = available_net_ifs
return
if self._net_ifs.isdisjoint(available_net_ifs):
_LOGGER.info('no valid net_ifs')
await self.deinit_async()
self._available_net_ifs = available_net_ifs
return
if not self._init_done:
self._available_net_ifs = available_net_ifs
await self.init_async()
return
self.__lan_send_cmd(
MIoTLanCmdType.NET_INFO_UPDATE, MIoTLanNetworkUpdateData(
status=status, if_name=info.name))
async def __on_mips_service_change(
self, group_id: str, state: MipsServiceState, data: dict
) -> None:
_LOGGER.info(
'on mips service change, %s, %s, %s', group_id, state, data)
if len(self._mips_service.get_services()) > 0:
_LOGGER.info('find central service, deinit miot lan')
await self.deinit_async()
else:
_LOGGER.info('no central service, init miot lan')
await self.init_async()