Remove tev & fix some type issues

This commit is contained in:
Feng Wang 2024-12-21 19:09:37 +08:00
parent afef709839
commit faa13c53e8
6 changed files with 145 additions and 510 deletions

View File

@ -83,6 +83,9 @@ def randomize_int(value: int, ratio: float) -> int:
"""Randomize an integer value.""" """Randomize an integer value."""
return int(value * (1 - ratio + random.random()*2*ratio)) return int(value * (1 - ratio + random.random()*2*ratio))
def randomize_float(value: float, ratio: float) -> float:
"""Randomize a float value."""
return value * (1 - ratio + random.random()*2*ratio)
class MIoTMatcher(MQTTMatcher): class MIoTMatcher(MQTTMatcher):
"""MIoT Pub/Sub topic matcher.""" """MIoT Pub/Sub topic matcher."""

View File

@ -1,324 +0,0 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2024 Xiaomi Corporation.
The ownership and intellectual property rights of Xiaomi Home Assistant
Integration and related Xiaomi cloud service API interface provided under this
license, including source code and object code (collectively, "Licensed Work"),
are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi
hereby grants you a personal, limited, non-exclusive, non-transferable,
non-sublicensable, and royalty-free license to reproduce, use, modify, and
distribute the Licensed Work only for your use of Home Assistant for
non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
you to use the Licensed Work for any other purpose, including but not limited
to use Licensed Work to develop applications (APP), Web services, and other
forms of software.
You may reproduce and distribute copies of the Licensed Work, with or without
modifications, whether in source or object form, provided that you must give
any other recipients of the Licensed Work a copy of this License and retain all
copyright and disclaimers.
Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied, including, without
limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR
OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible
for any direct, indirect, special, incidental, or consequential damages or
losses arising from the use or inability to use the Licensed Work.
Xiaomi reserves all rights not expressly granted to you in this License.
Except for the rights expressly granted by Xiaomi under this License, Xiaomi
does not authorize you in any form to use the trademarks, copyrights, or other
forms of intellectual property rights of Xiaomi and its affiliates, including,
without limitation, without obtaining other written permission from Xiaomi, you
shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
may make the public associate with Xiaomi in any form to publicize or promote
the software or hardware devices that use the Licensed Work.
Xiaomi has the right to immediately terminate all your authorization under this
License in the event:
1. You assert patent invalidation, litigation, or other claims against patents
or other intellectual property rights of Xiaomi or its affiliates; or,
2. You make, have made, manufacture, sell, or offer to sell products that knock
off Xiaomi or its affiliates' products.
MIoT event loop.
"""
import selectors
import heapq
import time
import traceback
from typing import Callable, TypeVar
import logging
import threading
# pylint: disable=relative-beyond-top-level
from .miot_error import MIoTEvError
_LOGGER = logging.getLogger(__name__)
TimeoutHandle = TypeVar('TimeoutHandle')
class MIoTFdHandler:
"""File descriptor handler."""
fd: int
read_handler: Callable[[any], None]
read_handler_ctx: any
write_handler: Callable[[any], None]
write_handler_ctx: any
def __init__(
self, fd: int,
read_handler: Callable[[any], None] = None,
read_handler_ctx: any = None,
write_handler: Callable[[any], None] = None,
write_handler_ctx: any = None
) -> None:
self.fd = fd
self.read_handler = read_handler
self.read_handler_ctx = read_handler_ctx
self.write_handler = write_handler
self.write_handler_ctx = write_handler_ctx
class MIoTTimeout:
"""Timeout handler."""
key: TimeoutHandle
target: int
handler: Callable[[any], None]
handler_ctx: any
def __init__(
self, key: str = None, target: int = None,
handler: Callable[[any], None] = None,
handler_ctx: any = None
) -> None:
self.key = key
self.target = target
self.handler = handler
self.handler_ctx = handler_ctx
def __lt__(self, other):
return self.target < other.target
class MIoTEventLoop:
"""MIoT event loop."""
_poll_fd: selectors.DefaultSelector
_fd_handlers: dict[str, MIoTFdHandler]
_timer_heap: list[MIoTTimeout]
_timer_handlers: dict[str, MIoTTimeout]
_timer_handle_seed: int
# Label if the current fd handler is freed inside a read handler to
# avoid invalid reading.
_fd_handler_freed_in_read_handler: bool
def __init__(self) -> None:
self._poll_fd = selectors.DefaultSelector()
self._timer_heap = []
self._timer_handlers = {}
self._timer_handle_seed = 1
self._fd_handlers = {}
self._fd_handler_freed_in_read_handler = False
def loop_forever(self) -> None:
"""Run an event loop in current thread."""
next_timeout: int
while True:
next_timeout = 0
# Handle timer
now_ms: int = self.__get_monotonic_ms
while len(self._timer_heap) > 0:
timer: MIoTTimeout = self._timer_heap[0]
if timer is None:
break
if timer.target <= now_ms:
heapq.heappop(self._timer_heap)
del self._timer_handlers[timer.key]
if timer.handler:
timer.handler(timer.handler_ctx)
else:
next_timeout = timer.target-now_ms
break
# Are there any files to listen to
if next_timeout == 0 and self._fd_handlers:
next_timeout = None # None == infinite
# Wait for timers & fds
if next_timeout == 0:
# Neither timer nor fds exist, exit loop
break
# Handle fd event
events = self._poll_fd.select(
timeout=next_timeout/1000.0 if next_timeout else next_timeout)
for key, mask in events:
fd_handler: MIoTFdHandler = key.data
if fd_handler is None:
continue
self._fd_handler_freed_in_read_handler = False
fd_key = str(id(fd_handler.fd))
if fd_key not in self._fd_handlers:
continue
if (
mask & selectors.EVENT_READ > 0
and fd_handler.read_handler
):
fd_handler.read_handler(fd_handler.read_handler_ctx)
if (
mask & selectors.EVENT_WRITE > 0
and self._fd_handler_freed_in_read_handler is False
and fd_handler.write_handler
):
fd_handler.write_handler(fd_handler.write_handler_ctx)
def loop_stop(self) -> None:
"""Stop the event loop."""
if self._poll_fd:
self._poll_fd.close()
self._poll_fd = None
self._fd_handlers = {}
self._timer_heap = []
self._timer_handlers = {}
def set_timeout(
self, timeout_ms: int, handler: Callable[[any], None],
handler_ctx: any = None
) -> TimeoutHandle:
"""Set a timer."""
if timeout_ms is None or handler is None:
raise MIoTEvError('invalid params')
new_timeout: MIoTTimeout = MIoTTimeout()
new_timeout.key = self.__get_next_timeout_handle
new_timeout.target = self.__get_monotonic_ms + timeout_ms
new_timeout.handler = handler
new_timeout.handler_ctx = handler_ctx
heapq.heappush(self._timer_heap, new_timeout)
self._timer_handlers[new_timeout.key] = new_timeout
return new_timeout.key
def clear_timeout(self, timer_key: TimeoutHandle) -> None:
"""Stop and remove the timer."""
if timer_key is None:
return
timer: MIoTTimeout = self._timer_handlers.pop(timer_key, None)
if timer:
self._timer_heap = list(self._timer_heap)
self._timer_heap.remove(timer)
heapq.heapify(self._timer_heap)
def set_read_handler(
self, fd: int, handler: Callable[[any], None], handler_ctx: any = None
) -> bool:
"""Set a read handler for a file descriptor.
Returns:
bool: True, success. False, failed.
"""
self.__set_handler(
fd, is_read=True, handler=handler, handler_ctx=handler_ctx)
def set_write_handler(
self, fd: int, handler: Callable[[any], None], handler_ctx: any = None
) -> bool:
"""Set a write handler for a file descriptor.
Returns:
bool: True, success. False, failed.
"""
self.__set_handler(
fd, is_read=False, handler=handler, handler_ctx=handler_ctx)
def __set_handler(
self, fd, is_read: bool, handler: Callable[[any], None],
handler_ctx: any = None
) -> bool:
"""Set a handler."""
if fd is None:
raise MIoTEvError('invalid params')
if not self._poll_fd:
raise MIoTEvError('event loop not started')
fd_key: str = str(id(fd))
fd_handler = self._fd_handlers.get(fd_key, None)
if fd_handler is None:
fd_handler = MIoTFdHandler(fd=fd)
fd_handler.fd = fd
self._fd_handlers[fd_key] = fd_handler
read_handler_existed = fd_handler.read_handler is not None
write_handler_existed = fd_handler.write_handler is not None
if is_read is True:
fd_handler.read_handler = handler
fd_handler.read_handler_ctx = handler_ctx
else:
fd_handler.write_handler = handler
fd_handler.write_handler_ctx = handler_ctx
if fd_handler.read_handler is None and fd_handler.write_handler is None:
# Remove from epoll and map
try:
self._poll_fd.unregister(fd)
except (KeyError, ValueError, OSError) as e:
del e
self._fd_handlers.pop(fd_key, None)
# May be inside a read handler, if not, this has no effect
self._fd_handler_freed_in_read_handler = True
elif read_handler_existed is False and write_handler_existed is False:
# Add to epoll
events = 0x0
if fd_handler.read_handler:
events |= selectors.EVENT_READ
if fd_handler.write_handler:
events |= selectors.EVENT_WRITE
try:
self._poll_fd.register(fd, events=events, data=fd_handler)
except (KeyError, ValueError, OSError) as e:
_LOGGER.error(
'%s, register fd, error, %s, %s, %s, %s, %s',
threading.current_thread().name,
'read' if is_read else 'write',
fd_key, handler, e, traceback.format_exc())
self._fd_handlers.pop(fd_key, None)
return False
elif (
read_handler_existed != (fd_handler.read_handler is not None)
or write_handler_existed != (fd_handler.write_handler is not None)
):
# Modify epoll
events = 0x0
if fd_handler.read_handler:
events |= selectors.EVENT_READ
if fd_handler.write_handler:
events |= selectors.EVENT_WRITE
try:
self._poll_fd.modify(fd, events=events, data=fd_handler)
except (KeyError, ValueError, OSError) as e:
_LOGGER.error(
'%s, modify fd, error, %s, %s, %s, %s, %s',
threading.current_thread().name,
'read' if is_read else 'write',
fd_key, handler, e, traceback.format_exc())
self._fd_handlers.pop(fd_key, None)
return False
return True
@property
def __get_next_timeout_handle(self) -> str:
# Get next timeout handle, that is not larger than the maximum
# value of UINT64 type.
self._timer_handle_seed += 1
# uint64 max
self._timer_handle_seed %= 0xFFFFFFFFFFFFFFFF
return str(self._timer_handle_seed)
@property
def __get_monotonic_ms(self) -> int:
"""Get monotonic ms timestamp."""
return int(time.monotonic()*1000)

View File

@ -10,30 +10,30 @@ hereby grants you a personal, limited, non-exclusive, non-transferable,
non-sublicensable, and royalty-free license to reproduce, use, modify, and non-sublicensable, and royalty-free license to reproduce, use, modify, and
distribute the Licensed Work only for your use of Home Assistant for distribute the Licensed Work only for your use of Home Assistant for
non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
you to use the Licensed Work for any other purpose, including but not limited you to use the Licensed Work for Any other purpose, including but not limited
to use Licensed Work to develop applications (APP), Web services, and other to use Licensed Work to develop applications (APP), Web services, and other
forms of software. forms of software.
You may reproduce and distribute copies of the Licensed Work, with or without You may reproduce and distribute copies of the Licensed Work, with or without
modifications, whether in source or object form, provided that you must give modifications, whether in source or object form, provided that you must give
any other recipients of the Licensed Work a copy of this License and retain all Any other recipients of the Licensed Work a copy of this License and retain all
copyright and disclaimers. copyright and disclaimers.
Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied, including, without CONDITIONS OF ANY KIND, either express or implied, including, without
limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR limitation, Any warranties, undertakes, or conditions of TITLE, NO ERROR OR
OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible FITNESS FOR A PARTICULAR PURPOSE. In Any event, you are solely responsible
for any direct, indirect, special, incidental, or consequential damages or for Any direct, indirect, special, incidental, or consequential damages or
losses arising from the use or inability to use the Licensed Work. losses arising from the use or inability to use the Licensed Work.
Xiaomi reserves all rights not expressly granted to you in this License. Xiaomi reserves all rights not expressly granted to you in this License.
Except for the rights expressly granted by Xiaomi under this License, Xiaomi Except for the rights expressly granted by Xiaomi under this License, Xiaomi
does not authorize you in any form to use the trademarks, copyrights, or other does not authorize you in Any form to use the trademarks, copyrights, or other
forms of intellectual property rights of Xiaomi and its affiliates, including, forms of intellectual property rights of Xiaomi and its affiliates, including,
without limitation, without obtaining other written permission from Xiaomi, you without limitation, without obtaining other written permission from Xiaomi, you
shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
may make the public associate with Xiaomi in any form to publicize or promote may make the public associate with Xiaomi in Any form to publicize or promote
the software or hardware devices that use the Licensed Work. the software or hardware devices that use the Licensed Work.
Xiaomi has the right to immediately terminate all your authorization under this Xiaomi has the right to immediately terminate all your authorization under this
@ -60,7 +60,7 @@ import secrets
import socket import socket
import struct import struct
import threading import threading
from typing import Callable, Optional, final from typing import Callable, Optional, final, Coroutine, Any
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives import padding
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -68,11 +68,10 @@ from cryptography.hazmat.primitives import hashes
# pylint: disable=relative-beyond-top-level # pylint: disable=relative-beyond-top-level
from .miot_error import MIoTErrorCode from .miot_error import MIoTErrorCode
from .miot_ev import MIoTEventLoop, TimeoutHandle
from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo
from .miot_mdns import MipsService, MipsServiceState from .miot_mdns import MipsService, MipsServiceState
from .common import ( from .common import (
randomize_int, load_yaml_file, gen_absolute_path, MIoTMatcher) randomize_float, load_yaml_file, gen_absolute_path, MIoTMatcher)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -98,13 +97,13 @@ class MIoTLanCmdType(Enum):
class MIoTLanCmd: class MIoTLanCmd:
"""MIoT lan command.""" """MIoT lan command."""
type_: MIoTLanCmdType type_: MIoTLanCmdType
data: any data: Any
@dataclass @dataclass
class MIoTLanCmdData: class MIoTLanCmdData:
handler: Callable[[dict, any], None] handler: Callable[[dict, Any], None]
handler_ctx: any handler_ctx: Any
timeout_ms: int timeout_ms: int
@ -131,8 +130,8 @@ class MIoTLanUnregisterBroadcastData:
@dataclass @dataclass
class MIoTLanRegisterBroadcastData: class MIoTLanRegisterBroadcastData:
key: str key: str
handler: Callable[[dict, any], None] handler: Callable[[dict, Any], None]
handler_ctx: any handler_ctx: Any
@dataclass @dataclass
@ -143,8 +142,8 @@ class MIoTLanUnsubDeviceState:
@dataclass @dataclass
class MIoTLanSubDeviceState: class MIoTLanSubDeviceState:
key: str key: str
handler: Callable[[str, dict, any], None] handler: Callable[[str, dict, Any], None]
handler_ctx: any handler_ctx: Any
@dataclass @dataclass
@ -156,9 +155,9 @@ class MIoTLanNetworkUpdateData:
@dataclass @dataclass
class MIoTLanRequestData: class MIoTLanRequestData:
msg_id: int msg_id: int
handler: Callable[[dict, any], None] handler: Optional[Callable[[dict, Any], None]]
handler_ctx: any handler_ctx: Any
timeout: TimeoutHandle timeout: asyncio.TimerHandle
class MIoTLanDeviceState(Enum): class MIoTLanDeviceState(Enum):
@ -175,12 +174,12 @@ class MIoTLanDevice:
OT_HEADER: int = 0x2131 OT_HEADER: int = 0x2131
OT_HEADER_LEN: int = 32 OT_HEADER_LEN: int = 32
NETWORK_UNSTABLE_CNT_TH: int = 10 NETWORK_UNSTABLE_CNT_TH: int = 10
NETWORK_UNSTABLE_TIME_TH: int = 120000 NETWORK_UNSTABLE_TIME_TH: float = 120
NETWORK_UNSTABLE_RESUME_TH: int = 300000 NETWORK_UNSTABLE_RESUME_TH: float = 300
FAST_PING_INTERVAL: int = 5000 FAST_PING_INTERVAL: float = 5
CONSTRUCT_STATE_PENDING: int = 15000 CONSTRUCT_STATE_PENDING: float = 15
KA_INTERVAL_MIN = 10000 KA_INTERVAL_MIN: float = 10
KA_INTERVAL_MAX = 50000 KA_INTERVAL_MAX: float = 50
did: str did: str
token: bytes token: bytes
@ -192,19 +191,19 @@ class MIoTLanDevice:
sub_ts: int sub_ts: int
supported_wildcard_sub: bool supported_wildcard_sub: bool
_manager: any _manager: 'MIoTLan'
_if_name: Optional[str] _if_name: Optional[str]
_sub_locked: bool _sub_locked: bool
_state: MIoTLanDeviceState _state: MIoTLanDeviceState
_online: bool _online: bool
_online_offline_history: list[dict[str, any]] _online_offline_history: list[dict[str, Any]]
_online_offline_timer: Optional[TimeoutHandle] _online_offline_timer: Optional[asyncio.TimerHandle]
_ka_timer: TimeoutHandle _ka_timer: Optional[asyncio.TimerHandle]
_ka_internal: int _ka_internal: float
def __init__( def __init__(
self, manager: any, did: str, token: str, ip: Optional[str] = None self, manager: 'MIoTLan', did: str, token: str, ip: Optional[str] = None
) -> None: ) -> None:
self._manager: MIoTLan = manager self._manager: MIoTLan = manager
self.did = did self.did = did
@ -225,12 +224,12 @@ class MIoTLanDevice:
self._online_offline_history = [] self._online_offline_history = []
self._online_offline_timer = None self._online_offline_timer = None
def ka_init_handler(ctx: any) -> None: def ka_init_handler() -> None:
self._ka_internal = self.KA_INTERVAL_MIN self._ka_internal = self.KA_INTERVAL_MIN
self.__update_keep_alive(state=MIoTLanDeviceState.DEAD) self.__update_keep_alive(state=MIoTLanDeviceState.DEAD)
self._ka_timer = self._manager.mev.set_timeout( self._ka_timer = self._manager.internal_loop.call_later(
randomize_int(self.CONSTRUCT_STATE_PENDING, 0.5), randomize_float(self.CONSTRUCT_STATE_PENDING, 0.5),
ka_init_handler, None) ka_init_handler,)
_LOGGER.debug('miot lan device add, %s', self.did) _LOGGER.debug('miot lan device add, %s', self.did)
def keep_alive(self, ip: str, if_name: str) -> None: def keep_alive(self, ip: str, if_name: str) -> None:
@ -342,11 +341,9 @@ class MIoTLanDevice:
def on_delete(self) -> None: def on_delete(self) -> None:
if self._ka_timer: if self._ka_timer:
self._manager.mev.clear_timeout(self._ka_timer) self._ka_timer.cancel()
if self._online_offline_timer: if self._online_offline_timer:
self._manager.mev.clear_timeout(self._online_offline_timer) self._online_offline_timer.cancel()
self._manager = None
self.cipher = None
_LOGGER.debug('miot lan device delete, %s', self.did) _LOGGER.debug('miot lan device delete, %s', self.did)
def update_info(self, info: dict) -> None: def update_info(self, info: dict) -> None:
@ -379,7 +376,7 @@ class MIoTLanDevice:
'online': self._online, 'push_available': self.subscribed}) 'online': self._online, 'push_available': self.subscribed})
_LOGGER.info('subscribe success, %s, %s', self._if_name, self.did) _LOGGER.info('subscribe success, %s, %s', self._if_name, self.did)
def __unsubscribe_handler(self, msg: dict, ctx: any) -> None: def __unsubscribe_handler(self, msg: dict, ctx: Any) -> None:
if ( if (
'result' not in msg 'result' not in msg
or 'code' not in msg['result'] or 'code' not in msg['result']
@ -395,14 +392,14 @@ class MIoTLanDevice:
if self._state != MIoTLanDeviceState.FRESH: if self._state != MIoTLanDeviceState.FRESH:
_LOGGER.debug('device status, %s, %s', self.did, self._state) _LOGGER.debug('device status, %s, %s', self.did, self._state)
if self._ka_timer: if self._ka_timer:
self._manager.mev.clear_timeout(self._ka_timer) self._ka_timer.cancel()
self._ka_timer = None self._ka_timer = None
match state: match state:
case MIoTLanDeviceState.FRESH: case MIoTLanDeviceState.FRESH:
if last_state == MIoTLanDeviceState.DEAD: if last_state == MIoTLanDeviceState.DEAD:
self._ka_internal = self.KA_INTERVAL_MIN self._ka_internal = self.KA_INTERVAL_MIN
self.__change_online(True) self.__change_online(True)
self._ka_timer = self._manager.mev.set_timeout( self._ka_timer = self._manager.internal_loop.call_later(
self.__get_next_ka_timeout(), self.__update_keep_alive, self.__get_next_ka_timeout(), self.__update_keep_alive,
MIoTLanDeviceState.PING1) MIoTLanDeviceState.PING1)
case ( case (
@ -410,11 +407,18 @@ class MIoTLanDevice:
| MIoTLanDeviceState.PING2 | MIoTLanDeviceState.PING2
| MIoTLanDeviceState.PING3 | MIoTLanDeviceState.PING3
): ):
self._manager.ping(if_name=self._if_name, target_ip=self.ip) # Set the timer first to avoid Any early returns
# Fast ping self._ka_timer = self._manager.internal_loop.call_later(
self._ka_timer = self._manager.mev.set_timeout(
self.FAST_PING_INTERVAL, self.__update_keep_alive, self.FAST_PING_INTERVAL, self.__update_keep_alive,
MIoTLanDeviceState(state.value+1)) MIoTLanDeviceState(state.value+1))
# Fast ping
if self._if_name is None:
_LOGGER.error('if_name is Not set for device, %s', self.did)
return
if self.ip is None:
_LOGGER.error('ip is Not set for device, %s', self.did)
return
self._manager.ping(if_name=self._if_name, target_ip=self.ip)
case MIoTLanDeviceState.DEAD: case MIoTLanDeviceState.DEAD:
if last_state == MIoTLanDeviceState.PING3: if last_state == MIoTLanDeviceState.PING3:
self._ka_internal = self.KA_INTERVAL_MIN self._ka_internal = self.KA_INTERVAL_MIN
@ -422,9 +426,9 @@ class MIoTLanDevice:
case _: case _:
_LOGGER.error('invalid state, %s', state) _LOGGER.error('invalid state, %s', state)
def __get_next_ka_timeout(self) -> int: def __get_next_ka_timeout(self) -> float:
self._ka_internal = min(self._ka_internal*2, self.KA_INTERVAL_MAX) self._ka_internal = min(self._ka_internal*2, self.KA_INTERVAL_MAX)
return randomize_int(self._ka_internal, 0.1) return randomize_float(self._ka_internal, 0.1)
def __change_online(self, online: bool) -> None: def __change_online(self, online: bool) -> None:
_LOGGER.info('change online, %s, %s', self.did, online) _LOGGER.info('change online, %s, %s', self.did, online)
@ -433,7 +437,7 @@ class MIoTLanDevice:
if len(self._online_offline_history) > self.NETWORK_UNSTABLE_CNT_TH: if len(self._online_offline_history) > self.NETWORK_UNSTABLE_CNT_TH:
self._online_offline_history.pop(0) self._online_offline_history.pop(0)
if self._online_offline_timer: if self._online_offline_timer:
self._manager.mev.clear_timeout(self._online_offline_timer) self._online_offline_timer.cancel()
if not online: if not online:
self.online = False self.online = False
else: else:
@ -446,11 +450,11 @@ class MIoTLanDevice:
self.online = True self.online = True
else: else:
_LOGGER.info('unstable device detected, %s', self.did) _LOGGER.info('unstable device detected, %s', self.did)
self._online_offline_timer = self._manager.mev.set_timeout( self._online_offline_timer = self._manager.internal_loop.call_later(
self.NETWORK_UNSTABLE_RESUME_TH, self.NETWORK_UNSTABLE_RESUME_TH,
self.__online_resume_handler, None) self.__online_resume_handler)
def __online_resume_handler(self, ctx: any) -> None: def __online_resume_handler(self) -> None:
_LOGGER.info('unstable resume threshold past, %s', self.did) _LOGGER.info('unstable resume threshold past, %s', self.did)
self.online = True self.online = True
@ -470,8 +474,8 @@ class MIoTLan:
OT_MSG_LEN: int = 1400 OT_MSG_LEN: int = 1400
OT_SUPPORT_WILDCARD_SUB: int = 0xFE OT_SUPPORT_WILDCARD_SUB: int = 0xFE
OT_PROBE_INTERVAL_MIN: int = 5000 OT_PROBE_INTERVAL_MIN: float = 5
OT_PROBE_INTERVAL_MAX: int = 45000 OT_PROBE_INTERVAL_MAX: float = 45
PROFILE_MODELS_FILE: str = 'lan/profile_models.yaml' PROFILE_MODELS_FILE: str = 'lan/profile_models.yaml'
@ -486,23 +490,21 @@ class MIoTLan:
_write_buffer: bytearray _write_buffer: bytearray
_read_buffer: bytearray _read_buffer: bytearray
_mev: MIoTEventLoop _internal_loop: asyncio.AbstractEventLoop
_thread: threading.Thread _thread: threading.Thread
_queue: queue.Queue
_cmd_event_fd: os.eventfd
_available_net_ifs: set[str] _available_net_ifs: set[str]
_broadcast_socks: dict[str, socket.socket] _broadcast_socks: dict[str, socket.socket]
_local_port: Optional[int] _local_port: Optional[int]
_scan_timer: TimeoutHandle _scan_timer: Optional[asyncio.TimerHandle]
_last_scan_interval: Optional[int] _last_scan_interval: Optional[float]
_msg_id_counter: int _msg_id_counter: int
_pending_requests: dict[int, MIoTLanRequestData] _pending_requests: dict[int, MIoTLanRequestData]
_device_msg_matcher: MIoTMatcher _device_msg_matcher: MIoTMatcher
_device_state_sub_map: dict[str, MIoTLanSubDeviceState] _device_state_sub_map: dict[str, MIoTLanSubDeviceState]
_reply_msg_buffer: dict[str, TimeoutHandle] _reply_msg_buffer: dict[str, asyncio.TimerHandle]
_lan_state_sub_map: dict[str, Callable[[bool], asyncio.Future]] _lan_state_sub_map: dict[str, Callable[[bool], Coroutine]]
_lan_ctrl_vote_map: dict[str, bool] _lan_ctrl_vote_map: dict[str, bool]
_profile_models: dict[str, dict] _profile_models: dict[str, dict]
@ -532,7 +534,7 @@ class MIoTLan:
key='miot_lan', group_id='*', key='miot_lan', group_id='*',
handler=self.__on_mips_service_change) handler=self.__on_mips_service_change)
self._enable_subscribe = enable_subscribe self._enable_subscribe = enable_subscribe
self._virtual_did = virtual_did or str(secrets.randbits(64)) self._virtual_did = str(virtual_did) if (virtual_did is not None) else str(secrets.randbits(64))
# Init socket probe message # Init socket probe message
probe_bytes = bytearray(self.OT_PROBE_LEN) probe_bytes = bytearray(self.OT_PROBE_LEN)
probe_bytes[:20] = ( probe_bytes[:20] = (
@ -574,8 +576,8 @@ class MIoTLan:
return self._virtual_did return self._virtual_did
@property @property
def mev(self) -> MIoTEventLoop: def internal_loop(self) -> asyncio.AbstractEventLoop:
return self._mev return self._internal_loop
@property @property
def init_done(self) -> bool: def init_done(self) -> bool:
@ -609,12 +611,10 @@ class MIoTLan:
except Exception as err: # pylint: disable=broad-exception-caught except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('load profile models error, %s', err) _LOGGER.error('load profile models error, %s', err)
self._profile_models = {} self._profile_models = {}
self._mev = MIoTEventLoop()
self._queue = queue.Queue() self._queue = queue.Queue()
self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK) self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK)
self._mev.set_read_handler( # All tasks meant for the internal loop should happen in this thread
self._cmd_event_fd, self.__cmd_read_handler, None) self._thread = threading.Thread(target=self.__internal_loop_thread)
self._thread = threading.Thread(target=self.__lan_thread_handler)
self._thread.name = 'miot_lan' self._thread.name = 'miot_lan'
self._thread.daemon = True self._thread.daemon = True
self._thread.start() self._thread.start()
@ -623,6 +623,19 @@ class MIoTLan:
self._main_loop.create_task(handler(True)) self._main_loop.create_task(handler(True))
_LOGGER.info( _LOGGER.info(
'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs) 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs)
def __internal_loop_thread(self) -> None:
_LOGGER.info('miot lan thread start')
self._internal_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._internal_loop)
self.__init_socket()
self._internal_loop.add_reader(
self._cmd_event_fd, self.__cmd_read_handler)
self._scan_timer = self._internal_loop.call_later(
int(3*random.random()), self.__scan_devices)
self._internal_loop.run_forever()
self._internal_loop.close()
_LOGGER.info('miot lan thread exit')
async def deinit_async(self) -> None: async def deinit_async(self) -> None:
if not self._init_done: if not self._init_done:
@ -687,7 +700,7 @@ class MIoTLan:
if not self._init_done: if not self._init_done:
self._enable_subscribe = enable_subscribe self._enable_subscribe = enable_subscribe
return return
return self.__lan_send_cmd( self.__lan_send_cmd(
cmd=MIoTLanCmdType.OPTIONS_UPDATE, cmd=MIoTLanCmdType.OPTIONS_UPDATE,
data={ data={
'enable_subscribe': enable_subscribe, }) 'enable_subscribe': enable_subscribe, })
@ -705,7 +718,7 @@ class MIoTLan:
data=devices) data=devices)
def sub_lan_state( def sub_lan_state(
self, key: str, handler: Callable[[bool], asyncio.Future] self, key: str, handler: Callable[[bool], Coroutine]
) -> None: ) -> None:
self._lan_state_sub_map[key] = handler self._lan_state_sub_map[key] = handler
@ -714,8 +727,8 @@ class MIoTLan:
@final @final
def sub_device_state( def sub_device_state(
self, key: str, handler: Callable[[str, dict, any], None], self, key: str, handler: Callable[[str, dict, Any], None],
handler_ctx: any = None handler_ctx: Any = None
) -> bool: ) -> bool:
return self.__lan_send_cmd( return self.__lan_send_cmd(
cmd=MIoTLanCmdType.SUB_DEVICE_STATE, cmd=MIoTLanCmdType.SUB_DEVICE_STATE,
@ -730,8 +743,8 @@ class MIoTLan:
@final @final
def sub_prop( def sub_prop(
self, did: str, handler: Callable[[dict, any], None], self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, piid: int = None, handler_ctx: any = None siid: Optional[int] = None, piid: Optional[int] = None, handler_ctx: Any = None
) -> bool: ) -> bool:
if not self._enable_subscribe: if not self._enable_subscribe:
return False return False
@ -744,7 +757,7 @@ class MIoTLan:
key=key, handler=handler, handler_ctx=handler_ctx)) key=key, handler=handler, handler_ctx=handler_ctx))
@final @final
def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool: def unsub_prop(self, did: str, siid: Optional[int] = None, piid: Optional[int] = None) -> bool:
if not self._enable_subscribe: if not self._enable_subscribe:
return False return False
key = ( key = (
@ -756,8 +769,8 @@ class MIoTLan:
@final @final
def sub_event( def sub_event(
self, did: str, handler: Callable[[dict, any], None], self, did: str, handler: Callable[[dict, Any], None],
siid: int = None, eiid: int = None, handler_ctx: any = None siid: Optional[int] = None, eiid: Optional[int] = None, handler_ctx: Any = None
) -> bool: ) -> bool:
if not self._enable_subscribe: if not self._enable_subscribe:
return False return False
@ -770,7 +783,7 @@ class MIoTLan:
key=key, handler=handler, handler_ctx=handler_ctx)) key=key, handler=handler, handler_ctx=handler_ctx))
@final @final
def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool: def unsub_event(self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None) -> bool:
if not self._enable_subscribe: if not self._enable_subscribe:
return False return False
key = ( key = (
@ -783,7 +796,7 @@ class MIoTLan:
@final @final
async def get_prop_async( async def get_prop_async(
self, did: str, siid: int, piid: int, timeout_ms: int = 10000 self, did: str, siid: int, piid: int, timeout_ms: int = 10000
) -> any: ) -> Any:
result_obj = await self.__call_api_async( result_obj = await self.__call_api_async(
did=did, msg={ did=did, msg={
'method': 'get_properties', 'method': 'get_properties',
@ -801,7 +814,7 @@ class MIoTLan:
@final @final
async def set_prop_async( async def set_prop_async(
self, did: str, siid: int, piid: int, value: any, self, did: str, siid: int, piid: int, value: Any,
timeout_ms: int = 10000 timeout_ms: int = 10000
) -> dict: ) -> dict:
result_obj = await self.__call_api_async( result_obj = await self.__call_api_async(
@ -868,7 +881,7 @@ class MIoTLan:
fut.set_result({}) fut.set_result({})
return await fut return await fut
def ping(self, if_name: str, target_ip: str) -> None: def ping(self, if_name: str | None, target_ip: str) -> None:
if not target_ip: if not target_ip:
return return
self.__sendto( self.__sendto(
@ -878,13 +891,13 @@ class MIoTLan:
def send2device( def send2device(
self, did: str, self, did: str,
msg: dict, msg: dict,
handler: Optional[Callable[[dict, any], None]] = None, handler: Optional[Callable[[dict, Any], None]] = None,
handler_ctx: any = None, handler_ctx: Any = None,
timeout_ms: Optional[int] = None timeout_ms: Optional[int] = None
) -> None: ) -> None:
if timeout_ms and not handler: if timeout_ms and not handler:
raise ValueError('handler is required when timeout_ms is set') raise ValueError('handler is required when timeout_ms is set')
device: MIoTLanDevice = self._lan_devices.get(did) device: MIoTLanDevice | None = self._lan_devices.get(did)
if not device: if not device:
raise ValueError('invalid device') raise ValueError('invalid device')
if not device.cipher: if not device.cipher:
@ -900,7 +913,7 @@ class MIoTLan:
did=did, did=did,
offset=int(time.time())-device.offset) offset=int(time.time())-device.offset)
return self.make_request( return self.__make_request(
msg_id=in_msg['id'], msg_id=in_msg['id'],
msg=self._write_buffer[0: msg_len], msg=self._write_buffer[0: msg_len],
if_name=device.if_name, if_name=device.if_name,
@ -909,33 +922,33 @@ class MIoTLan:
handler_ctx=handler_ctx, handler_ctx=handler_ctx,
timeout_ms=timeout_ms) timeout_ms=timeout_ms)
def make_request( def __make_request(
self, self,
msg_id: int, msg_id: int,
msg: bytearray, msg: bytearray,
if_name: str, if_name: str,
ip: str, ip: str,
handler: Callable[[dict, any], None], handler: Optional[Callable[[dict, Any], None]],
handler_ctx: any = None, handler_ctx: Any = None,
timeout_ms: Optional[int] = None timeout_ms: Optional[int] = None
) -> None: ) -> None:
def request_timeout_handler(req_data: MIoTLanRequestData): def request_timeout_handler(req_data: MIoTLanRequestData):
self._pending_requests.pop(req_data.msg_id, None) self._pending_requests.pop(req_data.msg_id, None)
if req_data: if req_data and req_data.handler:
req_data.handler({ req_data.handler({
'code': MIoTErrorCode.CODE_TIMEOUT.value, 'code': MIoTErrorCode.CODE_TIMEOUT.value,
'error': 'timeout'}, 'error': 'timeout'},
req_data.handler_ctx) req_data.handler_ctx)
timer: Optional[TimeoutHandle] = None timer: Optional[asyncio.TimerHandle] = None
request_data = MIoTLanRequestData( request_data = MIoTLanRequestData(
msg_id=msg_id, msg_id=msg_id,
handler=handler, handler=handler,
handler_ctx=handler_ctx, handler_ctx=handler_ctx,
timeout=timer) timeout=timer)
if timeout_ms: if timeout_ms:
timer = self._mev.set_timeout( timer = self._internal_loop.call_later(
timeout_ms, request_timeout_handler, request_data) timeout_ms/1000, request_timeout_handler, request_data)
request_data.timeout = timer request_data.timeout = timer
self._pending_requests[msg_id] = request_data self._pending_requests[msg_id] = request_data
self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT) self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT)
@ -954,7 +967,7 @@ class MIoTLan:
self._msg_id_counter = 1 self._msg_id_counter = 1
return self._msg_id_counter return self._msg_id_counter
def __lan_send_cmd(self, cmd: MIoTLanCmd, data: any) -> bool: def __lan_send_cmd(self, cmd: MIoTLanCmdType, data: Any) -> bool:
try: try:
self._queue.put(MIoTLanCmd(type_=cmd, data=data)) self._queue.put(MIoTLanCmd(type_=cmd, data=data))
os.eventfd_write(self._cmd_event_fd, 1) os.eventfd_write(self._cmd_event_fd, 1)
@ -986,16 +999,7 @@ class MIoTLan:
'error': 'send cmd error'}) 'error': 'send cmd error'})
return await fut return await fut
def __lan_thread_handler(self) -> None: def __cmd_read_handler(self) -> None:
_LOGGER.info('miot lan thread start')
self.__init_socket()
# Create scan devices timer
self._scan_timer = self._mev.set_timeout(
int(3000*random.random()), self.__scan_devices, None)
self._mev.loop_forever()
_LOGGER.info('miot lan thread exit')
def __cmd_read_handler(self, ctx: any) -> None:
fd_value = os.eventfd_read(self._cmd_event_fd) fd_value = os.eventfd_read(self._cmd_event_fd)
if fd_value == 0: if fd_value == 0:
return return
@ -1109,20 +1113,21 @@ class MIoTLan:
elif mips_cmd.type_ == MIoTLanCmdType.DEINIT: elif mips_cmd.type_ == MIoTLanCmdType.DEINIT:
# stop the thread # stop the thread
if self._scan_timer: if self._scan_timer:
self._mev.clear_timeout(self._scan_timer) self._scan_timer.cancel()
self._scan_timer = None self._scan_timer = None
for device in self._lan_devices.values(): for device in self._lan_devices.values():
device.on_delete() device.on_delete()
self._lan_devices.clear() self._lan_devices.clear()
for req_data in self._pending_requests.values(): for req_data in self._pending_requests.values():
self._mev.clear_timeout(req_data.timeout) req_data.timeout.cancel()
self._pending_requests.clear() self._pending_requests.clear()
for timer in self._reply_msg_buffer.values(): for timer in self._reply_msg_buffer.values():
self._mev.clear_timeout(timer) timer.cancel()
self._reply_msg_buffer.clear() self._reply_msg_buffer.clear()
self._device_msg_matcher = MIoTMatcher() self._device_msg_matcher = MIoTMatcher()
self.__deinit_socket() self.__deinit_socket()
self._mev.loop_stop() # DO NOT force a event loop to stop.
# It will stop when you release all handles properly.
def __init_socket(self) -> None: def __init_socket(self) -> None:
self.__deinit_socket() self.__deinit_socket()
@ -1145,7 +1150,7 @@ class MIoTLan:
sock.setsockopt( sock.setsockopt(
socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode()) socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode())
sock.bind(('', self._local_port or 0)) sock.bind(('', self._local_port or 0))
self._mev.set_read_handler( self._internal_loop.add_reader(
sock.fileno(), self.__socket_read_handler, (if_name, sock)) sock.fileno(), self.__socket_read_handler, (if_name, sock))
self._broadcast_socks[if_name] = sock self._broadcast_socks[if_name] = sock
self._local_port = self._local_port or sock.getsockname()[1] self._local_port = self._local_port or sock.getsockname()[1]
@ -1163,7 +1168,7 @@ class MIoTLan:
sock = self._broadcast_socks.pop(if_name, None) sock = self._broadcast_socks.pop(if_name, None)
if not sock: if not sock:
return return
self._mev.set_read_handler(sock.fileno(), None, None) self._internal_loop.remove_reader(sock.fileno())
sock.close() sock.close()
_LOGGER.info('destroyed socket, %s', if_name) _LOGGER.info('destroyed socket, %s', if_name)
@ -1190,7 +1195,7 @@ class MIoTLan:
return return
# Keep alive message # Keep alive message
did: str = str(struct.unpack('>Q', data[4:12])[0]) did: str = str(struct.unpack('>Q', data[4:12])[0])
device: MIoTLanDevice = self._lan_devices.get(did) device: MIoTLanDevice | None = self._lan_devices.get(did)
if not device: if not device:
return return
timestamp: int = struct.unpack('>I', data[12:16])[0] timestamp: int = struct.unpack('>I', data[12:16])[0]
@ -1230,11 +1235,12 @@ class MIoTLan:
_LOGGER.warning('invalid message, no id, %s, %s', did, msg) _LOGGER.warning('invalid message, no id, %s, %s', did, msg)
return return
# Reply # Reply
req: MIoTLanRequestData = self._pending_requests.pop(msg['id'], None) req: MIoTLanRequestData | None = self._pending_requests.pop(msg['id'], None)
if req: if req:
self._mev.clear_timeout(req.timeout) req.timeout.cancel()
self._main_loop.call_soon_threadsafe( if req.handler is not None:
req.handler, msg, req.handler_ctx) self._main_loop.call_soon_threadsafe(
req.handler, msg, req.handler_ctx)
return return
# Handle up link message # Handle up link message
if 'method' not in msg or 'params' not in msg: if 'method' not in msg or 'params' not in msg:
@ -1281,15 +1287,16 @@ class MIoTLan:
filter_id = f'{did}.{msg_id}' filter_id = f'{did}.{msg_id}'
if filter_id in self._reply_msg_buffer: if filter_id in self._reply_msg_buffer:
return True return True
self._reply_msg_buffer[filter_id] = self._mev.set_timeout( self._reply_msg_buffer[filter_id] = self._internal_loop.call_later(
5000, 5,
lambda filter_id: self._reply_msg_buffer.pop(filter_id, None), lambda filter_id: self._reply_msg_buffer.pop(filter_id, None),
filter_id) filter_id)
return False
def __sendto( def __sendto(
self, if_name: str, data: bytes, address: str, port: int self, if_name: str | None, data: bytes, address: str, port: int
) -> None: ) -> None:
if address == '255.255.255.255': if if_name is None:
# Broadcast # Broadcast
for if_n, sock in self._broadcast_socks.items(): for if_n, sock in self._broadcast_socks.items():
_LOGGER.debug('send broadcast, %s', if_n) _LOGGER.debug('send broadcast, %s', if_n)
@ -1302,17 +1309,21 @@ class MIoTLan:
return return
sock.sendto(data, socket.MSG_DONTWAIT, (address, port)) sock.sendto(data, socket.MSG_DONTWAIT, (address, port))
def __scan_devices(self, ctx: any) -> None: def __scan_devices(self) -> None:
if self._scan_timer: if self._scan_timer:
self._mev.clear_timeout(self._scan_timer) self._scan_timer.cancel()
# Scan devices # Ignore any exceptions to avoid blocking the loop
self.ping(if_name=None, target_ip='255.255.255.255') try:
# Scan devices
self.ping(if_name=None, target_ip='255.255.255.255')
except:
pass
scan_time = self.__get_next_scan_time() scan_time = self.__get_next_scan_time()
self._scan_timer = self._mev.set_timeout( self._scan_timer = self._internal_loop.call_later(
scan_time, self.__scan_devices, None) scan_time, self.__scan_devices)
_LOGGER.debug('next scan time: %sms', scan_time) _LOGGER.debug('next scan time: %sms', scan_time)
def __get_next_scan_time(self) -> int: def __get_next_scan_time(self) -> float:
if not self._last_scan_interval: if not self._last_scan_interval:
self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN
self._last_scan_interval = min( self._last_scan_interval = min(

View File

@ -50,7 +50,7 @@ import base64
import binascii import binascii
import copy import copy
from enum import Enum from enum import Enum
from typing import Callable, Optional from typing import Callable, Optional, Coroutine
import logging import logging
from zeroconf import ( from zeroconf import (
@ -151,7 +151,7 @@ class MipsService:
_services: dict[str, dict] _services: dict[str, dict]
# key = (key, group_id) # key = (key, group_id)
_sub_list: dict[(str, str), Callable[[ _sub_list: dict[(str, str), Callable[[
str, MipsServiceState, dict], asyncio.Future]] str, MipsServiceState, dict], Coroutine]]
def __init__( def __init__(
self, aiozc: AsyncZeroconf, self, aiozc: AsyncZeroconf,
@ -207,7 +207,7 @@ class MipsService:
def sub_service_change( def sub_service_change(
self, key: str, group_id: str, self, key: str, group_id: str,
handler: Callable[[str, MipsServiceState, dict], asyncio.Future] handler: Callable[[str, MipsServiceState, dict], Coroutine]
) -> None: ) -> None:
if key is None or group_id is None or handler is None: if key is None or group_id is None or handler is None:
raise MipsServiceError('invalid params') raise MipsServiceError('invalid params')

View File

@ -52,7 +52,7 @@ import socket
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
import subprocess import subprocess
from typing import Callable, Optional from typing import Callable, Optional, Coroutine
import psutil import psutil
import ipaddress import ipaddress
@ -97,7 +97,7 @@ class MIoTNetwork:
_sub_list_network_status: dict[str, Callable[[bool], asyncio.Future]] _sub_list_network_status: dict[str, Callable[[bool], asyncio.Future]]
_sub_list_network_info: dict[str, Callable[[ _sub_list_network_info: dict[str, Callable[[
InterfaceStatus, NetworkInfo], asyncio.Future]] InterfaceStatus, NetworkInfo], Coroutine]]
_ping_address_priority: int _ping_address_priority: int
@ -155,7 +155,7 @@ class MIoTNetwork:
def sub_network_info( def sub_network_info(
self, key: str, self, key: str,
handler: Callable[[InterfaceStatus, NetworkInfo], asyncio.Future] handler: Callable[[InterfaceStatus, NetworkInfo], Coroutine]
) -> None: ) -> None:
self._sub_list_network_info[key] = handler self._sub_list_network_info[key] = handler

View File

@ -1,55 +0,0 @@
# -*- coding: utf-8 -*-
"""Unit test for miot_ev.py."""
import os
import pytest
# pylint: disable=import-outside-toplevel, disable=unused-argument
@pytest.mark.github
def test_mev_timer_and_fd():
from miot.miot_ev import MIoTEventLoop, TimeoutHandle
mev = MIoTEventLoop()
assert mev
event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK)
assert event_fd
timer4: TimeoutHandle = None
def event_handler(event_fd):
value: int = os.eventfd_read(event_fd)
if value == 1:
mev.clear_timeout(timer4)
print('cancel timer4')
elif value == 2:
print('event write twice in a row')
elif value == 3:
mev.set_read_handler(event_fd, None, None)
os.close(event_fd)
event_fd = None
print('close event fd')
def timer1_handler(event_fd):
os.eventfd_write(event_fd, 1)
def timer2_handler(event_fd):
os.eventfd_write(event_fd, 1)
os.eventfd_write(event_fd, 1)
def timer3_handler(event_fd):
os.eventfd_write(event_fd, 3)
def timer4_handler(event_fd):
raise ValueError('unreachable code')
mev.set_read_handler(
event_fd, event_handler, event_fd)
mev.set_timeout(500, timer1_handler, event_fd)
mev.set_timeout(1000, timer2_handler, event_fd)
mev.set_timeout(1500, timer3_handler, event_fd)
timer4 = mev.set_timeout(2000, timer4_handler, event_fd)
mev.loop_forever()
# Loop will exit when there are no timers or fd handlers.
mev.loop_stop()