This commit is contained in:
Li Shuzhen 2025-01-03 20:46:59 +08:00 committed by GitHub
commit 10d5c3cfb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 408 additions and 15 deletions

View File

@ -155,7 +155,8 @@ async def async_setup_entry(
for entity in filter_entities:
device.entity_list[platform].remove(entity)
entity_id = device.gen_service_entity_id(
ha_domain=platform, siid=entity.spec.iid)
ha_domain=platform, siid=entity.spec.iid,
description=entity.spec.description)
if er.async_get(entity_id_or_uuid=entity_id):
er.async_remove(entity_id=entity_id)
if platform in device.prop_list:

View File

@ -298,10 +298,11 @@ class MIoTDevice:
f'{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_'
f'{self._model_strs[-1][:20]}')
def gen_service_entity_id(self, ha_domain: str, siid: int) -> str:
def gen_service_entity_id(self, ha_domain: str, siid: int,
description: str) -> str:
return (
f'{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_'
f'{self._model_strs[-1][:20]}_s_{siid}')
f'{self._model_strs[-1][:20]}_s_{siid}_{description}')
def gen_prop_entity_id(
self, ha_domain: str, spec_name: str, siid: int, piid: int
@ -744,7 +745,8 @@ class MIoTServiceEntity(Entity):
self._attr_name = f' {self.entity_data.spec.description_trans}'
elif isinstance(entity_data.spec, MIoTSpecService):
self.entity_id = miot_device.gen_service_entity_id(
DOMAIN, siid=entity_data.spec.iid)
DOMAIN, siid=entity_data.spec.iid,
description=entity_data.spec.description)
self._attr_name = (
f'{"* "if self.entity_data.spec.proprietary else " "}'
f'{self.entity_data.spec.description_trans}')

View File

