refator: refactor miot network

This commit is contained in:
sworld 2024-12-29 01:23:51 +08:00
parent 7606ae5912
commit 405c435841

View File

@ -52,7 +52,8 @@ import socket
from dataclasses import dataclass
from enum import Enum, auto
import subprocess
from typing import Callable, Optional
from typing import Callable, Coroutine, Optional
import aiohttp
import psutil
import ipaddress
@ -77,38 +78,55 @@ class NetworkInfo:
class MIoTNetwork:
"""MIoT network utilities."""
PING_ADDRESS_LIST = [
_IP_ADDRESS_LIST: list[str] = [
'1.2.4.8', # CNNIC sDNS
'8.8.8.8', # Google Public DNS
'233.5.5.5', # AliDNS
'1.1.1.1', # Cloudflare DNS
'114.114.114.114', # 114 DNS
'208.67.222.222', # OpenDNS
'9.9.9.9', # Quad9 DNS
'9.9.9.9' # Quad9
]
_HTTP_ADDRESS_LIST: list[str] = [
'https://www.bing.com',
'https://www.google.com',
'https://www.baidu.com'
]
_REFRESH_INTERVAL = 30
_DETECT_TIMEOUT = 6
_main_loop: asyncio.AbstractEventLoop
_ip_addr_map: dict[str, float]
_http_addr_list: dict[str, float]
_http_session: aiohttp.ClientSession
_refresh_interval: int
_refresh_task: asyncio.Task
_refresh_timer: asyncio.TimerHandle
_refresh_task: Optional[asyncio.Task]
_refresh_timer: Optional[asyncio.TimerHandle]
_network_status: bool
_network_info: dict[str, NetworkInfo]
_sub_list_network_status: dict[str, Callable[[bool], asyncio.Future]]
_sub_list_network_status: dict[str, Callable[[bool], Coroutine]]
_sub_list_network_info: dict[str, Callable[[
InterfaceStatus, NetworkInfo], asyncio.Future]]
_ping_address_priority: int
InterfaceStatus, NetworkInfo], Coroutine]]
_done_event: asyncio.Event
def __init__(
self, loop: Optional[asyncio.AbstractEventLoop] = None
self,
ip_addr_list: Optional[list[str]] = None,
http_addr_list: Optional[list[str]] = None,
refresh_interval: Optional[int] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> None:
self._main_loop = loop or asyncio.get_running_loop()
self._ip_addr_map = {
ip: self._DETECT_TIMEOUT for ip in
ip_addr_list or self._IP_ADDRESS_LIST}
self._http_addr_map = {
http: self._DETECT_TIMEOUT for http in
http_addr_list or self._HTTP_ADDRESS_LIST}
self._http_session = aiohttp.ClientSession()
self._refresh_interval = refresh_interval or self._REFRESH_INTERVAL
self._refresh_interval = None
self._refresh_task = None
self._refresh_timer = None
@ -122,13 +140,10 @@ class MIoTNetwork:
self._done_event = asyncio.Event()
@property
def network_status(self) -> bool:
return self._network_status
@property
def network_info(self) -> dict[str, NetworkInfo]:
return self._network_info
async def init_async(self) -> bool:
self.__refresh_timer_handler()
# MUST get network info before starting
return await self._done_event.wait()
async def deinit_async(self) -> None:
if self._refresh_task:
@ -137,16 +152,36 @@ class MIoTNetwork:
if self._refresh_timer:
self._refresh_timer.cancel()
self._refresh_timer = None
await self._http_session.close()
self._refresh_interval = None
self._network_status = False
self._network_info.clear()
self._sub_list_network_status.clear()
self._sub_list_network_info.clear()
self._done_event.clear()
@property
def network_status(self) -> bool:
return self._network_status
@property
def network_info(self) -> dict[str, NetworkInfo]:
return self._network_info
async def update_addr_list_async(
self,
ip_addr_list: Optional[list[str]] = None,
http_addr_list: Optional[list[str]] = None,
) -> None:
if ip_addr_list:
self._ip_addr_map = {
ip: self._DETECT_TIMEOUT for ip in ip_addr_list}
if http_addr_list:
self._http_addr_map = {
http: self._DETECT_TIMEOUT for http in http_addr_list}
def sub_network_status(
self, key: str, handler: Callable[[bool], asyncio.Future]
self, key: str, handler: Callable[[bool], Coroutine]
) -> None:
self._sub_list_network_status[key] = handler
@ -155,65 +190,115 @@ class MIoTNetwork:
def sub_network_info(
self, key: str,
handler: Callable[[InterfaceStatus, NetworkInfo], asyncio.Future]
handler: Callable[[InterfaceStatus, NetworkInfo], Coroutine]
) -> None:
self._sub_list_network_info[key] = handler
def unsub_network_info(self, key: str) -> None:
self._sub_list_network_info.pop(key, None)
async def init_async(self, refresh_interval: int = 30) -> bool:
self._refresh_interval = refresh_interval
self.__refresh_timer_handler()
# MUST get network info before starting
return await self._done_event.wait()
async def refresh_async(self) -> None:
self.__refresh_timer_handler()
async def get_network_status_async(self, timeout: int = 6) -> bool:
return await self._main_loop.run_in_executor(
None, self.__get_network_status, False, timeout)
async def get_network_status_async(self) -> bool:
try:
ip_addr: str = ''
ip_ts: float = self._DETECT_TIMEOUT
for ip, ts in self._ip_addr_map.items():
if ts < ip_ts:
ip_addr = ip
ip_ts = ts
if (
ip_ts < self._DETECT_TIMEOUT
and self.ping_multi_async(ip_list=[ip_addr])
):
return True
http_addr: str = ''
http_ts: float = self._DETECT_TIMEOUT
for http, ts in self._http_addr_map.items():
if ts < http_ts:
http_addr = http
http_ts = ts
if (
http_ts < self._DETECT_TIMEOUT
and await self.http_multi_async(url_list=[http_addr])
):
return True
# Detect all addresses
results = await asyncio.gather(
*[self.ping_multi_async(), self.http_multi_async()])
return any(results)
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('get network status error, %s', err)
return False
async def get_network_info_async(self) -> dict[str, NetworkInfo]:
return await self._main_loop.run_in_executor(
None, self.__get_network_info)
async def ping_multi_async(
self, ip_list: Optional[list[str]] = None
) -> bool:
addr_list = ip_list or list(self._ip_addr_map.keys())
tasks = []
for addr in addr_list:
tasks.append(self.__ping_async(addr))
results = await asyncio.gather(*tasks)
for addr, ts in zip(addr_list, results):
if addr in self._ip_addr_map:
self._ip_addr_map[addr] = ts
return any([ts < self._DETECT_TIMEOUT for ts in results])
async def http_multi_async(
self, url_list: Optional[list[str]] = None
) -> bool:
addr_list = url_list or list(self._http_addr_map.keys())
tasks = []
for addr in addr_list:
tasks.append(self.__http_async(url=addr))
results = await asyncio.gather(*tasks)
for addr, ts in zip(addr_list, results):
if addr in self._http_addr_map:
self._http_addr_map[addr] = ts
return any([ts < self._DETECT_TIMEOUT for ts in results])
def __calc_network_address(self, ip: str, netmask: str) -> str:
return str(ipaddress.IPv4Network(
f'{ip}/{netmask}', strict=False).network_address)
def __ping(
self, address: Optional[str] = None, timeout: int = 6
) -> bool:
param = '-n' if platform.system().lower() == 'windows' else '-c'
command = ['ping', param, '1', address]
async def __ping_async(self, address: Optional[str] = None) -> float:
start_ts: float = self._main_loop.time()
try:
output = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
check=True, timeout=timeout)
return output.returncode == 0
process = await asyncio.create_subprocess_exec(
*(
[
'ping', '-n', '1', '-w',
str(self._DETECT_TIMEOUT*1000), address]
if platform.system().lower() == 'windows' else
[
'ping', '-c', '1', '-w',
str(self._DETECT_TIMEOUT), address]),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
await process.communicate()
if process.returncode == 0:
return self._main_loop.time() - start_ts
return self._DETECT_TIMEOUT
except Exception as err: # pylint: disable=broad-exception-caught
print(err)
return self._DETECT_TIMEOUT
async def __http_async(self, url: str) -> float:
start_ts: float = self._main_loop.time()
try:
async with self._http_session.get(
url, timeout=self._DETECT_TIMEOUT) as response:
if response.status == 200:
return self._main_loop.time() - start_ts
except Exception: # pylint: disable=broad-exception-caught
return False
def __get_network_status(
self, with_retry: bool = True, timeout: int = 6
) -> bool:
if self._ping_address_priority >= len(self.PING_ADDRESS_LIST):
self._ping_address_priority = 0
if self.__ping(
self.PING_ADDRESS_LIST[self._ping_address_priority], timeout):
return True
if not with_retry:
return False
for index in range(len(self.PING_ADDRESS_LIST)):
if index == self._ping_address_priority:
continue
if self.__ping(self.PING_ADDRESS_LIST[index], timeout):
self._ping_address_priority = index
return True
return False
pass
return self._DETECT_TIMEOUT
def __get_network_info(self) -> dict[str, NetworkInfo]:
interfaces = psutil.net_if_addrs()
@ -246,12 +331,10 @@ class MIoTNetwork:
for handler in self._sub_list_network_info.values():
self._main_loop.create_task(handler(status, info))
async def __update_status_and_info_async(self, timeout: int = 6) -> None:
async def __update_status_and_info_async(self) -> None:
try:
status: bool = await self._main_loop.run_in_executor(
None, self.__get_network_status, timeout)
infos = await self._main_loop.run_in_executor(
None, self.__get_network_info)
status: bool = await self.get_network_status_async()
infos = await self.get_network_info_async()
if self._network_status != status:
for handler in self._sub_list_network_status.values():
@ -273,7 +356,7 @@ class MIoTNetwork:
# Remove
self.__call_network_info_change(
InterfaceStatus.REMOVE,
self._network_info.pop(name, None))
self._network_info.pop(name))
# Add
for name, info in infos.items():
self._network_info[name] = info