Compare commits

...

2 Commits

Author SHA1 Message Date
sworld
0afb1455f6 feat: update miot_client.get_miot_instance_async 2024-12-29 01:48:36 +08:00
sworld
405c435841 refator: refactor miot network 2024-12-29 01:27:48 +08:00
2 changed files with 172 additions and 85 deletions

View File

@ -1851,15 +1851,6 @@ async def get_miot_instance_async(
loop: asyncio.AbstractEventLoop = asyncio.get_running_loop() loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
if loop is None: if loop is None:
raise MIoTClientError('loop is None') raise MIoTClientError('loop is None')
# MIoT network
network: Optional[MIoTNetwork] = hass.data[DOMAIN].get(
'miot_network', None)
if not network:
network = MIoTNetwork(loop=loop)
hass.data[DOMAIN]['miot_network'] = network
await network.init_async(
refresh_interval=NETWORK_REFRESH_INTERVAL)
_LOGGER.info('create miot_network instance')
# MIoT storage # MIoT storage
storage: Optional[MIoTStorage] = hass.data[DOMAIN].get( storage: Optional[MIoTStorage] = hass.data[DOMAIN].get(
'miot_storage', None) 'miot_storage', None)
@ -1868,12 +1859,29 @@ async def get_miot_instance_async(
root_path=entry_data['storage_path'], loop=loop) root_path=entry_data['storage_path'], loop=loop)
hass.data[DOMAIN]['miot_storage'] = storage hass.data[DOMAIN]['miot_storage'] = storage
_LOGGER.info('create miot_storage instance') _LOGGER.info('create miot_storage instance')
global_config: dict = await storage.load_user_config_async(
uid='global_config', cloud_server='all',
keys=['network_detect_addr', 'net_interfaces', 'enable_subscribe'])
# MIoT network
network_detect_addr: dict = global_config.get(
'network_detect_addr', {})
network: Optional[MIoTNetwork] = hass.data[DOMAIN].get(
'miot_network', None)
if not network:
network = MIoTNetwork(
ip_addr_list=network_detect_addr.get('ip', []),
http_addr_list=network_detect_addr.get('http', []),
refresh_interval=NETWORK_REFRESH_INTERVAL,
loop=loop)
hass.data[DOMAIN]['miot_network'] = network
await network.init_async()
_LOGGER.info('create miot_network instance')
# MIoT service # MIoT service
mips_service: Optional[MipsService] = hass.data[DOMAIN].get( mips_service: Optional[MipsService] = hass.data[DOMAIN].get(
'mips_service', None) 'mips_service', None)
if not mips_service: if not mips_service:
aiozc = await zeroconf.async_get_async_instance(hass) aiozc = await zeroconf.async_get_async_instance(hass)
mips_service: MipsService = MipsService(aiozc=aiozc, loop=loop) mips_service = MipsService(aiozc=aiozc, loop=loop)
hass.data[DOMAIN]['mips_service'] = mips_service hass.data[DOMAIN]['mips_service'] = mips_service
await mips_service.init_async() await mips_service.init_async()
_LOGGER.info('create mips_service instance') _LOGGER.info('create mips_service instance')
@ -1881,15 +1889,11 @@ async def get_miot_instance_async(
miot_lan: Optional[MIoTLan] = hass.data[DOMAIN].get( miot_lan: Optional[MIoTLan] = hass.data[DOMAIN].get(
'miot_lan', None) 'miot_lan', None)
if not miot_lan: if not miot_lan:
lan_config = (await storage.load_user_config_async(
uid='global_config',
cloud_server='all',
keys=['net_interfaces', 'enable_subscribe'])) or {}
miot_lan = MIoTLan( miot_lan = MIoTLan(
net_ifs=lan_config.get('net_interfaces', []), net_ifs=global_config.get('net_interfaces', []),
network=network, network=network,
mips_service=mips_service, mips_service=mips_service,
enable_subscribe=lan_config.get('enable_subscribe', False), enable_subscribe=global_config.get('enable_subscribe', False),
loop=loop) loop=loop)
hass.data[DOMAIN]['miot_lan'] = miot_lan hass.data[DOMAIN]['miot_lan'] = miot_lan
_LOGGER.info('create miot_lan instance') _LOGGER.info('create miot_lan instance')

View File

@ -52,7 +52,8 @@ import socket
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
import subprocess import subprocess
from typing import Callable, Optional from typing import Callable, Coroutine, Optional
import aiohttp
import psutil import psutil
import ipaddress import ipaddress
@ -77,38 +78,55 @@ class NetworkInfo:
class MIoTNetwork: class MIoTNetwork:
"""MIoT network utilities.""" """MIoT network utilities."""
PING_ADDRESS_LIST = [ _IP_ADDRESS_LIST: list[str] = [
'1.2.4.8', # CNNIC sDNS '1.2.4.8', # CNNIC sDNS
'8.8.8.8', # Google Public DNS '8.8.8.8', # Google Public DNS
'233.5.5.5', # AliDNS '9.9.9.9' # Quad9
'1.1.1.1', # Cloudflare DNS
'114.114.114.114', # 114 DNS
'208.67.222.222', # OpenDNS
'9.9.9.9', # Quad9 DNS
] ]
_HTTP_ADDRESS_LIST: list[str] = [
'https://www.bing.com',
'https://www.google.com',
'https://www.baidu.com'
]
_REFRESH_INTERVAL = 30
_DETECT_TIMEOUT = 6
_main_loop: asyncio.AbstractEventLoop _main_loop: asyncio.AbstractEventLoop
_ip_addr_map: dict[str, float]
_http_addr_list: dict[str, float]
_http_session: aiohttp.ClientSession
_refresh_interval: int _refresh_interval: int
_refresh_task: asyncio.Task _refresh_task: Optional[asyncio.Task]
_refresh_timer: asyncio.TimerHandle _refresh_timer: Optional[asyncio.TimerHandle]
_network_status: bool _network_status: bool
_network_info: dict[str, NetworkInfo] _network_info: dict[str, NetworkInfo]
_sub_list_network_status: dict[str, Callable[[bool], asyncio.Future]] _sub_list_network_status: dict[str, Callable[[bool], Coroutine]]
_sub_list_network_info: dict[str, Callable[[ _sub_list_network_info: dict[str, Callable[[
InterfaceStatus, NetworkInfo], asyncio.Future]] InterfaceStatus, NetworkInfo], Coroutine]]
_ping_address_priority: int
_done_event: asyncio.Event _done_event: asyncio.Event
def __init__( def __init__(
self, loop: Optional[asyncio.AbstractEventLoop] = None self,
ip_addr_list: Optional[list[str]] = None,
http_addr_list: Optional[list[str]] = None,
refresh_interval: Optional[int] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> None: ) -> None:
self._main_loop = loop or asyncio.get_running_loop() self._main_loop = loop or asyncio.get_running_loop()
self._ip_addr_map = {
ip: self._DETECT_TIMEOUT for ip in
ip_addr_list or self._IP_ADDRESS_LIST}
self._http_addr_map = {
http: self._DETECT_TIMEOUT for http in
http_addr_list or self._HTTP_ADDRESS_LIST}
self._http_session = aiohttp.ClientSession()
self._refresh_interval = refresh_interval or self._REFRESH_INTERVAL
self._refresh_interval = None
self._refresh_task = None self._refresh_task = None
self._refresh_timer = None self._refresh_timer = None
@ -122,13 +140,10 @@ class MIoTNetwork:
self._done_event = asyncio.Event() self._done_event = asyncio.Event()
@property async def init_async(self) -> bool:
def network_status(self) -> bool: self.__refresh_timer_handler()
return self._network_status # MUST get network info before starting
return await self._done_event.wait()
@property
def network_info(self) -> dict[str, NetworkInfo]:
return self._network_info
async def deinit_async(self) -> None: async def deinit_async(self) -> None:
if self._refresh_task: if self._refresh_task:
@ -137,16 +152,36 @@ class MIoTNetwork:
if self._refresh_timer: if self._refresh_timer:
self._refresh_timer.cancel() self._refresh_timer.cancel()
self._refresh_timer = None self._refresh_timer = None
await self._http_session.close()
self._refresh_interval = None
self._network_status = False self._network_status = False
self._network_info.clear() self._network_info.clear()
self._sub_list_network_status.clear() self._sub_list_network_status.clear()
self._sub_list_network_info.clear() self._sub_list_network_info.clear()
self._done_event.clear() self._done_event.clear()
@property
def network_status(self) -> bool:
return self._network_status
@property
def network_info(self) -> dict[str, NetworkInfo]:
return self._network_info
async def update_addr_list_async(
self,
ip_addr_list: Optional[list[str]] = None,
http_addr_list: Optional[list[str]] = None,
) -> None:
if ip_addr_list:
self._ip_addr_map = {
ip: self._DETECT_TIMEOUT for ip in ip_addr_list}
if http_addr_list:
self._http_addr_map = {
http: self._DETECT_TIMEOUT for http in http_addr_list}
def sub_network_status( def sub_network_status(
self, key: str, handler: Callable[[bool], asyncio.Future] self, key: str, handler: Callable[[bool], Coroutine]
) -> None: ) -> None:
self._sub_list_network_status[key] = handler self._sub_list_network_status[key] = handler
@ -155,65 +190,115 @@ class MIoTNetwork:
def sub_network_info( def sub_network_info(
self, key: str, self, key: str,
handler: Callable[[InterfaceStatus, NetworkInfo], asyncio.Future] handler: Callable[[InterfaceStatus, NetworkInfo], Coroutine]
) -> None: ) -> None:
self._sub_list_network_info[key] = handler self._sub_list_network_info[key] = handler
def unsub_network_info(self, key: str) -> None: def unsub_network_info(self, key: str) -> None:
self._sub_list_network_info.pop(key, None) self._sub_list_network_info.pop(key, None)
async def init_async(self, refresh_interval: int = 30) -> bool:
self._refresh_interval = refresh_interval
self.__refresh_timer_handler()
# MUST get network info before starting
return await self._done_event.wait()
async def refresh_async(self) -> None: async def refresh_async(self) -> None:
self.__refresh_timer_handler() self.__refresh_timer_handler()
async def get_network_status_async(self, timeout: int = 6) -> bool: async def get_network_status_async(self) -> bool:
return await self._main_loop.run_in_executor( try:
None, self.__get_network_status, False, timeout) ip_addr: str = ''
ip_ts: float = self._DETECT_TIMEOUT
for ip, ts in self._ip_addr_map.items():
if ts < ip_ts:
ip_addr = ip
ip_ts = ts
if (
ip_ts < self._DETECT_TIMEOUT
and self.ping_multi_async(ip_list=[ip_addr])
):
return True
http_addr: str = ''
http_ts: float = self._DETECT_TIMEOUT
for http, ts in self._http_addr_map.items():
if ts < http_ts:
http_addr = http
http_ts = ts
if (
http_ts < self._DETECT_TIMEOUT
and await self.http_multi_async(url_list=[http_addr])
):
return True
# Detect all addresses
results = await asyncio.gather(
*[self.ping_multi_async(), self.http_multi_async()])
return any(results)
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('get network status error, %s', err)
return False
async def get_network_info_async(self) -> dict[str, NetworkInfo]: async def get_network_info_async(self) -> dict[str, NetworkInfo]:
return await self._main_loop.run_in_executor( return await self._main_loop.run_in_executor(
None, self.__get_network_info) None, self.__get_network_info)
async def ping_multi_async(
self, ip_list: Optional[list[str]] = None
) -> bool:
addr_list = ip_list or list(self._ip_addr_map.keys())
tasks = []
for addr in addr_list:
tasks.append(self.__ping_async(addr))
results = await asyncio.gather(*tasks)
for addr, ts in zip(addr_list, results):
if addr in self._ip_addr_map:
self._ip_addr_map[addr] = ts
return any([ts < self._DETECT_TIMEOUT for ts in results])
async def http_multi_async(
self, url_list: Optional[list[str]] = None
) -> bool:
addr_list = url_list or list(self._http_addr_map.keys())
tasks = []
for addr in addr_list:
tasks.append(self.__http_async(url=addr))
results = await asyncio.gather(*tasks)
for addr, ts in zip(addr_list, results):
if addr in self._http_addr_map:
self._http_addr_map[addr] = ts
return any([ts < self._DETECT_TIMEOUT for ts in results])
def __calc_network_address(self, ip: str, netmask: str) -> str: def __calc_network_address(self, ip: str, netmask: str) -> str:
return str(ipaddress.IPv4Network( return str(ipaddress.IPv4Network(
f'{ip}/{netmask}', strict=False).network_address) f'{ip}/{netmask}', strict=False).network_address)
def __ping( async def __ping_async(self, address: Optional[str] = None) -> float:
self, address: Optional[str] = None, timeout: int = 6 start_ts: float = self._main_loop.time()
) -> bool:
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', address]
try: try:
output = subprocess.run( process = await asyncio.create_subprocess_exec(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, *(
check=True, timeout=timeout) [
return output.returncode == 0 'ping', '-n', '1', '-w',
str(self._DETECT_TIMEOUT*1000), address]
if platform.system().lower() == 'windows' else
[
'ping', '-c', '1', '-w',
str(self._DETECT_TIMEOUT), address]),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
await process.communicate()
if process.returncode == 0:
return self._main_loop.time() - start_ts
return self._DETECT_TIMEOUT
except Exception as err: # pylint: disable=broad-exception-caught
print(err)
return self._DETECT_TIMEOUT
async def __http_async(self, url: str) -> float:
start_ts: float = self._main_loop.time()
try:
async with self._http_session.get(
url, timeout=self._DETECT_TIMEOUT) as response:
if response.status == 200:
return self._main_loop.time() - start_ts
except Exception: # pylint: disable=broad-exception-caught except Exception: # pylint: disable=broad-exception-caught
return False pass
return self._DETECT_TIMEOUT
def __get_network_status(
self, with_retry: bool = True, timeout: int = 6
) -> bool:
if self._ping_address_priority >= len(self.PING_ADDRESS_LIST):
self._ping_address_priority = 0
if self.__ping(
self.PING_ADDRESS_LIST[self._ping_address_priority], timeout):
return True
if not with_retry:
return False
for index in range(len(self.PING_ADDRESS_LIST)):
if index == self._ping_address_priority:
continue
if self.__ping(self.PING_ADDRESS_LIST[index], timeout):
self._ping_address_priority = index
return True
return False
def __get_network_info(self) -> dict[str, NetworkInfo]: def __get_network_info(self) -> dict[str, NetworkInfo]:
interfaces = psutil.net_if_addrs() interfaces = psutil.net_if_addrs()
@ -246,12 +331,10 @@ class MIoTNetwork:
for handler in self._sub_list_network_info.values(): for handler in self._sub_list_network_info.values():
self._main_loop.create_task(handler(status, info)) self._main_loop.create_task(handler(status, info))
async def __update_status_and_info_async(self, timeout: int = 6) -> None: async def __update_status_and_info_async(self) -> None:
try: try:
status: bool = await self._main_loop.run_in_executor( status: bool = await self.get_network_status_async()
None, self.__get_network_status, timeout) infos = await self.get_network_info_async()
infos = await self._main_loop.run_in_executor(
None, self.__get_network_info)
if self._network_status != status: if self._network_status != status:
for handler in self._sub_list_network_status.values(): for handler in self._sub_list_network_status.values():
@ -273,7 +356,7 @@ class MIoTNetwork:
# Remove # Remove
self.__call_network_info_change( self.__call_network_info_change(
InterfaceStatus.REMOVE, InterfaceStatus.REMOVE,
self._network_info.pop(name, None)) self._network_info.pop(name))
# Add # Add
for name, info in infos.items(): for name, info in infos.items():
self._network_info[name] = info self._network_info[name] = info