From 405c43584141362ea908a1c7290fe0ea10b571ed Mon Sep 17 00:00:00 2001 From: sworld Date: Sun, 29 Dec 2024 01:23:51 +0800 Subject: [PATCH] refator: refactor miot network --- .../xiaomi_home/miot/miot_network.py | 221 ++++++++++++------ 1 file changed, 152 insertions(+), 69 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_network.py b/custom_components/xiaomi_home/miot/miot_network.py index a4606eb..915ca61 100644 --- a/custom_components/xiaomi_home/miot/miot_network.py +++ b/custom_components/xiaomi_home/miot/miot_network.py @@ -52,7 +52,8 @@ import socket from dataclasses import dataclass from enum import Enum, auto import subprocess -from typing import Callable, Optional +from typing import Callable, Coroutine, Optional +import aiohttp import psutil import ipaddress @@ -77,38 +78,55 @@ class NetworkInfo: class MIoTNetwork: """MIoT network utilities.""" - PING_ADDRESS_LIST = [ + _IP_ADDRESS_LIST: list[str] = [ '1.2.4.8', # CNNIC sDNS '8.8.8.8', # Google Public DNS - '233.5.5.5', # AliDNS - '1.1.1.1', # Cloudflare DNS - '114.114.114.114', # 114 DNS - '208.67.222.222', # OpenDNS - '9.9.9.9', # Quad9 DNS + '9.9.9.9' # Quad9 ] + _HTTP_ADDRESS_LIST: list[str] = [ + 'https://www.bing.com', + 'https://www.google.com', + 'https://www.baidu.com' + ] + _REFRESH_INTERVAL = 30 + _DETECT_TIMEOUT = 6 + _main_loop: asyncio.AbstractEventLoop + _ip_addr_map: dict[str, float] + _http_addr_list: dict[str, float] + _http_session: aiohttp.ClientSession + _refresh_interval: int - _refresh_task: asyncio.Task - _refresh_timer: asyncio.TimerHandle + _refresh_task: Optional[asyncio.Task] + _refresh_timer: Optional[asyncio.TimerHandle] _network_status: bool _network_info: dict[str, NetworkInfo] - _sub_list_network_status: dict[str, Callable[[bool], asyncio.Future]] + _sub_list_network_status: dict[str, Callable[[bool], Coroutine]] _sub_list_network_info: dict[str, Callable[[ - InterfaceStatus, NetworkInfo], asyncio.Future]] - - _ping_address_priority: int + InterfaceStatus, NetworkInfo], Coroutine]] _done_event: asyncio.Event def __init__( - self, loop: Optional[asyncio.AbstractEventLoop] = None + self, + ip_addr_list: Optional[list[str]] = None, + http_addr_list: Optional[list[str]] = None, + refresh_interval: Optional[int] = None, + loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: self._main_loop = loop or asyncio.get_running_loop() + self._ip_addr_map = { + ip: self._DETECT_TIMEOUT for ip in + ip_addr_list or self._IP_ADDRESS_LIST} + self._http_addr_map = { + http: self._DETECT_TIMEOUT for http in + http_addr_list or self._HTTP_ADDRESS_LIST} + self._http_session = aiohttp.ClientSession() + self._refresh_interval = refresh_interval or self._REFRESH_INTERVAL - self._refresh_interval = None self._refresh_task = None self._refresh_timer = None @@ -122,13 +140,10 @@ class MIoTNetwork: self._done_event = asyncio.Event() - @property - def network_status(self) -> bool: - return self._network_status - - @property - def network_info(self) -> dict[str, NetworkInfo]: - return self._network_info + async def init_async(self) -> bool: + self.__refresh_timer_handler() + # MUST get network info before starting + return await self._done_event.wait() async def deinit_async(self) -> None: if self._refresh_task: @@ -137,16 +152,36 @@ class MIoTNetwork: if self._refresh_timer: self._refresh_timer.cancel() self._refresh_timer = None + await self._http_session.close() - self._refresh_interval = None self._network_status = False self._network_info.clear() self._sub_list_network_status.clear() self._sub_list_network_info.clear() self._done_event.clear() + @property + def network_status(self) -> bool: + return self._network_status + + @property + def network_info(self) -> dict[str, NetworkInfo]: + return self._network_info + + async def update_addr_list_async( + self, + ip_addr_list: Optional[list[str]] = None, + http_addr_list: Optional[list[str]] = None, + ) -> None: + if ip_addr_list: + self._ip_addr_map = { + ip: self._DETECT_TIMEOUT for ip in ip_addr_list} + if http_addr_list: + self._http_addr_map = { + http: self._DETECT_TIMEOUT for http in http_addr_list} + def sub_network_status( - self, key: str, handler: Callable[[bool], asyncio.Future] + self, key: str, handler: Callable[[bool], Coroutine] ) -> None: self._sub_list_network_status[key] = handler @@ -155,65 +190,115 @@ class MIoTNetwork: def sub_network_info( self, key: str, - handler: Callable[[InterfaceStatus, NetworkInfo], asyncio.Future] + handler: Callable[[InterfaceStatus, NetworkInfo], Coroutine] ) -> None: self._sub_list_network_info[key] = handler def unsub_network_info(self, key: str) -> None: self._sub_list_network_info.pop(key, None) - async def init_async(self, refresh_interval: int = 30) -> bool: - self._refresh_interval = refresh_interval - self.__refresh_timer_handler() - # MUST get network info before starting - return await self._done_event.wait() - async def refresh_async(self) -> None: self.__refresh_timer_handler() - async def get_network_status_async(self, timeout: int = 6) -> bool: - return await self._main_loop.run_in_executor( - None, self.__get_network_status, False, timeout) + async def get_network_status_async(self) -> bool: + try: + ip_addr: str = '' + ip_ts: float = self._DETECT_TIMEOUT + for ip, ts in self._ip_addr_map.items(): + if ts < ip_ts: + ip_addr = ip + ip_ts = ts + if ( + ip_ts < self._DETECT_TIMEOUT + and self.ping_multi_async(ip_list=[ip_addr]) + ): + return True + http_addr: str = '' + http_ts: float = self._DETECT_TIMEOUT + for http, ts in self._http_addr_map.items(): + if ts < http_ts: + http_addr = http + http_ts = ts + if ( + http_ts < self._DETECT_TIMEOUT + and await self.http_multi_async(url_list=[http_addr]) + ): + return True + # Detect all addresses + results = await asyncio.gather( + *[self.ping_multi_async(), self.http_multi_async()]) + return any(results) + except Exception as err: # pylint: disable=broad-exception-caught + _LOGGER.error('get network status error, %s', err) + return False async def get_network_info_async(self) -> dict[str, NetworkInfo]: return await self._main_loop.run_in_executor( None, self.__get_network_info) + async def ping_multi_async( + self, ip_list: Optional[list[str]] = None + ) -> bool: + addr_list = ip_list or list(self._ip_addr_map.keys()) + tasks = [] + for addr in addr_list: + tasks.append(self.__ping_async(addr)) + results = await asyncio.gather(*tasks) + for addr, ts in zip(addr_list, results): + if addr in self._ip_addr_map: + self._ip_addr_map[addr] = ts + return any([ts < self._DETECT_TIMEOUT for ts in results]) + + async def http_multi_async( + self, url_list: Optional[list[str]] = None + ) -> bool: + addr_list = url_list or list(self._http_addr_map.keys()) + tasks = [] + for addr in addr_list: + tasks.append(self.__http_async(url=addr)) + results = await asyncio.gather(*tasks) + for addr, ts in zip(addr_list, results): + if addr in self._http_addr_map: + self._http_addr_map[addr] = ts + return any([ts < self._DETECT_TIMEOUT for ts in results]) + def __calc_network_address(self, ip: str, netmask: str) -> str: return str(ipaddress.IPv4Network( f'{ip}/{netmask}', strict=False).network_address) - def __ping( - self, address: Optional[str] = None, timeout: int = 6 - ) -> bool: - param = '-n' if platform.system().lower() == 'windows' else '-c' - command = ['ping', param, '1', address] + async def __ping_async(self, address: Optional[str] = None) -> float: + start_ts: float = self._main_loop.time() try: - output = subprocess.run( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - check=True, timeout=timeout) - return output.returncode == 0 + process = await asyncio.create_subprocess_exec( + *( + [ + 'ping', '-n', '1', '-w', + str(self._DETECT_TIMEOUT*1000), address] + if platform.system().lower() == 'windows' else + [ + 'ping', '-c', '1', '-w', + str(self._DETECT_TIMEOUT), address]), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + await process.communicate() + if process.returncode == 0: + return self._main_loop.time() - start_ts + return self._DETECT_TIMEOUT + except Exception as err: # pylint: disable=broad-exception-caught + print(err) + return self._DETECT_TIMEOUT + + async def __http_async(self, url: str) -> float: + start_ts: float = self._main_loop.time() + try: + async with self._http_session.get( + url, timeout=self._DETECT_TIMEOUT) as response: + if response.status == 200: + return self._main_loop.time() - start_ts except Exception: # pylint: disable=broad-exception-caught - return False - - def __get_network_status( - self, with_retry: bool = True, timeout: int = 6 - ) -> bool: - if self._ping_address_priority >= len(self.PING_ADDRESS_LIST): - self._ping_address_priority = 0 - - if self.__ping( - self.PING_ADDRESS_LIST[self._ping_address_priority], timeout): - return True - if not with_retry: - return False - for index in range(len(self.PING_ADDRESS_LIST)): - if index == self._ping_address_priority: - continue - if self.__ping(self.PING_ADDRESS_LIST[index], timeout): - self._ping_address_priority = index - return True - return False + pass + return self._DETECT_TIMEOUT def __get_network_info(self) -> dict[str, NetworkInfo]: interfaces = psutil.net_if_addrs() @@ -246,12 +331,10 @@ class MIoTNetwork: for handler in self._sub_list_network_info.values(): self._main_loop.create_task(handler(status, info)) - async def __update_status_and_info_async(self, timeout: int = 6) -> None: + async def __update_status_and_info_async(self) -> None: try: - status: bool = await self._main_loop.run_in_executor( - None, self.__get_network_status, timeout) - infos = await self._main_loop.run_in_executor( - None, self.__get_network_info) + status: bool = await self.get_network_status_async() + infos = await self.get_network_info_async() if self._network_status != status: for handler in self._sub_list_network_status.values(): @@ -273,7 +356,7 @@ class MIoTNetwork: # Remove self.__call_network_info_change( InterfaceStatus.REMOVE, - self._network_info.pop(name, None)) + self._network_info.pop(name)) # Add for name, info in infos.items(): self._network_info[name] = info