This commit is contained in:
Paul Shawn 2025-01-07 12:25:24 +00:00 committed by GitHub
commit 7fd4b8017b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 447 additions and 329 deletions

View File

@ -45,11 +45,14 @@ off Xiaomi or its affiliates' products.
Common utilities.
"""
import asyncio
import json
from os import path
import random
from typing import Any, Optional
import hashlib
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from paho.mqtt.matcher import MQTTMatcher
import yaml
@ -83,10 +86,12 @@ def randomize_int(value: int, ratio: float) -> int:
"""Randomize an integer value."""
return int(value * (1 - ratio + random.random()*2*ratio))
def randomize_float(value: float, ratio: float) -> float:
"""Randomize a float value."""
return value * (1 - ratio + random.random()*2*ratio)
class MIoTMatcher(MQTTMatcher):
"""MIoT Pub/Sub topic matcher."""
@ -105,3 +110,68 @@ class MIoTMatcher(MQTTMatcher):
return self[topic]
except KeyError:
return None
class MIoTHttp:
"""MIoT Common HTTP API."""
@staticmethod
def get(
url: str, params: Optional[dict] = None, headers: Optional[dict] = None
) -> Optional[str]:
full_url = url
if params:
encoded_params = urlencode(params)
full_url = f'{url}?{encoded_params}'
request = Request(full_url, method='GET', headers=headers or {})
content: Optional[bytes] = None
with urlopen(request) as response:
content = response.read()
return str(content, 'utf-8') if content else None
@staticmethod
def get_json(
url: str, params: Optional[dict] = None, headers: Optional[dict] = None
) -> Optional[dict]:
response = MIoTHttp.get(url, params, headers)
return json.loads(response) if response else None
@staticmethod
def post(
url: str, data: Optional[dict] = None, headers: Optional[dict] = None
) -> Optional[str]:
pass
@staticmethod
def post_json(
url: str, data: Optional[dict] = None, headers: Optional[dict] = None
) -> Optional[dict]:
response = MIoTHttp.post(url, data, headers)
return json.loads(response) if response else None
@staticmethod
async def get_async(
url: str, params: Optional[dict] = None, headers: Optional[dict] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> Optional[str]:
# TODO: Use aiohttp
ev_loop = loop or asyncio.get_running_loop()
return await ev_loop.run_in_executor(
None, MIoTHttp.get, url, params, headers)
@staticmethod
async def get_json_async(
url: str, params: Optional[dict] = None, headers: Optional[dict] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> Optional[dict]:
ev_loop = loop or asyncio.get_running_loop()
return await ev_loop.run_in_executor(
None, MIoTHttp.get_json, url, params, headers)
@ staticmethod
async def post_async(
url: str, data: Optional[dict] = None, headers: Optional[dict] = None,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> Optional[str]:
ev_loop = loop or asyncio.get_running_loop()
return await ev_loop.run_in_executor(
None, MIoTHttp.post, url, data, headers)

View File

@ -142,7 +142,7 @@ class MIoTDevice:
_room_id: str
_room_name: str
_suggested_area: str
_suggested_area: Optional[str]
_device_state_sub_list: dict[str, Callable[[str, MIoTDeviceState], None]]
@ -153,7 +153,7 @@ class MIoTDevice:
def __init__(
self, miot_client: MIoTClient,
device_info: dict[str, str],
device_info: dict[str, Any],
spec_instance: MIoTSpecInstance
) -> None:
self.miot_client = miot_client
@ -243,25 +243,29 @@ class MIoTDevice:
return True
def sub_property(
self, handler: Callable[[dict, Any], None], siid: int = None,
piid: int = None, handler_ctx: Any = None
self, handler: Callable[[dict, Any], None], siid: Optional[int] = None,
piid: Optional[int] = None, handler_ctx: Any = None
) -> bool:
return self.miot_client.sub_prop(
did=self._did, handler=handler, siid=siid, piid=piid,
handler_ctx=handler_ctx)
def unsub_property(self, siid: int = None, piid: int = None) -> bool:
def unsub_property(
self, siid: Optional[int] = None, piid: Optional[int] = None
) -> bool:
return self.miot_client.unsub_prop(did=self._did, siid=siid, piid=piid)
def sub_event(
self, handler: Callable[[dict, Any], None], siid: int = None,
eiid: int = None, handler_ctx: Any = None
self, handler: Callable[[dict, Any], None], siid: Optional[int] = None,
eiid: Optional[int] = None, handler_ctx: Any = None
) -> bool:
return self.miot_client.sub_event(
did=self._did, handler=handler, siid=siid, eiid=eiid,
handler_ctx=handler_ctx)
def unsub_event(self, siid: int = None, eiid: int = None) -> bool:
def unsub_event(
self, siid: Optional[int] = None, eiid: Optional[int] = None
) -> bool:
return self.miot_client.unsub_event(
did=self._did, siid=siid, eiid=eiid)
@ -703,7 +707,7 @@ class MIoTDevice:
def __on_device_state_changed(
self, did: str, state: MIoTDeviceState, ctx: Any
) -> None:
self._online = state
self._online = state == MIoTDeviceState.ONLINE
for key, handler in self._device_state_sub_list.items():
self.miot_client.main_loop.call_soon_threadsafe(
handler, key, state)
@ -719,7 +723,8 @@ class MIoTServiceEntity(Entity):
_main_loop: asyncio.AbstractEventLoop
_prop_value_map: dict[MIoTSpecProperty, Any]
_event_occurred_handler: Callable[[MIoTSpecEvent, dict], None]
_event_occurred_handler: Optional[
Callable[[MIoTSpecEvent, dict], None]]
_prop_changed_subs: dict[
MIoTSpecProperty, Callable[[MIoTSpecProperty, Any], None]]
@ -763,7 +768,9 @@ class MIoTServiceEntity(Entity):
self.entity_id)
@property
def event_occurred_handler(self) -> Callable[[MIoTSpecEvent, dict], None]:
def event_occurred_handler(
self
) -> Optional[Callable[[MIoTSpecEvent, dict], None]]:
return self._event_occurred_handler
@event_occurred_handler.setter
@ -784,7 +791,7 @@ class MIoTServiceEntity(Entity):
self._prop_changed_subs.pop(prop, None)
@property
def device_info(self) -> dict:
def device_info(self) -> Optional[DeviceInfo]:
return self.miot_device.device_info
async def async_added_to_hass(self) -> None:
@ -1002,7 +1009,7 @@ class MIoTPropertyEntity(Entity):
# {'min':int, 'max':int, 'step': int}
_value_range: dict[str, int]
# {Any: Any}
_value_list: dict[Any, Any]
_value_list: Optional[dict[Any, Any]]
_value: Any
_pending_write_ha_state_timer: Optional[asyncio.TimerHandle]
@ -1042,7 +1049,7 @@ class MIoTPropertyEntity(Entity):
self._value_list)
@property
def device_info(self) -> dict:
def device_info(self) -> Optional[DeviceInfo]:
return self.miot_device.device_info
async def async_added_to_hass(self) -> None:
@ -1067,7 +1074,7 @@ class MIoTPropertyEntity(Entity):
self.miot_device.unsub_property(
siid=self.service.iid, piid=self.spec.iid)
def get_vlist_description(self, value: Any) -> str:
def get_vlist_description(self, value: Any) -> Optional[str]:
if not self._value_list:
return None
return self._value_list.get(value, None)
@ -1184,7 +1191,7 @@ class MIoTEventEntity(Entity):
spec.device_class, self.entity_id)
@property
def device_info(self) -> dict:
def device_info(self) -> Optional[DeviceInfo]:
return self.miot_device.device_info
async def async_added_to_hass(self) -> None:
@ -1286,7 +1293,7 @@ class MIoTActionEntity(Entity):
spec.device_class, self.entity_id)
@property
def device_info(self) -> dict:
def device_info(self) -> Optional[DeviceInfo]:
return self.miot_device.device_info
async def async_added_to_hass(self) -> None:
@ -1298,7 +1305,9 @@ class MIoTActionEntity(Entity):
self.miot_device.unsub_device_state(
key=f'{self.action_platform}.{ self.service.iid}.{self.spec.iid}')
async def action_async(self, in_list: list = None) -> Optional[list]:
async def action_async(
self, in_list: Optional[list] = None
) -> Optional[list]:
try:
return await self.miot_device.miot_client.action_async(
did=self.miot_device.did,

View File

@ -54,8 +54,10 @@ from urllib.parse import urlencode
from urllib.request import Request, urlopen
import logging
# pylint: disable=relative-beyond-top-level
from .const import DEFAULT_INTEGRATION_LANGUAGE, SPEC_STD_LIB_EFFECTIVE_TIME
from .common import MIoTHttp
from .miot_error import MIoTSpecError
from .miot_storage import (
MIoTStorage,
@ -66,6 +68,291 @@ from .miot_storage import (
_LOGGER = logging.getLogger(__name__)
class _MIoTSpecValueRange:
"""MIoT SPEC value range class."""
min_: int
max_: int
step: int
def from_list(self, value_range: list) -> None:
self.min_ = value_range[0]
self.max_ = value_range[1]
self.step = value_range[2]
def to_list(self) -> list:
return [self.min_, self.max_, self.step]
class _MIoTSpecValueListItem:
"""MIoT SPEC value list item class."""
# All lower-case SPEC description.
name: str
# Value
value: Any
# Descriptions after multilingual conversion.
description: str
def to_dict(self) -> dict:
return {
'name': self.name,
'value': self.value,
'description': self.description
}
class _MIoTSpecValueList:
"""MIoT SPEC value list class."""
items: list[_MIoTSpecValueListItem]
def to_map(self) -> dict:
return {item.value: item.description for item in self.items}
def to_list(self) -> list:
return [item.to_dict() for item in self.items]
class _SpecStdLib:
"""MIoT-Spec-V2 standard library."""
_lang: str
_devices: dict[str, dict[str, str]]
_services: dict[str, dict[str, str]]
_properties: dict[str, dict[str, str]]
_events: dict[str, dict[str, str]]
_actions: dict[str, dict[str, str]]
_values: dict[str, dict[str, str]]
def __init__(self, lang: str) -> None:
self._lang = lang
self._devices = {}
self._services = {}
self._properties = {}
self._events = {}
self._actions = {}
self._values = {}
self._spec_std_lib = None
def from_dict(self, std_lib: dict[str, dict[str, dict[str, str]]]) -> None:
if (
not isinstance(std_lib, dict)
or 'devices' not in std_lib
or 'services' not in std_lib
or 'properties' not in std_lib
or 'events' not in std_lib
or 'actions' not in std_lib
or 'values' not in std_lib
):
return
self._devices = std_lib['devices']
self._services = std_lib['services']
self._properties = std_lib['properties']
self._events = std_lib['events']
self._actions = std_lib['actions']
self._values = std_lib['values']
def device_translate(self, key: str) -> Optional[str]:
if not self._devices or key not in self._devices:
return None
if self._lang not in self._devices[key]:
return self._devices[key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._devices[key][self._lang]
def service_translate(self, key: str) -> Optional[str]:
if not self._services or key not in self._services:
return None
if self._lang not in self._services[key]:
return self._services[key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._services[key][self._lang]
def property_translate(self, key: str) -> Optional[str]:
if not self._properties or key not in self._properties:
return None
if self._lang not in self._properties[key]:
return self._properties[key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._properties[key][self._lang]
def event_translate(self, key: str) -> Optional[str]:
if not self._events or key not in self._events:
return None
if self._lang not in self._events[key]:
return self._events[key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._events[key][self._lang]
def action_translate(self, key: str) -> Optional[str]:
if not self._actions or key not in self._actions:
return None
if self._lang not in self._actions[key]:
return self._actions[key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._actions[key][self._lang]
def value_translate(self, key: str) -> Optional[str]:
if not self._values or key not in self._values:
return None
if self._lang not in self._values[key]:
return self._values[key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._values[key][self._lang]
def dump(self) -> dict[str, dict[str, dict[str, str]]]:
return {
'devices': self._devices,
'services': self._services,
'properties': self._properties,
'events': self._events,
'actions': self._actions,
'values': self._values
}
async def refresh_async(self) -> bool:
std_lib_new = await self.__request_from_cloud_async()
if std_lib_new:
self.from_dict(std_lib_new)
return True
return False
async def __request_from_cloud_async(self) -> Optional[dict]:
std_libs: Optional[dict] = None
for index in range(3):
try:
tasks: list = []
# Get std lib
for name in [
'device', 'service', 'property', 'event', 'action']:
tasks.append(self.__get_template_list(
'https://miot-spec.org/miot-spec-v2/template/list/'
+ name))
tasks.append(self.__get_property_value())
# Async request
results = await asyncio.gather(*tasks)
if None in results:
raise MIoTSpecError('init failed, None in result')
std_libs = {
'devices': results[0],
'services': results[1],
'properties': results[2],
'events': results[3],
'actions': results[4],
'values': results[5],
}
# Get external std lib, Power by LM
tasks.clear()
for name in [
'device', 'service', 'property', 'event', 'action',
'property_value']:
tasks.append(MIoTHttp.get_json_async(
'https://cdn.cnbj1.fds.api.mi-img.com/res-conf/'
f'xiaomi-home/std_ex_{name}.json'))
results = await asyncio.gather(*tasks)
if results[0]:
for key, value in results[0].items():
if key in std_libs['devices']:
std_libs['devices'][key].update(value)
else:
std_libs['devices'][key] = value
else:
_LOGGER.error('get external std lib failed, devices')
if results[1]:
for key, value in results[1].items():
if key in std_libs['services']:
std_libs['services'][key].update(value)
else:
std_libs['services'][key] = value
else:
_LOGGER.error('get external std lib failed, services')
if results[2]:
for key, value in results[2].items():
if key in std_libs['properties']:
std_libs['properties'][key].update(value)
else:
std_libs['properties'][key] = value
else:
_LOGGER.error('get external std lib failed, properties')
if results[3]:
for key, value in results[3].items():
if key in std_libs['events']:
std_libs['events'][key].update(value)
else:
std_libs['events'][key] = value
else:
_LOGGER.error('get external std lib failed, events')
if results[4]:
for key, value in results[4].items():
if key in std_libs['actions']:
std_libs['actions'][key].update(value)
else:
std_libs['actions'][key] = value
else:
_LOGGER.error('get external std lib failed, actions')
if results[5]:
for key, value in results[5].items():
if key in std_libs['values']:
std_libs['values'][key].update(value)
else:
std_libs['values'][key] = value
else:
_LOGGER.error(
'get external std lib failed, values')
return std_libs
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error(
'update spec std lib error, retry, %d, %s', index, err)
return None
async def __get_property_value(self) -> dict:
reply = await MIoTHttp.get_json_async(
url='https://miot-spec.org/miot-spec-v2'
'/normalization/list/property_value')
if reply is None or 'result' not in reply:
raise MIoTSpecError('get property value failed')
result = {}
for item in reply['result']:
if (
not isinstance(item, dict)
or 'normalization' not in item
or 'description' not in item
or 'proName' not in item
or 'urn' not in item
):
continue
result[
f'{item["urn"]}|{item["proName"]}|{item["normalization"]}'
] = {
'zh-Hans': item['description'],
'en': item['normalization']
}
return result
async def __get_template_list(self, url: str) -> dict:
reply = await MIoTHttp.get_json_async(url=url)
if reply is None or 'result' not in reply:
raise MIoTSpecError(f'get service failed, {url}')
result: dict = {}
for item in reply['result']:
if (
not isinstance(item, dict)
or 'type' not in item
or 'description' not in item
):
continue
if 'zh_cn' in item['description']:
item['description']['zh-Hans'] = item['description'].pop(
'zh_cn')
if 'zh_hk' in item['description']:
item['description']['zh-Hant'] = item['description'].pop(
'zh_hk')
item['description'].pop('zh_tw', None)
elif 'zh_tw' in item['description']:
item['description']['zh-Hant'] = item['description'].pop(
'zh_tw')
result[item['type']] = item['description']
return result
class MIoTSpecBase:
"""MIoT SPEC base class."""
iid: int
@ -77,13 +364,13 @@ class MIoTSpecBase:
name: Optional[str]
# External params
platform: str
platform: Optional[str]
device_class: Any
state_class: Any
icon: str
icon: Optional[str]
external_unit: Any
spec_id: str
spec_id: int
def __init__(self, spec: dict) -> None:
self.iid = spec['iid']
@ -106,7 +393,7 @@ class MIoTSpecBase:
def __hash__(self) -> int:
return self.spec_id
def __eq__(self, value: object) -> bool:
def __eq__(self, value) -> bool:
return self.spec_id == value.spec_id
@ -114,10 +401,10 @@ class MIoTSpecProperty(MIoTSpecBase):
"""MIoT SPEC property class."""
format_: str
precision: int
unit: str
unit: Optional[str]
value_range: list
value_list: list[dict]
value_range: Optional[list]
value_list: Optional[list[dict]]
_access: list
_writable: bool
@ -127,10 +414,9 @@ class MIoTSpecProperty(MIoTSpecBase):
service: MIoTSpecBase
def __init__(
self, spec: dict, service: MIoTSpecBase = None,
format_: str = None, access: list = None,
unit: str = None, value_range: list = None,
value_list: list[dict] = None, precision: int = 0
self, spec: dict, service: MIoTSpecBase, format_: str, access: list,
unit: Optional[str] = None, value_range: Optional[list] = None,
value_list: Optional[list[dict]] = None, precision: int = 0
) -> None:
super().__init__(spec=spec)
self.service = service
@ -203,7 +489,7 @@ class MIoTSpecEvent(MIoTSpecBase):
service: MIoTSpecBase
def __init__(
self, spec: dict, service: MIoTSpecBase = None,
self, spec: dict, service: MIoTSpecBase,
argument: list[MIoTSpecProperty] = None
) -> None:
super().__init__(spec=spec)
@ -372,86 +658,6 @@ class MIoTSpecInstance:
}
class SpecStdLib:
"""MIoT-Spec-V2 standard library."""
_lang: str
_spec_std_lib: Optional[dict[str, dict[str, dict[str, str]]]]
def __init__(self, lang: str) -> None:
self._lang = lang
self._spec_std_lib = None
def init(self, std_lib: dict[str, dict[str, str]]) -> None:
if (
not isinstance(std_lib, dict)
or 'devices' not in std_lib
or 'services' not in std_lib
or 'properties' not in std_lib
or 'events' not in std_lib
or 'actions' not in std_lib
or 'values' not in std_lib
):
return
self._spec_std_lib = std_lib
def deinit(self) -> None:
self._spec_std_lib = None
def device_translate(self, key: str) -> Optional[str]:
if not self._spec_std_lib or key not in self._spec_std_lib['devices']:
return None
if self._lang not in self._spec_std_lib['devices'][key]:
return self._spec_std_lib['devices'][key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._spec_std_lib['devices'][key][self._lang]
def service_translate(self, key: str) -> Optional[str]:
if not self._spec_std_lib or key not in self._spec_std_lib['services']:
return None
if self._lang not in self._spec_std_lib['services'][key]:
return self._spec_std_lib['services'][key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._spec_std_lib['services'][key][self._lang]
def property_translate(self, key: str) -> Optional[str]:
if (
not self._spec_std_lib
or key not in self._spec_std_lib['properties']
):
return None
if self._lang not in self._spec_std_lib['properties'][key]:
return self._spec_std_lib['properties'][key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._spec_std_lib['properties'][key][self._lang]
def event_translate(self, key: str) -> Optional[str]:
if not self._spec_std_lib or key not in self._spec_std_lib['events']:
return None
if self._lang not in self._spec_std_lib['events'][key]:
return self._spec_std_lib['events'][key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._spec_std_lib['events'][key][self._lang]
def action_translate(self, key: str) -> Optional[str]:
if not self._spec_std_lib or key not in self._spec_std_lib['actions']:
return None
if self._lang not in self._spec_std_lib['actions'][key]:
return self._spec_std_lib['actions'][key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._spec_std_lib['actions'][key][self._lang]
def value_translate(self, key: str) -> Optional[str]:
if not self._spec_std_lib or key not in self._spec_std_lib['values']:
return None
if self._lang not in self._spec_std_lib['values'][key]:
return self._spec_std_lib['values'][key].get(
DEFAULT_INTEGRATION_LANGUAGE, None)
return self._spec_std_lib['values'][key][self._lang]
def dump(self) -> dict[str, dict[str, str]]:
return self._spec_std_lib
class MIoTSpecParser:
"""MIoT SPEC parser."""
# pylint: disable=inconsistent-quotes
@ -464,24 +670,24 @@ class MIoTSpecParser:
_init_done: bool
_ram_cache: dict
_std_lib: SpecStdLib
_std_lib: _SpecStdLib
_bool_trans: SpecBoolTranslation
_multi_lang: SpecMultiLang
_spec_filter: SpecFilter
def __init__(
self, lang: str = DEFAULT_INTEGRATION_LANGUAGE,
storage: MIoTStorage = None,
self, lang: Optional[str],
storage: MIoTStorage,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> None:
self._lang = lang
self._lang = lang or DEFAULT_INTEGRATION_LANGUAGE
self._storage = storage
self._main_loop = loop or asyncio.get_running_loop()
self._init_done = False
self._ram_cache = {}
self._std_lib = SpecStdLib(lang=self._lang)
self._std_lib = _SpecStdLib(lang=self._lang)
self._bool_trans = SpecBoolTranslation(
lang=self._lang, loop=self._main_loop)
self._multi_lang = SpecMultiLang(lang=self._lang, loop=self._main_loop)
@ -493,48 +699,43 @@ class MIoTSpecParser:
await self._bool_trans.init_async()
await self._multi_lang.init_async()
await self._spec_filter.init_async()
std_lib_cache: dict = None
if self._storage:
std_lib_cache: dict = await self._storage.load_async(
domain=self.DOMAIN, name='spec_std_lib', type_=dict)
if (
isinstance(std_lib_cache, dict)
and 'data' in std_lib_cache
and 'ts' in std_lib_cache
and isinstance(std_lib_cache['ts'], int)
and int(time.time()) - std_lib_cache['ts'] <
SPEC_STD_LIB_EFFECTIVE_TIME
):
# Use the cache if the update time is less than 14 day
_LOGGER.debug(
'use local spec std cache, ts->%s', std_lib_cache['ts'])
self._std_lib.init(std_lib_cache['data'])
self._init_done = True
return
std_lib_cache = await self._storage.load_async(
domain=self.DOMAIN, name='spec_std_lib', type_=dict)
if (
isinstance(std_lib_cache, dict)
and 'data' in std_lib_cache
and 'ts' in std_lib_cache
and isinstance(std_lib_cache['ts'], int)
and int(time.time()) - std_lib_cache['ts'] <
SPEC_STD_LIB_EFFECTIVE_TIME
):
# Use the cache if the update time is less than 14 day
_LOGGER.debug(
'use local spec std cache, ts->%s', std_lib_cache['ts'])
self._std_lib.from_dict(std_lib_cache['data'])
self._init_done = True
return
# Update spec std lib
spec_lib_new = await self.__request_spec_std_lib_async()
if spec_lib_new:
self._std_lib.init(spec_lib_new)
if self._storage:
if not await self._storage.save_async(
domain=self.DOMAIN, name='spec_std_lib',
data={
'data': self._std_lib.dump(),
'ts': int(time.time())
}
):
_LOGGER.error('save spec std lib failed')
if await self._std_lib.refresh_async():
if not await self._storage.save_async(
domain=self.DOMAIN, name='spec_std_lib',
data={
'data': self._std_lib.dump(),
'ts': int(time.time())
}
):
_LOGGER.error('save spec std lib failed')
else:
if std_lib_cache:
self._std_lib.init(std_lib_cache['data'])
_LOGGER.error('get spec std lib failed, use local cache')
if isinstance(std_lib_cache, dict) and 'data' in std_lib_cache:
self._std_lib.from_dict(std_lib_cache['data'])
_LOGGER.info('get spec std lib failed, use local cache')
else:
_LOGGER.error('get spec std lib failed')
_LOGGER.error('load spec std lib failed')
self._init_done = True
async def deinit_async(self) -> None:
self._init_done = False
self._std_lib.deinit()
# self._std_lib.deinit()
await self._bool_trans.deinit_async()
await self._multi_lang.deinit_async()
await self._spec_filter.deinit_async()
@ -562,18 +763,15 @@ class MIoTSpecParser:
"""MUST await init first !!!"""
if not urn_list:
return False
spec_std_new: dict = await self.__request_spec_std_lib_async()
if spec_std_new:
self._std_lib.init(spec_std_new)
if self._storage:
if not await self._storage.save_async(
domain=self.DOMAIN, name='spec_std_lib',
data={
'data': self._std_lib.dump(),
'ts': int(time.time())
}
):
_LOGGER.error('save spec std lib failed')
if await self._std_lib.refresh_async():
if not await self._storage.save_async(
domain=self.DOMAIN, name='spec_std_lib',
data={
'data': self._std_lib.dump(),
'ts': int(time.time())
}
):
_LOGGER.error('save spec std lib failed')
else:
raise MIoTSpecError('get spec std lib failed')
success_count = 0
@ -585,28 +783,6 @@ class MIoTSpecParser:
success_count += sum(1 for result in results if result is not None)
return success_count
def __http_get(
self, url: str, params: dict = None, headers: dict = None
) -> dict:
if params:
encoded_params = urlencode(params)
full_url = f'{url}?{encoded_params}'
else:
full_url = url
request = Request(full_url, method='GET', headers=headers or {})
content: bytes = None
with urlopen(request) as response:
content = response.read()
return (
json.loads(str(content, 'utf-8'))
if content is not None else None)
async def __http_get_async(
self, url: str, params: dict = None, headers: dict = None
) -> dict:
return await self._main_loop.run_in_executor(
None, self.__http_get, url, params, headers)
async def __cache_get(self, urn: str) -> Optional[dict]:
if self._storage is not None:
if platform.system() == 'Windows':
@ -630,157 +806,20 @@ class MIoTSpecParser:
return {'string': 'str', 'bool': 'bool', 'float': 'float'}.get(
format_, 'int')
async def __request_spec_std_lib_async(self) -> Optional[SpecStdLib]:
std_libs: dict = None
for index in range(3):
try:
tasks: list = []
# Get std lib
for name in [
'device', 'service', 'property', 'event', 'action']:
tasks.append(self.__get_template_list(
'https://miot-spec.org/miot-spec-v2/template/list/'
+ name))
tasks.append(self.__get_property_value())
# Async request
results = await asyncio.gather(*tasks)
if None in results:
raise MIoTSpecError('init failed, None in result')
std_libs = {
'devices': results[0],
'services': results[1],
'properties': results[2],
'events': results[3],
'actions': results[4],
'values': results[5],
}
# Get external std lib, Power by LM
tasks.clear()
for name in [
'device', 'service', 'property', 'event', 'action',
'property_value']:
tasks.append(self.__http_get_async(
'https://cdn.cnbj1.fds.api.mi-img.com/res-conf/'
f'xiaomi-home/std_ex_{name}.json'))
results = await asyncio.gather(*tasks)
if results[0]:
for key, value in results[0].items():
if key in std_libs['devices']:
std_libs['devices'][key].update(value)
else:
std_libs['devices'][key] = value
else:
_LOGGER.error('get external std lib failed, devices')
if results[1]:
for key, value in results[1].items():
if key in std_libs['services']:
std_libs['services'][key].update(value)
else:
std_libs['services'][key] = value
else:
_LOGGER.error('get external std lib failed, services')
if results[2]:
for key, value in results[2].items():
if key in std_libs['properties']:
std_libs['properties'][key].update(value)
else:
std_libs['properties'][key] = value
else:
_LOGGER.error('get external std lib failed, properties')
if results[3]:
for key, value in results[3].items():
if key in std_libs['events']:
std_libs['events'][key].update(value)
else:
std_libs['events'][key] = value
else:
_LOGGER.error('get external std lib failed, events')
if results[4]:
for key, value in results[4].items():
if key in std_libs['actions']:
std_libs['actions'][key].update(value)
else:
std_libs['actions'][key] = value
else:
_LOGGER.error('get external std lib failed, actions')
if results[5]:
for key, value in results[5].items():
if key in std_libs['values']:
std_libs['values'][key].update(value)
else:
std_libs['values'][key] = value
else:
_LOGGER.error(
'get external std lib failed, values')
return std_libs
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error(
'update spec std lib error, retry, %d, %s', index, err)
return None
async def __get_property_value(self) -> dict:
reply = await self.__http_get_async(
url='https://miot-spec.org/miot-spec-v2'
'/normalization/list/property_value')
if reply is None or 'result' not in reply:
raise MIoTSpecError('get property value failed')
result = {}
for item in reply['result']:
if (
not isinstance(item, dict)
or 'normalization' not in item
or 'description' not in item
or 'proName' not in item
or 'urn' not in item
):
continue
result[
f'{item["urn"]}|{item["proName"]}|{item["normalization"]}'
] = {
'zh-Hans': item['description'],
'en': item['normalization']
}
return result
async def __get_template_list(self, url: str) -> dict:
reply = await self.__http_get_async(url=url)
if reply is None or 'result' not in reply:
raise MIoTSpecError(f'get service failed, {url}')
result: dict = {}
for item in reply['result']:
if (
not isinstance(item, dict)
or 'type' not in item
or 'description' not in item
):
continue
if 'zh_cn' in item['description']:
item['description']['zh-Hans'] = item['description'].pop(
'zh_cn')
if 'zh_hk' in item['description']:
item['description']['zh-Hant'] = item['description'].pop(
'zh_hk')
item['description'].pop('zh_tw', None)
elif 'zh_tw' in item['description']:
item['description']['zh-Hant'] = item['description'].pop(
'zh_tw')
result[item['type']] = item['description']
return result
async def __get_instance(self, urn: str) -> dict:
return await self.__http_get_async(
async def __get_instance(self, urn: str) -> Optional[dict]:
return await MIoTHttp.get_json_async(
url='https://miot-spec.org/miot-spec-v2/instance',
params={'type': urn})
async def __get_translation(self, urn: str) -> dict:
return await self.__http_get_async(
async def __get_translation(self, urn: str) -> Optional[dict]:
return await MIoTHttp.get_json_async(
url='https://miot-spec.org/instance/v2/multiLanguage',
params={'urn': urn})
async def __parse(self, urn: str) -> MIoTSpecInstance:
_LOGGER.debug('parse urn, %s', urn)
# Load spec instance
instance: dict = await self.__get_instance(urn=urn)
instance = await self.__get_instance(urn=urn)
if (
not isinstance(instance, dict)
or 'type' not in instance
@ -789,6 +828,8 @@ class MIoTSpecParser:
):
raise MIoTSpecError(f'invalid urn instance, {urn}')
translation: dict = {}
urn_strs: list[str] = urn.split(':')
urn_key: str = ':'.join(urn_strs[:6])
try:
# Load multiple language configuration.
res_trans = await self.__get_translation(urn=urn)
@ -798,9 +839,7 @@ class MIoTSpecParser:
or not isinstance(res_trans['data'], dict)
):
raise MIoTSpecError('invalid translation data')
urn_strs: list[str] = urn.split(':')
urn_key: str = ':'.join(urn_strs[:6])
trans_data: dict[str, str] = None
trans_data: dict[str, str] = {}
if self._lang == 'zh-Hans':
# Simplified Chinese
trans_data = res_trans['data'].get('zh_cn', {})