mirror of
https://github.com/XiaoMi/ha_xiaomi_home.git
synced 2026-01-14 13:20:42 +08:00
Merge ee05222abc into 365f4e57d8
This commit is contained in:
commit
529e22e735
@ -155,7 +155,8 @@ async def async_setup_entry(
|
||||
for entity in filter_entities:
|
||||
device.entity_list[platform].remove(entity)
|
||||
entity_id = device.gen_service_entity_id(
|
||||
ha_domain=platform, siid=entity.spec.iid)
|
||||
ha_domain=platform, siid=entity.spec.iid,
|
||||
description=entity.spec.description)
|
||||
if er.async_get(entity_id_or_uuid=entity_id):
|
||||
er.async_remove(entity_id=entity_id)
|
||||
if platform in device.prop_list:
|
||||
|
||||
@ -298,10 +298,11 @@ class MIoTDevice:
|
||||
f'{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_'
|
||||
f'{self._model_strs[-1][:20]}')
|
||||
|
||||
def gen_service_entity_id(self, ha_domain: str, siid: int) -> str:
|
||||
def gen_service_entity_id(self, ha_domain: str, siid: int,
|
||||
description: str) -> str:
|
||||
return (
|
||||
f'{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_'
|
||||
f'{self._model_strs[-1][:20]}_s_{siid}')
|
||||
f'{self._model_strs[-1][:20]}_s_{siid}_{description}')
|
||||
|
||||
def gen_prop_entity_id(
|
||||
self, ha_domain: str, spec_name: str, siid: int, piid: int
|
||||
@ -731,7 +732,8 @@ class MIoTServiceEntity(Entity):
|
||||
self._attr_name = f' {self.entity_data.spec.description_trans}'
|
||||
elif isinstance(entity_data.spec, MIoTSpecService):
|
||||
self.entity_id = miot_device.gen_service_entity_id(
|
||||
DOMAIN, siid=entity_data.spec.iid)
|
||||
DOMAIN, siid=entity_data.spec.iid,
|
||||
description=entity_data.spec.description)
|
||||
self._attr_name = (
|
||||
f'{"* "if self.entity_data.spec.proprietary else " "}'
|
||||
f'{self.entity_data.spec.description_trans}')
|
||||
|
||||
@ -61,6 +61,7 @@ from .miot_storage import (
|
||||
MIoTStorage,
|
||||
SpecBoolTranslation,
|
||||
SpecFilter,
|
||||
SpecCustomService,
|
||||
SpecMultiLang)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -466,6 +467,7 @@ class MIoTSpecParser:
|
||||
_bool_trans: SpecBoolTranslation
|
||||
_multi_lang: SpecMultiLang
|
||||
_spec_filter: SpecFilter
|
||||
_custom_service: SpecCustomService
|
||||
|
||||
def __init__(
|
||||
self, lang: str = DEFAULT_INTEGRATION_LANGUAGE,
|
||||
@ -484,6 +486,7 @@ class MIoTSpecParser:
|
||||
lang=self._lang, loop=self._main_loop)
|
||||
self._multi_lang = SpecMultiLang(lang=self._lang, loop=self._main_loop)
|
||||
self._spec_filter = SpecFilter(loop=self._main_loop)
|
||||
self._custom_service = SpecCustomService(loop=self._main_loop)
|
||||
|
||||
async def init_async(self) -> None:
|
||||
if self._init_done is True:
|
||||
@ -491,6 +494,7 @@ class MIoTSpecParser:
|
||||
await self._bool_trans.init_async()
|
||||
await self._multi_lang.init_async()
|
||||
await self._spec_filter.init_async()
|
||||
await self._custom_service.init_async()
|
||||
std_lib_cache: dict = None
|
||||
if self._storage:
|
||||
std_lib_cache: dict = await self._storage.load_async(
|
||||
@ -536,6 +540,7 @@ class MIoTSpecParser:
|
||||
await self._bool_trans.deinit_async()
|
||||
await self._multi_lang.deinit_async()
|
||||
await self._spec_filter.deinit_async()
|
||||
await self._custom_service.deinit_async()
|
||||
self._ram_cache.clear()
|
||||
|
||||
async def parse(
|
||||
@ -779,6 +784,12 @@ class MIoTSpecParser:
|
||||
_LOGGER.debug('parse urn, %s', urn)
|
||||
# Load spec instance
|
||||
instance: dict = await self.__get_instance(urn=urn)
|
||||
urn_strs: list[str] = urn.split(':')
|
||||
urn_key: str = ':'.join(urn_strs[:6])
|
||||
# Modify the spec instance by custom spec
|
||||
instance = self._custom_service.modify_spec(urn_key=urn_key,
|
||||
spec=instance)
|
||||
# Check required fields in the device instance
|
||||
if (
|
||||
not isinstance(instance, dict)
|
||||
or 'type' not in instance
|
||||
@ -796,8 +807,6 @@ class MIoTSpecParser:
|
||||
or not isinstance(res_trans['data'], dict)
|
||||
):
|
||||
raise MIoTSpecError('invalid translation data')
|
||||
urn_strs: list[str] = urn.split(':')
|
||||
urn_key: str = ':'.join(urn_strs[:6])
|
||||
trans_data: dict[str, str] = None
|
||||
if self._lang == 'zh-Hans':
|
||||
# Simplified Chinese
|
||||
|
||||
@ -1033,3 +1033,63 @@ class DeviceManufacturer:
|
||||
except Exception as err: # pylint: disable=broad-exception-caught
|
||||
_LOGGER.error('get manufacturer info failed, %s', err)
|
||||
return None
|
||||
|
||||
|
||||
class SpecCustomService:
|
||||
"""Custom MIoT-Spec-V2 service defined by the user."""
|
||||
CUSTOM_SPEC_FILE = 'specs/custom_service.json'
|
||||
_main_loop: asyncio.AbstractEventLoop
|
||||
_data: dict[str, dict[str, any]]
|
||||
|
||||
def __init__(self, loop: Optional[asyncio.AbstractEventLoop]) -> None:
|
||||
self._main_loop = loop or asyncio.get_event_loop()
|
||||
self._data = None
|
||||
|
||||
async def init_async(self) -> None:
|
||||
if isinstance(self._data, dict):
|
||||
return
|
||||
custom_data = None
|
||||
self._data = {}
|
||||
try:
|
||||
custom_data = await self._main_loop.run_in_executor(
|
||||
None, load_json_file,
|
||||
os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)),
|
||||
self.CUSTOM_SPEC_FILE))
|
||||
except Exception as err: # pylint: disable=broad-exception-caught
|
||||
_LOGGER.error('custom service, load file error, %s', err)
|
||||
return
|
||||
if not isinstance(custom_data, dict):
|
||||
_LOGGER.error('custom service, invalid spec content')
|
||||
return
|
||||
for values in list(custom_data.values()):
|
||||
if not isinstance(values, dict):
|
||||
_LOGGER.error('custom service, invalid spec data')
|
||||
return
|
||||
self._data = custom_data
|
||||
|
||||
async def deinit_async(self) -> None:
|
||||
self._data = None
|
||||
|
||||
def modify_spec(self, urn_key: str, spec: dict) -> dict | None:
|
||||
"""MUST call init_async() first."""
|
||||
if not self._data:
|
||||
_LOGGER.error('self._data is None')
|
||||
return spec
|
||||
if urn_key not in self._data:
|
||||
return spec
|
||||
if 'services' not in spec:
|
||||
return spec
|
||||
spec_services = spec['services']
|
||||
custom_spec = self._data.get(urn_key, None)
|
||||
# Replace services by custom defined spec
|
||||
for i, service in enumerate(spec_services):
|
||||
siid = str(service['iid'])
|
||||
if siid in custom_spec:
|
||||
spec_services[i] = custom_spec[siid]
|
||||
# Add new services
|
||||
if 'new' in custom_spec:
|
||||
for service in custom_spec['new']:
|
||||
spec_services.append(service)
|
||||
|
||||
return spec
|
||||
|
||||
59
custom_components/xiaomi_home/miot/specs/custom_service.json
Normal file
59
custom_components/xiaomi_home/miot/specs/custom_service.json
Normal file
@ -0,0 +1,59 @@
|
||||
{
|
||||
"urn:miot-spec-v2:device:airer:0000A00D:hyd-lyjpro": {
|
||||
"3": {
|
||||
"iid": 3,
|
||||
"type": "urn:miot-spec-v2:service:light:00007802:hyd-lyjpro:1",
|
||||
"description": "Light",
|
||||
"properties": [
|
||||
{
|
||||
"iid": 1,
|
||||
"type": "urn:miot-spec-v2:property:on:00000006:hyd-lyjpro:1",
|
||||
"description": "Sunlight",
|
||||
"format": "bool",
|
||||
"access": [
|
||||
"read",
|
||||
"write",
|
||||
"notify"
|
||||
]
|
||||
},
|
||||
{
|
||||
"iid": 3,
|
||||
"type": "urn:miot-spec-v2:property:flex-switch:000000EC:hyd-lyjpro:1",
|
||||
"description": "Flex Switch",
|
||||
"format": "uint8",
|
||||
"access": [
|
||||
"read",
|
||||
"write",
|
||||
"notify"
|
||||
],
|
||||
"value-list": [
|
||||
{
|
||||
"value": 1,
|
||||
"description": "Overturn"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"new": [
|
||||
{
|
||||
"iid": 3,
|
||||
"type": "urn:miot-spec-v2:service:light:00007802:hyd-lyjpro:1",
|
||||
"description": "Moonlight",
|
||||
"properties": [
|
||||
{
|
||||
"iid": 2,
|
||||
"type": "urn:miot-spec-v2:property:on:00000006:hyd-lyjpro:1",
|
||||
"description": "Switch Status",
|
||||
"format": "bool",
|
||||
"access": [
|
||||
"read",
|
||||
"write",
|
||||
"notify"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@ -155,7 +155,7 @@
|
||||
"service:004:property:001": "事件名稱"
|
||||
}
|
||||
},
|
||||
"urn:miot-spec-v2:device:switch:0000A003:lumi-acn040:1": {
|
||||
"urn:miot-spec-v2:device:switch:0000A003:lumi-acn040": {
|
||||
"en": {
|
||||
"service:011": "Right Button On and Off",
|
||||
"service:011:property:001": "Right Button On and Off",
|
||||
|
||||
@ -20,6 +20,14 @@ SPEC_MULTI_LANG_FILE = path.join(
|
||||
SPEC_FILTER_FILE = path.join(
|
||||
ROOT_PATH,
|
||||
'../custom_components/xiaomi_home/miot/specs/spec_filter.json')
|
||||
CUSTOM_SERVICE_FILE = path.join(
|
||||
ROOT_PATH,
|
||||
'../custom_components/xiaomi_home/miot/specs/custom_service.json')
|
||||
|
||||
BOOL_TRANS_URN_KEY_COLON_NUM: int = 4
|
||||
CUSTOM_SERVICE_URN_KEY_COLON_NUM: int = 5
|
||||
MULTI_LANG_URN_KEY_COLON_NUM: int = 5
|
||||
SPEC_FILTER_URN_KEY_COLON_NUM: int = 5
|
||||
|
||||
|
||||
def load_json_file(file_path: str) -> Optional[dict]:
|
||||
@ -90,11 +98,18 @@ def nested_3_dict_str_str(d: dict) -> bool:
|
||||
return False
|
||||
return True
|
||||
|
||||
def urn_key(d: dict, cnt: int) -> bool:
|
||||
for k in d.keys():
|
||||
if cnt != k.count(':'):
|
||||
return False
|
||||
return True
|
||||
|
||||
def spec_filter(d: dict) -> bool:
|
||||
"""restricted format: dict[str, dict[str, list<str>]]"""
|
||||
if not dict_str_dict(d):
|
||||
return False
|
||||
if not urn_key(d, SPEC_FILTER_URN_KEY_COLON_NUM):
|
||||
return False
|
||||
for value in d.values():
|
||||
for k, v in value.items():
|
||||
if not isinstance(k, str) or not isinstance(v, list):
|
||||
@ -104,12 +119,121 @@ def spec_filter(d: dict) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def spec_instance_format(d: dict) -> bool:
|
||||
"""restricted format of MIoT-Spec-V2 instance"""
|
||||
if ('iid' not in d) or ('type' not in d) or ('description' not in d):
|
||||
return False
|
||||
if not isinstance(d['iid'], int) or not isinstance(d['type'], str) or (
|
||||
not isinstance(d['description'], str)):
|
||||
return False
|
||||
# optional keys for property
|
||||
if 'format' in d:
|
||||
if not isinstance(d['format'], str):
|
||||
return False
|
||||
if 'unit' in d:
|
||||
if not isinstance(d['unit'], str):
|
||||
return False
|
||||
if 'access' in d:
|
||||
if not isinstance(d['access'], list):
|
||||
return False
|
||||
for i in d['access']:
|
||||
if not isinstance(i, str):
|
||||
return False
|
||||
if 'value-list' in d:
|
||||
if not isinstance(d['value-list'], list):
|
||||
return False
|
||||
for i in d['value-list']:
|
||||
if not isinstance(i, dict):
|
||||
return False
|
||||
if 'value' not in i or 'description' not in i:
|
||||
return False
|
||||
if not isinstance(i['value'], int) or not isinstance(i[
|
||||
'description'], str):
|
||||
return False
|
||||
if i['description'].replace(' ','') == '':
|
||||
return False
|
||||
# optional keys for action
|
||||
if 'in' in d:
|
||||
if not isinstance(d['in'], list):
|
||||
return False
|
||||
for i in d['in']:
|
||||
if not isinstance(i, int):
|
||||
return False
|
||||
if 'out' in d:
|
||||
if not isinstance(d['out'], list):
|
||||
return False
|
||||
for i in d['out']:
|
||||
if not isinstance(i, int):
|
||||
return False
|
||||
# optional keys for event
|
||||
if 'arguments' in d:
|
||||
if not isinstance(d['arguments'], list):
|
||||
return False
|
||||
for i in d['arguments']:
|
||||
if not isinstance(i, int):
|
||||
return False
|
||||
# optional keys for service
|
||||
if 'properties' in d:
|
||||
if not isinstance(d['properties'], list):
|
||||
return False
|
||||
for i in d['properties']:
|
||||
if not spec_instance_format(i):
|
||||
return False
|
||||
if 'actions' in d:
|
||||
if not isinstance(d['actions'], list):
|
||||
return False
|
||||
for i in d['actions']:
|
||||
if not spec_instance_format(i):
|
||||
return False
|
||||
if 'events' in d:
|
||||
if not isinstance(d['events'], list):
|
||||
return False
|
||||
for i in d['events']:
|
||||
if not spec_instance_format(i):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_integer(s: str) -> bool:
|
||||
try:
|
||||
int(s)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def custom_service(d: dict) -> bool:
|
||||
"""restricted format: dict[str, dict[str, Any]]"""
|
||||
if not dict_str_dict(d):
|
||||
return False
|
||||
if not urn_key(d, CUSTOM_SERVICE_URN_KEY_COLON_NUM):
|
||||
return False
|
||||
for v in d.values():
|
||||
for key, value in v.items():
|
||||
if key=='new':
|
||||
if not isinstance(value, list):
|
||||
return False
|
||||
for i in value:
|
||||
if not spec_instance_format(i):
|
||||
return False
|
||||
elif is_integer(key):
|
||||
if not isinstance(value, dict):
|
||||
return False
|
||||
if not spec_instance_format(value):
|
||||
return False
|
||||
else:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def bool_trans(d: dict) -> bool:
|
||||
"""dict[str, dict[str, str] | dict[str, dict[str, str]] ]"""
|
||||
if not isinstance(d, dict):
|
||||
return False
|
||||
if 'data' not in d or 'translate' not in d:
|
||||
return False
|
||||
if not urn_key(d['data'], BOOL_TRANS_URN_KEY_COLON_NUM):
|
||||
return False
|
||||
if not dict_str_str(d['data']):
|
||||
return False
|
||||
if not nested_3_dict_str_str(d['translate']):
|
||||
@ -126,6 +250,13 @@ def bool_trans(d: dict) -> bool:
|
||||
return False
|
||||
return True
|
||||
|
||||
def multi_lang(d: dict) -> bool:
|
||||
"""dict[str, dict[str, dict[str, dict[str, str]] ] ]"""
|
||||
if not nested_3_dict_str_str(d):
|
||||
return False
|
||||
if not urn_key(d, MULTI_LANG_URN_KEY_COLON_NUM):
|
||||
return False
|
||||
return True
|
||||
|
||||
def compare_dict_structure(dict1: dict, dict2: dict) -> bool:
|
||||
if not isinstance(dict1, dict) or not isinstance(dict2, dict):
|
||||
@ -160,13 +291,13 @@ def sort_bool_trans(file_path: str):
|
||||
|
||||
|
||||
def sort_multi_lang(file_path: str):
|
||||
multi_lang: dict = load_json_file(file_path=file_path)
|
||||
multi_lang = dict(sorted(multi_lang.items()))
|
||||
for urn, trans in multi_lang.items():
|
||||
multi_lang[urn] = dict(sorted(trans.items()))
|
||||
for lang, spec in multi_lang[urn].items():
|
||||
multi_lang[urn][lang] = dict(sorted(spec.items()))
|
||||
return multi_lang
|
||||
lang_data: dict = load_json_file(file_path=file_path)
|
||||
lang_data = dict(sorted(lang_data.items()))
|
||||
for urn, trans in lang_data.items():
|
||||
lang_data[urn] = dict(sorted(trans.items()))
|
||||
for lang, spec in lang_data[urn].items():
|
||||
lang_data[urn][lang] = dict(sorted(spec.items()))
|
||||
return lang_data
|
||||
|
||||
|
||||
def sort_spec_filter(file_path: str):
|
||||
@ -177,6 +308,14 @@ def sort_spec_filter(file_path: str):
|
||||
return filter_data
|
||||
|
||||
|
||||
def sort_custom_service(file_path: str):
|
||||
service_data: dict = load_json_file(file_path=file_path)
|
||||
service_data = dict(sorted(service_data.items()))
|
||||
for urn, spec in service_data.items():
|
||||
service_data[urn] = dict(sorted(spec.items()))
|
||||
return service_data
|
||||
|
||||
|
||||
@pytest.mark.github
|
||||
def test_bool_trans():
|
||||
data: dict = load_json_file(SPEC_BOOL_TRANS_FILE)
|
||||
@ -195,7 +334,14 @@ def test_spec_filter():
|
||||
def test_multi_lang():
|
||||
data: dict = load_json_file(SPEC_MULTI_LANG_FILE)
|
||||
assert data, f'load {SPEC_MULTI_LANG_FILE} failed'
|
||||
assert nested_3_dict_str_str(data), f'{SPEC_MULTI_LANG_FILE} format error'
|
||||
assert multi_lang(data), f'{SPEC_MULTI_LANG_FILE} format error'
|
||||
|
||||
|
||||
@pytest.mark.github
|
||||
def test_custom_service():
|
||||
data: dict = load_json_file(CUSTOM_SERVICE_FILE)
|
||||
assert data, f'load {CUSTOM_SERVICE_FILE} failed'
|
||||
assert custom_service(data), f'{CUSTOM_SERVICE_FILE} format error'
|
||||
|
||||
|
||||
@pytest.mark.github
|
||||
@ -278,6 +424,12 @@ def test_miot_data_sort():
|
||||
f'{SPEC_FILTER_FILE} not sorted, goto project root path'
|
||||
' and run the following command sorting, ',
|
||||
'pytest -s -v -m update ./test/check_rule_format.py')
|
||||
assert json.dumps(
|
||||
load_json_file(file_path=CUSTOM_SERVICE_FILE)) == json.dumps(
|
||||
sort_custom_service(file_path=CUSTOM_SERVICE_FILE)), (
|
||||
f'{CUSTOM_SERVICE_FILE} not sorted, goto project root path'
|
||||
' and run the following command sorting, ',
|
||||
'pytest -s -v -m update ./test/check_rule_format.py')
|
||||
|
||||
|
||||
@pytest.mark.update
|
||||
@ -291,3 +443,6 @@ def test_sort_spec_data():
|
||||
sort_data = sort_spec_filter(file_path=SPEC_FILTER_FILE)
|
||||
save_json_file(file_path=SPEC_FILTER_FILE, data=sort_data)
|
||||
print(SPEC_FILTER_FILE, 'formatted.')
|
||||
sort_data = sort_custom_service(file_path=CUSTOM_SERVICE_FILE)
|
||||
save_json_file(file_path=CUSTOM_SERVICE_FILE, data=sort_data)
|
||||
print(CUSTOM_SERVICE_FILE, 'formatted.')
|
||||
|
||||
Loading…
Reference in New Issue
Block a user