@ -61,6 +61,7 @@ from .miot_storage import (
MIoTStorage,
SpecBoolTranslation,
SpecFilter,
SpecCustomService,
SpecMultiLang)
_LOGGER = logging.getLogger(__name__)
@ -468,6 +469,7 @@ class MIoTSpecParser:
_bool_trans: SpecBoolTranslation
_multi_lang: SpecMultiLang
_spec_filter: SpecFilter
_custom_service: SpecCustomService
def __init__(
self, lang: str = DEFAULT_INTEGRATION_LANGUAGE,
@ -486,6 +488,7 @@ class MIoTSpecParser:
lang=self._lang, loop=self._main_loop)
self._multi_lang = SpecMultiLang(lang=self._lang, loop=self._main_loop)
self._spec_filter = SpecFilter(loop=self._main_loop)
self._custom_service = SpecCustomService(loop=self._main_loop)
async def init_async(self) -> None:
if self._init_done is True:
@ -493,6 +496,7 @@ class MIoTSpecParser:
await self._bool_trans.init_async()
await self._multi_lang.init_async()
await self._spec_filter.init_async()
await self._custom_service.init_async()
std_lib_cache: dict = None
if self._storage:
std_lib_cache: dict = await self._storage.load_async(
@ -538,6 +542,7 @@ class MIoTSpecParser:
await self._bool_trans.deinit_async()
await self._multi_lang.deinit_async()
await self._spec_filter.deinit_async()
await self._custom_service.deinit_async()
self._ram_cache.clear()
async def parse(
@ -781,6 +786,12 @@ class MIoTSpecParser:
_LOGGER.debug('parse urn, %s', urn)
# Load spec instance
instance: dict = await self.__get_instance(urn=urn)
urn_strs: list[str] = urn.split(':')
urn_key: str = ':'.join(urn_strs[:6])
# Modify the spec instance by custom spec
instance = self._custom_service.modify_spec(urn_key=urn_key,
spec=instance)
# Check required fields in the device instance
if (
not isinstance(instance, dict)
or 'type' not in instance
@ -798,8 +809,6 @@ class MIoTSpecParser:
or not isinstance(res_trans['data'], dict)
):
raise MIoTSpecError('invalid translation data')
urn_strs: list[str] = urn.split(':')
urn_key: str = ':'.join(urn_strs[:6])
trans_data: dict[str, str] = None
if self._lang == 'zh-Hans':
# Simplified Chinese

View File

@ -1033,3 +1033,65 @@ class DeviceManufacturer:
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('get manufacturer info failed, %s', err)
return None
class SpecCustomService:
"""Custom MIoT-Spec-V2 service defined by the user."""
CUSTOM_SPEC_FILE = 'specs/custom_service.json'
_main_loop: asyncio.AbstractEventLoop
_data: dict[str, dict[str, any]]
def __init__(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
self._main_loop = loop or asyncio.get_event_loop()
self._data = None
async def init_async(self) -> None:
if isinstance(self._data, dict):
return
custom_data = None
self._data = {}
try:
custom_data = await self._main_loop.run_in_executor(
None, load_json_file,
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
self.CUSTOM_SPEC_FILE))
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error('custom service, load file error, %s', err)
return
if not isinstance(custom_data, dict):
_LOGGER.error('custom service, invalid spec content')
return
for values in list(custom_data.values()):
if not isinstance(values, dict):
_LOGGER.error('custom service, invalid spec data')
return
self._data = custom_data
async def deinit_async(self) -> None:
self._data = None
def modify_spec(self, urn_key: str, spec: dict) -> dict | None:
"""MUST call init_async() first."""
if not self._data:
_LOGGER.error('self._data is None')
return spec
if urn_key not in self._data:
return spec
if 'services' not in spec:
return spec
if isinstance(self._data[urn_key], str):
urn_key = self._data[urn_key]
spec_services = spec['services']
custom_spec = self._data.get(urn_key, None)
# Replace services by custom defined spec
for i, service in enumerate(spec_services):
siid = str(service['iid'])
if siid in custom_spec:
spec_services[i] = custom_spec[siid]
# Add new services
if 'new' in custom_spec:
for service in custom_spec['new']:
spec_services.append(service)
return spec

View File

@ -0,0 +1,152 @@
{
"urn:miot-spec-v2:device:airer:0000A00D:hyd-lyjpro": {
"3": {
"iid": 3,
"type": "urn:miot-spec-v2:service:light:00007802:hyd-lyjpro:1",
"description": "Light",
"properties": [
{
"iid": 1,
"type": "urn:miot-spec-v2:property:on:00000006:hyd-lyjpro:1",
"description": "Sunlight",
"format": "bool",
"access": [
"read",
"write",
"notify"
]
},
{
"iid": 3,
"type": "urn:miot-spec-v2:property:flex-switch:000000EC:hyd-lyjpro:1",
"description": "Flex Switch",
"format": "uint8",
"access": [
"read",
"write",
"notify"
],
"value-list": [
{
"value": 1,
"description": "Overturn"
}
]
}
]
},
"new": [
{
"iid": 3,
"type": "urn:miot-spec-v2:service:light:00007802:hyd-lyjpro:1",
"description": "Moonlight",
"properties": [
{
"iid": 2,
"type": "urn:miot-spec-v2:property:on:00000006:hyd-lyjpro:1",
"description": "Switch Status",
"format": "bool",
"access": [
"read",
"write",
"notify"
]
}
]
}
]
},
"urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling19": "urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling4",
"urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling20": "urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling4",
"urn:miot-spec-v2:device:light:0000A001:yeelink-ceiling4": {
"new": [
{
"iid": 200,
"type": "urn:miot-spec-v2:service:ambient-light:0000789D:yeelink-ceiling4:1",
"description": "Ambient Light",
"properties": [
{
"iid": 201,
"type": "urn:miot-spec-v2:property:on:00000006:yeelink-ceiling4:1",
"description": "Switch Status",
"format": "bool",
"access": [
"read",
"write"
]
},
{
"iid": 202,
"type": "urn:miot-spec-v2:property:brightness:0000000D:yeelink-ceiling4:1",
"description": "Brightness",
"format": "uint8",
"access": [
"read",
"write"
],
"unit": "percentage",
"value-range": [
1,
100,
1
]
},
{
"iid": 203,
"type": "urn:miot-spec-v2:property:color-temperature:0000000F:yeelink-ceiling4:1",
"description": "Color Temperature",
"format": "uint32",
"access": [
"read",
"write"
],
"unit": "kelvin",
"value-range": [
1700,
6500,
1
]
},
{
"iid": 204,
"type": "urn:miot-spec-v2:property:color:0000000E:yeelink-ceiling4:1",
"description": "Color",
"format": "uint32",
"access": [
"read",
"write"
],
"unit": "rgb",
"value-range": [
1,
16777215,
1
]
}
]
}
]
},
"urn:miot-spec-v2:device:water-heater:0000A02A:zimi-h03": {
"new": [
{
"iid": 2,
"type": "urn:miot-spec-v2:service:switch:0000780C:zimi-h03:1",
"description": "Heat Water",
"properties": [
{
"iid": 6,
"type": "urn:miot-spec-v2:property:on:00000006:zimi-h03:1",
"description": "Switch Status",
"format": "bool",
"access": [
"read",
"write",
"notify"
]
}
]
}
]
}
}

View File

@ -155,7 +155,7 @@
"service:004:property:001": "事件名稱"
}
},
"urn:miot-spec-v2:device:switch:0000A003:lumi-acn040:1": {
"urn:miot-spec-v2:device:switch:0000A003:lumi-acn040": {
"en": {
"service:011": "Right Button On and Off",
"service:011:property:001": "Right Button On and Off",

View File

@ -20,6 +20,14 @@ SPEC_MULTI_LANG_FILE = path.join(
SPEC_FILTER_FILE = path.join(
ROOT_PATH,
'../custom_components/xiaomi_home/miot/specs/spec_filter.json')
CUSTOM_SERVICE_FILE = path.join(
ROOT_PATH,
'../custom_components/xiaomi_home/miot/specs/custom_service.json')
BOOL_TRANS_URN_KEY_COLON_NUM: int = 4
CUSTOM_SERVICE_URN_KEY_COLON_NUM: int = 5
MULTI_LANG_URN_KEY_COLON_NUM: int = 5
SPEC_FILTER_URN_KEY_COLON_NUM: int = 5
def load_json_file(file_path: str) -> Optional[dict]:
@ -90,11 +98,18 @@ def nested_3_dict_str_str(d: dict) -> bool:
return False
return True
def urn_key(d: dict, cnt: int) -> bool:
for k in d.keys():
if cnt != k.count(':'):
return False
return True
def spec_filter(d: dict) -> bool:
"""restricted format: dict[str, dict[str, list<str>]]"""
if not dict_str_dict(d):
return False
if not urn_key(d, SPEC_FILTER_URN_KEY_COLON_NUM):
return False
for value in d.values():
for k, v in value.items():
if not isinstance(k, str) or not isinstance(v, list):
@ -104,12 +119,130 @@ def spec_filter(d: dict) -> bool:
return True
def spec_instance_format(d: dict) -> bool:
"""restricted format of MIoT-Spec-V2 instance"""
if ('iid' not in d) or ('type' not in d) or ('description' not in d):
return False
if not isinstance(d['iid'], int) or not isinstance(d['type'], str) or (
not isinstance(d['description'], str)):
return False
# optional keys for property
if 'format' in d:
if not isinstance(d['format'], str):
return False
if 'unit' in d:
if not isinstance(d['unit'], str):
return False
if 'access' in d:
if not isinstance(d['access'], list):
return False
for i in d['access']:
if not isinstance(i, str):
return False
if 'value-list' in d:
if not isinstance(d['value-list'], list):
return False
for i in d['value-list']:
if not isinstance(i, dict):
return False
if 'value' not in i or 'description' not in i:
return False
if not isinstance(i['value'], int) or not isinstance(i[
'description'], str):
return False
if i['description'].replace(' ','') == '':
return False
# optional keys for action
if 'in' in d:
if not isinstance(d['in'], list):
return False
for i in d['in']:
if not isinstance(i, int):
return False
if 'out' in d:
if not isinstance(d['out'], list):
return False
for i in d['out']:
if not isinstance(i, int):
return False
# optional keys for event
if 'arguments' in d:
if not isinstance(d['arguments'], list):
return False
for i in d['arguments']:
if not isinstance(i, int):
return False
# optional keys for service
if 'properties' in d:
if not isinstance(d['properties'], list):
return False
for i in d['properties']:
if not spec_instance_format(i):
return False
if 'actions' in d:
if not isinstance(d['actions'], list):
return False
for i in d['actions']:
if not spec_instance_format(i):
return False
if 'events' in d:
if not isinstance(d['events'], list):
return False
for i in d['events']:
if not spec_instance_format(i):
return False
return True
def is_integer(s: str) -> bool:
try:
int(s)
return True
except ValueError:
return False
def custom_service(d: dict) -> bool:
"""restricted format: dict[str, dict[str, Any]] or dict[str, str]"""
if not isinstance(d, dict):
return False
for k, v in d.items():
if not isinstance(k, str):
return False
if not (isinstance(v, dict) or isinstance(v, str)):
return False
if not urn_key(d, CUSTOM_SERVICE_URN_KEY_COLON_NUM):
return False
for v in d.values():
if isinstance(v, str):
if CUSTOM_SERVICE_URN_KEY_COLON_NUM != v.count(':'):
return False
continue
for key, value in v.items():
if key=='new':
if not isinstance(value, list):
return False
for i in value:
if not spec_instance_format(i):
return False
elif is_integer(key):
if not isinstance(value, dict):
return False
if not spec_instance_format(value):
return False
else:
return False
return True
def bool_trans(d: dict) -> bool:
"""dict[str, dict[str, str] | dict[str, dict[str, str]] ]"""
if not isinstance(d, dict):
return False
if 'data' not in d or 'translate' not in d:
return False
if not urn_key(d['data'], BOOL_TRANS_URN_KEY_COLON_NUM):
return False
if not dict_str_str(d['data']):
return False
if not nested_3_dict_str_str(d['translate']):
@ -126,6 +259,13 @@ def bool_trans(d: dict) -> bool:
return False
return True
def multi_lang(d: dict) -> bool:
"""dict[str, dict[str, dict[str, dict[str, str]] ] ]"""
if not nested_3_dict_str_str(d):
return False
if not urn_key(d, MULTI_LANG_URN_KEY_COLON_NUM):
return False
return True
def compare_dict_structure(dict1: dict, dict2: dict) -> bool:
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
@ -160,13 +300,13 @@ def sort_bool_trans(file_path: str):
def sort_multi_lang(file_path: str):
multi_lang: dict = load_json_file(file_path=file_path)
multi_lang = dict(sorted(multi_lang.items()))
for urn, trans in multi_lang.items():
multi_lang[urn] = dict(sorted(trans.items()))
for lang, spec in multi_lang[urn].items():
multi_lang[urn][lang] = dict(sorted(spec.items()))
return multi_lang
lang_data: dict = load_json_file(file_path=file_path)
lang_data = dict(sorted(lang_data.items()))
for urn, trans in lang_data.items():
lang_data[urn] = dict(sorted(trans.items()))
for lang, spec in lang_data[urn].items():
lang_data[urn][lang] = dict(sorted(spec.items()))
return lang_data
def sort_spec_filter(file_path: str):
@ -177,6 +317,17 @@ def sort_spec_filter(file_path: str):
return filter_data
def sort_custom_service(file_path: str):
service_data: dict = load_json_file(file_path=file_path)
service_data = dict(sorted(service_data.items()))
for urn, spec in service_data.items():
if isinstance(spec, dict):
service_data[urn] = dict(sorted(spec.items()))
else:
service_data[urn] = spec
return service_data
@pytest.mark.github
def test_bool_trans():
data: dict = load_json_file(SPEC_BOOL_TRANS_FILE)
@ -195,7 +346,14 @@ def test_spec_filter():
def test_multi_lang():
data: dict = load_json_file(SPEC_MULTI_LANG_FILE)
assert data, f'load {SPEC_MULTI_LANG_FILE} failed'
assert nested_3_dict_str_str(data), f'{SPEC_MULTI_LANG_FILE} format error'
assert multi_lang(data), f'{SPEC_MULTI_LANG_FILE} format error'
@pytest.mark.github
def test_custom_service():
data: dict = load_json_file(CUSTOM_SERVICE_FILE)
assert data, f'load {CUSTOM_SERVICE_FILE} failed'
assert custom_service(data), f'{CUSTOM_SERVICE_FILE} format error'
@pytest.mark.github
@ -278,6 +436,12 @@ def test_miot_data_sort():
f'{SPEC_FILTER_FILE} not sorted, goto project root path'
' and run the following command sorting, ',
'pytest -s -v -m update ./test/check_rule_format.py')
assert json.dumps(
load_json_file(file_path=CUSTOM_SERVICE_FILE)) == json.dumps(
sort_custom_service(file_path=CUSTOM_SERVICE_FILE)), (
f'{CUSTOM_SERVICE_FILE} not sorted, goto project root path'
' and run the following command sorting, ',
'pytest -s -v -m update ./test/check_rule_format.py')
@pytest.mark.update
@ -291,3 +455,6 @@ def test_sort_spec_data():
sort_data = sort_spec_filter(file_path=SPEC_FILTER_FILE)
save_json_file(file_path=SPEC_FILTER_FILE, data=sort_data)
print(SPEC_FILTER_FILE, 'formatted.')
sort_data = sort_custom_service(file_path=CUSTOM_SERVICE_FILE)
save_json_file(file_path=CUSTOM_SERVICE_FILE, data=sort_data)
print(CUSTOM_SERVICE_FILE, 'formatted.')