Compare commits

...

14 Commits

Author SHA1 Message Date
Li Shuzhen
c60b7099bd
Merge 38296132a6 into 9ceca34b28 2025-01-10 22:34:38 +08:00
Feng Wang
9ceca34b28
refactor: refactor miot mips & fix type errors (#365)
Some checks failed
Tests / check-rule-format (push) Has been cancelled
Validate / validate-hassfest (push) Has been cancelled
Validate / validate-hacs (push) Has been cancelled
Validate / validate-lint (push) Has been cancelled
Validate / validate-setup (push) Has been cancelled
* remove use of tev & fix type errors

* lint fix

* make private classes private

* simplify inheritance

* fix thread naming

* fix the deleted public data class

* remove tev

* fix access violation

* style: format code

* style: param init

* fix: fix event async set

* fix: fix mips re-connect error

---------

Co-authored-by: topsworld <sworldtop@gmail.com>
2025-01-10 21:46:00 +08:00
Paul Shawn
152933a223
docs: update changelog and version to v0.1.5b1 (#616)
Some checks are pending
Tests / check-rule-format (push) Waiting to run
Validate / validate-hassfest (push) Waiting to run
Validate / validate-hacs (push) Waiting to run
Validate / validate-lint (push) Waiting to run
Validate / validate-setup (push) Waiting to run
2025-01-10 09:46:34 +08:00
LiShuzhen
38296132a6 feat: yeelink.light.ceiling19 ambient light 2025-01-03 12:57:20 +08:00
LiShuzhen
11ab0290f1 Merge branch 'main' into feat-spec-custom-modify 2025-01-03 11:46:53 +08:00
LiShuzhen
e705ba56da feat: zimi.waterheater.h03 standalone switch 2024-12-26 08:57:08 +08:00
LiShuzhen
ee05222abc fix: pylint warn 2024-12-25 21:15:25 +08:00
LiShuzhen
51ff17f1bf chore: urn key format 2024-12-25 21:13:23 +08:00
LiShuzhen
d5373e55b4 fix: rules take effect 2024-12-25 20:51:43 +08:00
LiShuzhen
5dd0047094 fix: pylint warn 2024-12-25 17:17:33 +08:00
LiShuzhen
beb35fe2a3 chore: custom_service.json format check 2024-12-25 17:04:42 +08:00
LiShuzhen
847d90c0ca Merge branch 'main' into feat-spec-custom-modify 2024-12-25 14:24:31 +08:00
LiShuzhen
cc883d78e5 Merge branch 'main' into feat-spec-custom-modify 2024-12-20 16:41:46 +08:00
LiShuzhen
58923f31ef feat: custom service spec 2024-12-20 16:40:38 +08:00
16 changed files with 913 additions and 1093 deletions

View File

@ -1,5 +1,16 @@
# CHANGELOG
## v0.1.5b1
This version will cause some Xiaomi routers that do not support access (#564) to become unavailable. You can update the device list in the configuration or delete it manually.
### Added
- Fan entity support direction ctrl [#556](https://github.com/XiaoMi/ha_xiaomi_home/pull/556)
### Changed
- Filter miwifi.* devices and xiaomi.router.rd03 [#564](https://github.com/XiaoMi/ha_xiaomi_home/pull/564)
### Fixed
- Fix multi ha instance login [#560](https://github.com/XiaoMi/ha_xiaomi_home/pull/560)
- Fix fan speed [#464](https://github.com/XiaoMi/ha_xiaomi_home/pull/464)
- The number of profile models updated from 660 to 823. [#583](https://github.com/XiaoMi/ha_xiaomi_home/pull/583)
## v0.1.5b0
### Added
- Add missing parameter state_class [#101](https://github.com/XiaoMi/ha_xiaomi_home/pull/101)

View File

@ -155,7 +155,8 @@ async def async_setup_entry(
for entity in filter_entities:
device.entity_list[platform].remove(entity)
entity_id = device.gen_service_entity_id(
ha_domain=platform, siid=entity.spec.iid)
ha_domain=platform, siid=entity.spec.iid,
description=entity.spec.description)
if er.async_get(entity_id_or_uuid=entity_id):
er.async_remove(entity_id=entity_id)
if platform in device.prop_list:

View File

@ -25,7 +25,7 @@
"cryptography",
"psutil"
],
"version": "v0.1.5b0",
"version": "v0.1.5b1",
"zeroconf": [
"_miot-central._tcp.local."
]

View File

@ -357,7 +357,7 @@ class MIoTClient:
# Cloud mips
self._mips_cloud.unsub_mips_state(
key=f'{self._uid}-{self._cloud_server}')
self._mips_cloud.disconnect()
self._mips_cloud.deinit()
# Cancel refresh cloud devices
if self._refresh_cloud_devices_timer:
self._refresh_cloud_devices_timer.cancel()
@ -370,7 +370,7 @@ class MIoTClient:
for mips in self._mips_local.values():
mips.on_dev_list_changed = None
mips.unsub_mips_state(key=mips.group_id)
mips.disconnect()
mips.deinit()
if self._mips_local_state_changed_timers:
for timer_item in (
self._mips_local_state_changed_timers.values()):

View File

@ -298,10 +298,11 @@ class MIoTDevice:
f'{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_'
f'{self._model_strs[-1][:20]}')
def gen_service_entity_id(self, ha_domain: str, siid: int) -> str:
def gen_service_entity_id(self, ha_domain: str, siid: int,
description: str) -> str:
return (
f'{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_'
f'{self._model_strs[-1][:20]}_s_{siid}')
f'{self._model_strs[-1][:20]}_s_{siid}_{description}')
def gen_prop_entity_id(
self, ha_domain: str, spec_name: str, siid: int, piid: int
@ -744,7 +745,8 @@ class MIoTServiceEntity(Entity):
self._attr_name = f' {self.entity_data.spec.description_trans}'
elif isinstance(entity_data.spec, MIoTSpecService):
self.entity_id = miot_device.gen_service_entity_id(
DOMAIN, siid=entity_data.spec.iid)
DOMAIN, siid=entity_data.spec.iid,
description=entity_data.spec.description)
self._attr_name = (
f'{"* "if self.entity_data.spec.proprietary else " "}'
f'{self.entity_data.spec.description_trans}')

View File

@ -1,324 +0,0 @@
# -*- coding: utf-8 -*-
"""
Copyright (C) 2024 Xiaomi Corporation.
The ownership and intellectual property rights of Xiaomi Home Assistant
Integration and related Xiaomi cloud service API interface provided under this
license, including source code and object code (collectively, "Licensed Work"),
are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi
hereby grants you a personal, limited, non-exclusive, non-transferable,
non-sublicensable, and royalty-free license to reproduce, use, modify, and
distribute the Licensed Work only for your use of Home Assistant for
non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
you to use the Licensed Work for any other purpose, including but not limited
to use Licensed Work to develop applications (APP), Web services, and other
forms of software.
You may reproduce and distribute copies of the Licensed Work, with or without
modifications, whether in source or object form, provided that you must give
any other recipients of the Licensed Work a copy of this License and retain all
copyright and disclaimers.
Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied, including, without
limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR
OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible
for any direct, indirect, special, incidental, or consequential damages or
losses arising from the use or inability to use the Licensed Work.
Xiaomi reserves all rights not expressly granted to you in this License.
Except for the rights expressly granted by Xiaomi under this License, Xiaomi
does not authorize you in any form to use the trademarks, copyrights, or other
forms of intellectual property rights of Xiaomi and its affiliates, including,
without limitation, without obtaining other written permission from Xiaomi, you
shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
may make the public associate with Xiaomi in any form to publicize or promote
the software or hardware devices that use the Licensed Work.
Xiaomi has the right to immediately terminate all your authorization under this
License in the event:
1. You assert patent invalidation, litigation, or other claims against patents
or other intellectual property rights of Xiaomi or its affiliates; or,
2. You make, have made, manufacture, sell, or offer to sell products that knock
off Xiaomi or its affiliates' products.
MIoT event loop.
"""
import selectors
import heapq
import time
import traceback
from typing import Any, Callable, TypeVar
import logging
import threading
# pylint: disable=relative-beyond-top-level
from .miot_error import MIoTEvError
_LOGGER = logging.getLogger(__name__)
TimeoutHandle = TypeVar('TimeoutHandle')
class MIoTFdHandler:
"""File descriptor handler."""
fd: int
read_handler: Callable[[Any], None]
read_handler_ctx: Any
write_handler: Callable[[Any], None]
write_handler_ctx: Any
def __init__(
self, fd: int,
read_handler: Callable[[Any], None] = None,
read_handler_ctx: Any = None,
write_handler: Callable[[Any], None] = None,
write_handler_ctx: Any = None
) -> None:
self.fd = fd
self.read_handler = read_handler
self.read_handler_ctx = read_handler_ctx
self.write_handler = write_handler
self.write_handler_ctx = write_handler_ctx
class MIoTTimeout:
"""Timeout handler."""
key: TimeoutHandle
target: int
handler: Callable[[Any], None]
handler_ctx: Any
def __init__(
self, key: str = None, target: int = None,
handler: Callable[[Any], None] = None,
handler_ctx: Any = None
) -> None:
self.key = key
self.target = target
self.handler = handler
self.handler_ctx = handler_ctx
def __lt__(self, other):
return self.target < other.target
class MIoTEventLoop:
"""MIoT event loop."""
_poll_fd: selectors.DefaultSelector
_fd_handlers: dict[str, MIoTFdHandler]
_timer_heap: list[MIoTTimeout]
_timer_handlers: dict[str, MIoTTimeout]
_timer_handle_seed: int
# Label if the current fd handler is freed inside a read handler to
# avoid invalid reading.
_fd_handler_freed_in_read_handler: bool
def __init__(self) -> None:
self._poll_fd = selectors.DefaultSelector()
self._timer_heap = []
self._timer_handlers = {}
self._timer_handle_seed = 1
self._fd_handlers = {}
self._fd_handler_freed_in_read_handler = False
def loop_forever(self) -> None:
"""Run an event loop in current thread."""
next_timeout: int
while True:
next_timeout = 0
# Handle timer
now_ms: int = self.__get_monotonic_ms
while len(self._timer_heap) > 0:
timer: MIoTTimeout = self._timer_heap[0]
if timer is None:
break
if timer.target <= now_ms:
heapq.heappop(self._timer_heap)
del self._timer_handlers[timer.key]
if timer.handler:
timer.handler(timer.handler_ctx)
else:
next_timeout = timer.target-now_ms
break
# Are there any files to listen to
if next_timeout == 0 and self._fd_handlers:
next_timeout = None # None == infinite
# Wait for timers & fds
if next_timeout == 0:
# Neither timer nor fds exist, exit loop
break
# Handle fd event
events = self._poll_fd.select(
timeout=next_timeout/1000.0 if next_timeout else next_timeout)
for key, mask in events:
fd_handler: MIoTFdHandler = key.data
if fd_handler is None:
continue
self._fd_handler_freed_in_read_handler = False
fd_key = str(id(fd_handler.fd))
if fd_key not in self._fd_handlers:
continue
if (
mask & selectors.EVENT_READ > 0
and fd_handler.read_handler
):
fd_handler.read_handler(fd_handler.read_handler_ctx)
if (
mask & selectors.EVENT_WRITE > 0
and self._fd_handler_freed_in_read_handler is False
and fd_handler.write_handler
):
fd_handler.write_handler(fd_handler.write_handler_ctx)
def loop_stop(self) -> None:
"""Stop the event loop."""
if self._poll_fd:
self._poll_fd.close()
self._poll_fd = None
self._fd_handlers = {}
self._timer_heap = []
self._timer_handlers = {}
def set_timeout(
self, timeout_ms: int, handler: Callable[[Any], None],
handler_ctx: Any = None
) -> TimeoutHandle:
"""Set a timer."""
if timeout_ms is None or handler is None:
raise MIoTEvError('invalid params')
new_timeout: MIoTTimeout = MIoTTimeout()
new_timeout.key = self.__get_next_timeout_handle
new_timeout.target = self.__get_monotonic_ms + timeout_ms
new_timeout.handler = handler
new_timeout.handler_ctx = handler_ctx
heapq.heappush(self._timer_heap, new_timeout)
self._timer_handlers[new_timeout.key] = new_timeout
return new_timeout.key
def clear_timeout(self, timer_key: TimeoutHandle) -> None:
"""Stop and remove the timer."""
if timer_key is None:
return
timer: MIoTTimeout = self._timer_handlers.pop(timer_key, None)
if timer:
self._timer_heap = list(self._timer_heap)
self._timer_heap.remove(timer)
heapq.heapify(self._timer_heap)
def set_read_handler(
self, fd: int, handler: Callable[[Any], None], handler_ctx: Any = None
) -> bool:
"""Set a read handler for a file descriptor.
Returns:
bool: True, success. False, failed.
"""
self.__set_handler(
fd, is_read=True, handler=handler, handler_ctx=handler_ctx)
def set_write_handler(
self, fd: int, handler: Callable[[Any], None], handler_ctx: Any = None
) -> bool:
"""Set a write handler for a file descriptor.
Returns:
bool: True, success. False, failed.
"""
self.__set_handler(
fd, is_read=False, handler=handler, handler_ctx=handler_ctx)
def __set_handler(
self, fd, is_read: bool, handler: Callable[[Any], None],
handler_ctx: Any = None
) -> bool:
"""Set a handler."""
if fd is None:
raise MIoTEvError('invalid params')
if not self._poll_fd:
raise MIoTEvError('event loop not started')
fd_key: str = str(id(fd))
fd_handler = self._fd_handlers.get(fd_key, None)
if fd_handler is None:
fd_handler = MIoTFdHandler(fd=fd)
fd_handler.fd = fd
self._fd_handlers[fd_key] = fd_handler
read_handler_existed = fd_handler.read_handler is not None
write_handler_existed = fd_handler.write_handler is not None
if is_read is True:
fd_handler.read_handler = handler
fd_handler.read_handler_ctx = handler_ctx
else:
fd_handler.write_handler = handler
fd_handler.write_handler_ctx = handler_ctx
if fd_handler.read_handler is None and fd_handler.write_handler is None:
# Remove from epoll and map
try:
self._poll_fd.unregister(fd)
except (KeyError, ValueError, OSError) as e:
del e
self._fd_handlers.pop(fd_key, None)
# May be inside a read handler, if not, this has no effect
self._fd_handler_freed_in_read_handler = True
elif read_handler_existed is False and write_handler_existed is False:
# Add to epoll
events = 0x0
if fd_handler.read_handler:
events |= selectors.EVENT_READ
if fd_handler.write_handler:
events |= selectors.EVENT_WRITE
try:
self._poll_fd.register(fd, events=events, data=fd_handler)
except (KeyError, ValueError, OSError) as e:
_LOGGER.error(
'%s, register fd, error, %s, %s, %s, %s, %s',
threading.current_thread().name,
'read' if is_read else 'write',
fd_key, handler, e, traceback.format_exc())
self._fd_handlers.pop(fd_key, None)
return False
elif (
read_handler_existed != (fd_handler.read_handler is not None)
or write_handler_existed != (fd_handler.write_handler is not None)
):
# Modify epoll
events = 0x0
if fd_handler.read_handler:
events |= selectors.EVENT_READ
if fd_handler.write_handler:
events |= selectors.EVENT_WRITE
try:
self._poll_fd.modify(fd, events=events, data=fd_handler)
except (KeyError, ValueError, OSError) as e:
_LOGGER.error(
'%s, modify fd, error, %s, %s, %s, %s, %s',
threading.current_thread().name,
'read' if is_read else 'write',
fd_key, handler, e, traceback.format_exc())
self._fd_handlers.pop(fd_key, None)
return False
return True
@property
def __get_next_timeout_handle(self) -> str:
# Get next timeout handle, that is not larger than the maximum
# value of UINT64 type.
self._timer_handle_seed += 1
# uint64 max
self._timer_handle_seed %= 0xFFFFFFFFFFFFFFFF
return str(self._timer_handle_seed)
@property
def __get_monotonic_ms(self) -> int:
"""Get monotonic ms timestamp."""
return int(time.monotonic()*1000)

View File

@ -48,7 +48,7 @@ MIoT internationalization translation.
import asyncio
import logging
import os
from typing import Optional
from typing import Optional, Union
# pylint: disable=relative-beyond-top-level
from .common import load_json_file
@ -98,7 +98,7 @@ class MIoTI18n:
def translate(
self, key: str, replace: Optional[dict[str, str]] = None
) -> str | dict | None:
) -> Union[str, dict, None]:
result = self._data
for item in key.split('.'):
if item not in result:

View File

@ -381,7 +381,8 @@ class _MIoTLanDevice:
_MIoTLanDeviceState(state.value+1))
# Fast ping
if self._if_name is None:
_LOGGER.error('if_name is Not set for device, %s', self.did)
_LOGGER.error(
'if_name is Not set for device, %s', self.did)
return
if self.ip is None:
_LOGGER.error('ip is Not set for device, %s', self.did)
@ -419,10 +420,10 @@ class _MIoTLanDevice:
self.online = True
else:
_LOGGER.info('unstable device detected, %s', self.did)
self._online_offline_timer = \
self._online_offline_timer = (
self._manager.internal_loop.call_later(
self.NETWORK_UNSTABLE_RESUME_TH,
self.__online_resume_handler)
self.__online_resume_handler))
def __online_resume_handler(self) -> None:
_LOGGER.info('unstable resume threshold past, %s', self.did)
@ -508,9 +509,9 @@ class MIoTLan:
key='miot_lan', group_id='*',
handler=self.__on_mips_service_change)
self._enable_subscribe = enable_subscribe
self._virtual_did = str(virtual_did) \
if (virtual_did is not None) \
else str(secrets.randbits(64))
self._virtual_did = (
str(virtual_did) if (virtual_did is not None)
else str(secrets.randbits(64)))
# Init socket probe message
probe_bytes = bytearray(self.OT_PROBE_LEN)
probe_bytes[:20] = (
@ -948,7 +949,7 @@ class MIoTLan:
# The following methods SHOULD ONLY be called in the internal loop
def ping(self, if_name: str | None, target_ip: str) -> None:
def ping(self, if_name: Optional[str], target_ip: str) -> None:
if not target_ip:
return
self.__sendto(
@ -964,7 +965,7 @@ class MIoTLan:
) -> None:
if timeout_ms and not handler:
raise ValueError('handler is required when timeout_ms is set')
device: _MIoTLanDevice | None = self._lan_devices.get(did)
device: Optional[_MIoTLanDevice] = self._lan_devices.get(did)
if not device:
raise ValueError('invalid device')
if not device.cipher:
@ -1232,7 +1233,7 @@ class MIoTLan:
return
# Keep alive message
did: str = str(struct.unpack('>Q', data[4:12])[0])
device: _MIoTLanDevice | None = self._lan_devices.get(did)
device: Optional[_MIoTLanDevice] = self._lan_devices.get(did)
if not device:
return
timestamp: int = struct.unpack('>I', data[12:16])[0]
@ -1272,8 +1273,8 @@ class MIoTLan:
_LOGGER.warning('invalid message, no id, %s, %s', did, msg)
return
# Reply
req: _MIoTLanRequestData | None = \
self._pending_requests.pop(msg['id'], None)
req: Optional[_MIoTLanRequestData] = (
self._pending_requests.pop(msg['id'], None))
if req:
if req.timeout:
req.timeout.cancel()
@ -1334,7 +1335,7 @@ class MIoTLan:
return False
def __sendto(
self, if_name: str | None, data: bytes, address: str, port: int
self, if_name: Optional[str], data: bytes, address: str, port: int
) -> None:
if if_name is None:
# Broadcast
@ -1356,7 +1357,7 @@ class MIoTLan:
try:
# Scan devices
self.ping(if_name=None, target_ip='255.255.255.255')
except Exception as err: # pylint: disable=broad-exception-caught
except Exception as err: # pylint: disable=broad-exception-caught
# Ignore any exceptions to avoid blocking the loop
_LOGGER.error('ping device error, %s', err)
pass

File diff suppressed because it is too large Load Diff

View File

@ -61,6 +61,7 @@ from .miot_storage import (
MIoTStorage,
SpecBoolTranslation,
SpecFilter,
SpecCustomService,
SpecMultiLang)
_LOGGER = logging.getLogger(__name__)
@ -468,6 +469,7 @@ class MIoTSpecParser:
_bool_trans: SpecBoolTranslation
_multi_lang: SpecMultiLang
_spec_filter: SpecFilter
_custom_service: SpecCustomService
def __init__(
self, lang: str = DEFAULT_INTEGRATION_LANGUAGE,
@ -486,6 +488,7 @@ class MIoTSpecParser:
lang=self._lang, loop=self._main_loop)
self._multi_lang = SpecMultiLang(lang=self._lang, loop=self._main_loop)
self._spec_filter = SpecFilter(loop=self._main_loop)
self._custom_service = SpecCustomService(loop=self._main_loop)
async def init_async(self) -> None:
if self._init_done is True:
@ -493,6 +496,7 @@ class MIoTSpecParser:
await self._bool_trans.init_async()
await self._multi_lang.init_async()
await self._spec_filter.init_async()
await self._custom_service.init_async()
std_lib_cache: dict = None
if self._storage:
std_lib_cache: dict = await self._storage.load_async(
@ -538,6 +542,7 @@ class MIoTSpecParser:
await self._bool_trans.deinit_async()
await self._multi_lang.deinit_async()
await self._spec_filter.deinit_async()
await self._custom_service.deinit_async()
self._ram_cache.clear()
async def parse(
@ -781,6 +786,12 @@ class MIoTSpecParser:
_LOGGER.debug('parse urn, %s', urn)
# Load spec instance
instance: dict = await self.__get_instance(urn=urn)
urn_strs: list[str] = urn.split(':')
urn_key: str = ':'.join(urn_strs[:6])
# Modify the spec instance by custom spec
instance = self._custom_service.modify_spec(urn_key=urn_key,
spec=instance)
# Check required fields in the device instance
if (
not isinstance(instance, dict)
or 'type' not in instance
@ -798,8 +809,6 @@ class MIoTSpecParser:
or not isinstance(res_trans['data'], dict)
):
raise MIoTSpecError('invalid translation data')
urn_strs: list[str] = urn.split(':')
urn_key: str = ':'.join(urn_strs[:6])
trans_data: dict[str, str] = None
if self._lang == 'zh-Hans':
# Simplified Chinese

View File

@ -1033,3 +1033,65 @@ class DeviceManufacturer:
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('get manufacturer info failed, %s', err)
return None
class SpecCustomService:
"""Custom MIoT-Spec-V2 service defined by the user."""
CUSTOM_SPEC_FILE = 'specs/custom_service.json'
_main_loop: asyncio.AbstractEventLoop
_data: dict[str, dict[str, any]]
def __init__(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
self._main_loop = loop or asyncio.get_event_loop()
self._data = None
async def init_async(self) -> None:
if isinstance(self._data, dict):
return
custom_data = None
self._data = {}
try:
custom_data = await self._main_loop.run_in_executor(
None, load_json_file,
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
self.CUSTOM_SPEC_FILE))
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('custom service, load file error, %s', err)
return
if not isinstance(custom_data, dict):
_LOGGER.error('custom service, invalid spec content')
return
for values in list(custom_data.values()):
if not isinstance(values, dict):
_LOGGER.error('custom service, invalid spec data')
return
self._data = custom_data
async def deinit_async(self) -> None:
self._data = None
def modify_spec(self, urn_key: str, spec: dict) -> dict | None:
"""MUST call init_async() first."""
if not self._data:
_LOGGER.error('self._data is None')
return spec
if urn_key not in self._data:
return spec
if 'services' not in spec:
return spec
if isinstance(self._data[urn_key], str):
urn_key = self._data[urn_key]
spec_services = spec['services']
custom_spec = self._data.get(urn_key, None)
# Replace services by custom defined spec
for i, service in enumerate(spec_services):
siid = str(service['iid'])
if siid in custom_spec:
spec_services[i] = custom_spec[siid]
# Add new services
if 'new' in custom_spec:
for service in custom_spec['new']:
spec_services.append(service)
return spec

View File

@ -0,0 +1,152 @@
{
"urn:miot-spec-v2:device:airer:0000A00D:hyd-lyjpro": {
"3": {
"iid": 3,
"type": "urn:miot-spec-v2:service:light:00007802:hyd-lyjpro:1",
"description": "Light",
"properties": [
{
"iid": 1,
"type": "urn:miot-spec-v2:property:on:00000006:hyd-lyjpro:1",
"description": "Sunlight",
"format": "bool",
"access": [
"read",
"write",
"notify"
]
},
{
"iid": 3,
"type": "urn:miot-spec-v2:property:flex-switch:000000EC:hyd-lyjpro:1",
"description": "Flex Switch",
"format": "uint8",
"access": [
"read",
"write",
"notify"
],
"value-list": [
{
"value": 1,
"description": "Overturn"
}
]
}
]
},
"new": [
{
"iid": 3,
"type": "urn:miot-spec-v2:service:light:00007802:hyd-lyjpro:1",
"description": "Moonlight",
"properties": [
{
"iid": 2,
"type": "urn:miot-spec-v2:property:on:00000006:hyd-lyjpro:1",
"description": "Switch Status",
"format": "bool",
"access": [
"read",
"write",
"notify"
]
}
]
}
]
},
"urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling19": "urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling4",
"urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling20": "urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling4",
"urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling4": {
"new": [
{
"iid": 200,
"type": "urn:miot-spec-v2:service:ambient-light:0000789D:yeelink-ceiling4:1",
"description": "Ambient Light",
"properties": [
{
"iid": 201,
"type": "urn:miot-spec-v2:property:on:00000006:yeelink-ceiling4:1",
"description": "Switch Status",
"format": "bool",
"access": [
"read",
"write"
]
},
{
"iid": 202,
"type": "urn:miot-spec-v2:property:brightness:0000000D:yeelink-ceiling4:1",
"description": "Brightness",
"format": "uint8",
"access": [
"read",
"write"
],
"unit": "percentage",
"value-range": [
1,
100,
1
]
},
{
"iid": 203,
"type": "urn:miot-spec-v2:property:color-temperature:0000000F:yeelink-ceiling4:1",
"description": "Color Temperature",
"format": "uint32",
"access": [
"read",
"write"
],
"unit": "kelvin",
"value-range": [
1700,
6500,
1
]
},
{
"iid": 204,
"type": "urn:miot-spec-v2:property:color:0000000E:yeelink-ceiling4:1",
"description": "Color",
"format": "uint32",
"access": [
"read",
"write"
],
"unit": "rgb",
"value-range": [
1,
16777215,
1
]
}
]
}
]
},
"urn:miot-spec-v2:device:water-heater:0000A02A:zimi-h03": {
"new": [
{
"iid": 2,
"type": "urn:miot-spec-v2:service:switch:0000780C:zimi-h03:1",
"description": "Heat Water",
"properties": [
{
"iid": 6,
"type": "urn:miot-spec-v2:property:on:00000006:zimi-h03:1",
"description": "Switch Status",
"format": "bool",
"access": [
"read",
"write",
"notify"
]
}
]
}
]
}
}

View File

@ -155,7 +155,7 @@
"service:004:property:001": "事件名稱"
}
},
"urn:miot-spec-v2:device:switch:0000A003:lumi-acn040:1": {
"urn:miot-spec-v2:device:switch:0000A003:lumi-acn040": {
"en": {
"service:011": "Right Button On and Off",
"service:011:property:001": "Right Button On and Off",

View File

@ -20,6 +20,14 @@ SPEC_MULTI_LANG_FILE = path.join(
SPEC_FILTER_FILE = path.join(
ROOT_PATH,
'../custom_components/xiaomi_home/miot/specs/spec_filter.json')
CUSTOM_SERVICE_FILE = path.join(
ROOT_PATH,
'../custom_components/xiaomi_home/miot/specs/custom_service.json')
BOOL_TRANS_URN_KEY_COLON_NUM: int = 4
CUSTOM_SERVICE_URN_KEY_COLON_NUM: int = 5
MULTI_LANG_URN_KEY_COLON_NUM: int = 5
SPEC_FILTER_URN_KEY_COLON_NUM: int = 5
def load_json_file(file_path: str) -> Optional[dict]:
@ -90,11 +98,18 @@ def nested_3_dict_str_str(d: dict) -> bool:
return False
return True
def urn_key(d: dict, cnt: int) -> bool:
for k in d.keys():
if cnt != k.count(':'):
return False
return True
def spec_filter(d: dict) -> bool:
"""restricted format: dict[str, dict[str, list<str>]]"""
if not dict_str_dict(d):
return False
if not urn_key(d, SPEC_FILTER_URN_KEY_COLON_NUM):
return False
for value in d.values():
for k, v in value.items():
if not isinstance(k, str) or not isinstance(v, list):
@ -104,12 +119,130 @@ def spec_filter(d: dict) -> bool:
return True
def spec_instance_format(d: dict) -> bool:
"""restricted format of MIoT-Spec-V2 instance"""
if ('iid' not in d) or ('type' not in d) or ('description' not in d):
return False
if not isinstance(d['iid'], int) or not isinstance(d['type'], str) or (
not isinstance(d['description'], str)):
return False
# optional keys for property
if 'format' in d:
if not isinstance(d['format'], str):
return False
if 'unit' in d:
if not isinstance(d['unit'], str):
return False
if 'access' in d:
if not isinstance(d['access'], list):
return False
for i in d['access']:
if not isinstance(i, str):
return False
if 'value-list' in d:
if not isinstance(d['value-list'], list):
return False
for i in d['value-list']:
if not isinstance(i, dict):
return False
if 'value' not in i or 'description' not in i:
return False
if not isinstance(i['value'], int) or not isinstance(i[
'description'], str):
return False
if i['description'].replace(' ','') == '':
return False
# optional keys for action
if 'in' in d:
if not isinstance(d['in'], list):
return False
for i in d['in']:
if not isinstance(i, int):
return False
if 'out' in d:
if not isinstance(d['out'], list):
return False
for i in d['out']:
if not isinstance(i, int):
return False
# optional keys for event
if 'arguments' in d:
if not isinstance(d['arguments'], list):
return False
for i in d['arguments']:
if not isinstance(i, int):
return False
# optional keys for service
if 'properties' in d:
if not isinstance(d['properties'], list):
return False
for i in d['properties']:
if not spec_instance_format(i):
return False
if 'actions' in d:
if not isinstance(d['actions'], list):
return False
for i in d['actions']:
if not spec_instance_format(i):
return False
if 'events' in d:
if not isinstance(d['events'], list):
return False
for i in d['events']:
if not spec_instance_format(i):
return False
return True
def is_integer(s: str) -> bool:
try:
int(s)
return True
except ValueError:
return False
def custom_service(d: dict) -> bool:
"""restricted format: dict[str, dict[str, Any]] or dict[str, str]"""
if not isinstance(d, dict):
return False
for k, v in d.items():
if not isinstance(k, str):
return False
if not (isinstance(v, dict) or isinstance(v, str)):
return False
if not urn_key(d, CUSTOM_SERVICE_URN_KEY_COLON_NUM):
return False
for v in d.values():
if isinstance(v, str):
if CUSTOM_SERVICE_URN_KEY_COLON_NUM != v.count(':'):
return False
continue
for key, value in v.items():
if key=='new':
if not isinstance(value, list):
return False
for i in value:
if not spec_instance_format(i):
return False
elif is_integer(key):
if not isinstance(value, dict):
return False
if not spec_instance_format(value):
return False
else:
return False
return True
def bool_trans(d: dict) -> bool:
"""dict[str, dict[str, str] | dict[str, dict[str, str]] ]"""
if not isinstance(d, dict):
return False
if 'data' not in d or 'translate' not in d:
return False
if not urn_key(d['data'], BOOL_TRANS_URN_KEY_COLON_NUM):
return False
if not dict_str_str(d['data']):
return False
if not nested_3_dict_str_str(d['translate']):
@ -126,6 +259,13 @@ def bool_trans(d: dict) -> bool:
return False
return True
def multi_lang(d: dict) -> bool:
"""dict[str, dict[str, dict[str, dict[str, str]] ] ]"""
if not nested_3_dict_str_str(d):
return False
if not urn_key(d, MULTI_LANG_URN_KEY_COLON_NUM):
return False
return True
def compare_dict_structure(dict1: dict, dict2: dict) -> bool:
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
@ -160,13 +300,13 @@ def sort_bool_trans(file_path: str):
def sort_multi_lang(file_path: str):
multi_lang: dict = load_json_file(file_path=file_path)
multi_lang = dict(sorted(multi_lang.items()))
for urn, trans in multi_lang.items():
multi_lang[urn] = dict(sorted(trans.items()))
for lang, spec in multi_lang[urn].items():
multi_lang[urn][lang] = dict(sorted(spec.items()))
return multi_lang
lang_data: dict = load_json_file(file_path=file_path)
lang_data = dict(sorted(lang_data.items()))
for urn, trans in lang_data.items():
lang_data[urn] = dict(sorted(trans.items()))
for lang, spec in lang_data[urn].items():
lang_data[urn][lang] = dict(sorted(spec.items()))
return lang_data
def sort_spec_filter(file_path: str):
@ -177,6 +317,17 @@ def sort_spec_filter(file_path: str):
return filter_data
def sort_custom_service(file_path: str):
service_data: dict = load_json_file(file_path=file_path)
service_data = dict(sorted(service_data.items()))
for urn, spec in service_data.items():
if isinstance(spec, dict):
service_data[urn] = dict(sorted(spec.items()))
else:
service_data[urn] = spec
return service_data
@pytest.mark.github
def test_bool_trans():
data: dict = load_json_file(SPEC_BOOL_TRANS_FILE)
@ -195,7 +346,14 @@ def test_spec_filter():
def test_multi_lang():
data: dict = load_json_file(SPEC_MULTI_LANG_FILE)
assert data, f'load {SPEC_MULTI_LANG_FILE} failed'
assert nested_3_dict_str_str(data), f'{SPEC_MULTI_LANG_FILE} format error'
assert multi_lang(data), f'{SPEC_MULTI_LANG_FILE} format error'
@pytest.mark.github
def test_custom_service():
data: dict = load_json_file(CUSTOM_SERVICE_FILE)
assert data, f'load {CUSTOM_SERVICE_FILE} failed'
assert custom_service(data), f'{CUSTOM_SERVICE_FILE} format error'
@pytest.mark.github
@ -278,6 +436,12 @@ def test_miot_data_sort():
f'{SPEC_FILTER_FILE} not sorted, goto project root path'
' and run the following command sorting, ',
'pytest -s -v -m update ./test/check_rule_format.py')
assert json.dumps(
load_json_file(file_path=CUSTOM_SERVICE_FILE)) == json.dumps(
sort_custom_service(file_path=CUSTOM_SERVICE_FILE)), (
f'{CUSTOM_SERVICE_FILE} not sorted, goto project root path'
' and run the following command sorting, ',
'pytest -s -v -m update ./test/check_rule_format.py')
@pytest.mark.update
@ -291,3 +455,6 @@ def test_sort_spec_data():
sort_data = sort_spec_filter(file_path=SPEC_FILTER_FILE)
save_json_file(file_path=SPEC_FILTER_FILE, data=sort_data)
print(SPEC_FILTER_FILE, 'formatted.')
sort_data = sort_custom_service(file_path=CUSTOM_SERVICE_FILE)
save_json_file(file_path=CUSTOM_SERVICE_FILE, data=sort_data)
print(CUSTOM_SERVICE_FILE, 'formatted.')

View File

@ -20,7 +20,6 @@ def load_py_file():
'const.py',
'miot_cloud.py',
'miot_error.py',
'miot_ev.py',
'miot_i18n.py',
'miot_lan.py',
'miot_mdns.py',

View File

@ -1,55 +0,0 @@
# -*- coding: utf-8 -*-
"""Unit test for miot_ev.py."""
import os
import pytest
# pylint: disable=import-outside-toplevel, disable=unused-argument
@pytest.mark.github
def test_mev_timer_and_fd():
from miot.miot_ev import MIoTEventLoop, TimeoutHandle
mev = MIoTEventLoop()
assert mev
event_fd: os.eventfd = os.eventfd(0, os.O_NONBLOCK)
assert event_fd
timer4: TimeoutHandle = None
def event_handler(event_fd):
value: int = os.eventfd_read(event_fd)
if value == 1:
mev.clear_timeout(timer4)
print('cancel timer4')
elif value == 2:
print('event write twice in a row')
elif value == 3:
mev.set_read_handler(event_fd, None, None)
os.close(event_fd)
event_fd = None
print('close event fd')
def timer1_handler(event_fd):
os.eventfd_write(event_fd, 1)
def timer2_handler(event_fd):
os.eventfd_write(event_fd, 1)
os.eventfd_write(event_fd, 1)
def timer3_handler(event_fd):
os.eventfd_write(event_fd, 3)
def timer4_handler(event_fd):
raise ValueError('unreachable code')
mev.set_read_handler(
event_fd, event_handler, event_fd)
mev.set_timeout(500, timer1_handler, event_fd)
mev.set_timeout(1000, timer2_handler, event_fd)
mev.set_timeout(1500, timer3_handler, event_fd)
timer4 = mev.set_timeout(2000, timer4_handler, event_fd)
mev.loop_forever()
# Loop will exit when there are no timers or fd handlers.
mev.loop_stop()