From eddeafa3c348a653155ea08e6afef44049b992f0 Mon Sep 17 00:00:00 2001 From: topsworld Date: Tue, 7 Jan 2025 10:40:10 +0800 Subject: [PATCH 1/2] fix: fix miot_device type error --- .../xiaomi_home/miot/miot_device.py | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/custom_components/xiaomi_home/miot/miot_device.py b/custom_components/xiaomi_home/miot/miot_device.py index 353b28f..17bf9da 100644 --- a/custom_components/xiaomi_home/miot/miot_device.py +++ b/custom_components/xiaomi_home/miot/miot_device.py @@ -142,7 +142,7 @@ class MIoTDevice: _room_id: str _room_name: str - _suggested_area: str + _suggested_area: Optional[str] _device_state_sub_list: dict[str, Callable[[str, MIoTDeviceState], None]] @@ -153,7 +153,7 @@ class MIoTDevice: def __init__( self, miot_client: MIoTClient, - device_info: dict[str, str], + device_info: dict[str, Any], spec_instance: MIoTSpecInstance ) -> None: self.miot_client = miot_client @@ -243,25 +243,29 @@ class MIoTDevice: return True def sub_property( - self, handler: Callable[[dict, Any], None], siid: int = None, - piid: int = None, handler_ctx: Any = None + self, handler: Callable[[dict, Any], None], siid: Optional[int] = None, + piid: Optional[int] = None, handler_ctx: Any = None ) -> bool: return self.miot_client.sub_prop( did=self._did, handler=handler, siid=siid, piid=piid, handler_ctx=handler_ctx) - def unsub_property(self, siid: int = None, piid: int = None) -> bool: + def unsub_property( + self, siid: Optional[int] = None, piid: Optional[int] = None + ) -> bool: return self.miot_client.unsub_prop(did=self._did, siid=siid, piid=piid) def sub_event( - self, handler: Callable[[dict, Any], None], siid: int = None, - eiid: int = None, handler_ctx: Any = None + self, handler: Callable[[dict, Any], None], siid: Optional[int] = None, + eiid: Optional[int] = None, handler_ctx: Any = None ) -> bool: return self.miot_client.sub_event( did=self._did, handler=handler, siid=siid, eiid=eiid, handler_ctx=handler_ctx) - def unsub_event(self, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, siid: Optional[int] = None, eiid: Optional[int] = None + ) -> bool: return self.miot_client.unsub_event( did=self._did, siid=siid, eiid=eiid) @@ -703,7 +707,7 @@ class MIoTDevice: def __on_device_state_changed( self, did: str, state: MIoTDeviceState, ctx: Any ) -> None: - self._online = state + self._online = state == MIoTDeviceState.ONLINE for key, handler in self._device_state_sub_list.items(): self.miot_client.main_loop.call_soon_threadsafe( handler, key, state) @@ -719,7 +723,8 @@ class MIoTServiceEntity(Entity): _main_loop: asyncio.AbstractEventLoop _prop_value_map: dict[MIoTSpecProperty, Any] - _event_occurred_handler: Callable[[MIoTSpecEvent, dict], None] + _event_occurred_handler: Optional[ + Callable[[MIoTSpecEvent, dict], None]] _prop_changed_subs: dict[ MIoTSpecProperty, Callable[[MIoTSpecProperty, Any], None]] @@ -763,7 +768,9 @@ class MIoTServiceEntity(Entity): self.entity_id) @property - def event_occurred_handler(self) -> Callable[[MIoTSpecEvent, dict], None]: + def event_occurred_handler( + self + ) -> Optional[Callable[[MIoTSpecEvent, dict], None]]: return self._event_occurred_handler @event_occurred_handler.setter @@ -784,7 +791,7 @@ class MIoTServiceEntity(Entity): self._prop_changed_subs.pop(prop, None) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -1002,7 +1009,7 @@ class MIoTPropertyEntity(Entity): # {'min':int, 'max':int, 'step': int} _value_range: dict[str, int] # {Any: Any} - _value_list: dict[Any, Any] + _value_list: Optional[dict[Any, Any]] _value: Any _pending_write_ha_state_timer: Optional[asyncio.TimerHandle] @@ -1042,7 +1049,7 @@ class MIoTPropertyEntity(Entity): self._value_list) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -1067,7 +1074,7 @@ class MIoTPropertyEntity(Entity): self.miot_device.unsub_property( siid=self.service.iid, piid=self.spec.iid) - def get_vlist_description(self, value: Any) -> str: + def get_vlist_description(self, value: Any) -> Optional[str]: if not self._value_list: return None return self._value_list.get(value, None) @@ -1184,7 +1191,7 @@ class MIoTEventEntity(Entity): spec.device_class, self.entity_id) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -1286,7 +1293,7 @@ class MIoTActionEntity(Entity): spec.device_class, self.entity_id) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -1298,7 +1305,9 @@ class MIoTActionEntity(Entity): self.miot_device.unsub_device_state( key=f'{self.action_platform}.{ self.service.iid}.{self.spec.iid}') - async def action_async(self, in_list: list = None) -> Optional[list]: + async def action_async( + self, in_list: Optional[list] = None + ) -> Optional[list]: try: return await self.miot_device.miot_client.action_async( did=self.miot_device.did, From d9d843340597b84d4056482d05a6c695c79041e2 Mon Sep 17 00:00:00 2001 From: topsworld Date: Tue, 7 Jan 2025 20:24:06 +0800 Subject: [PATCH 2/2] fix: fix type error --- custom_components/xiaomi_home/miot/common.py | 70 ++ .../xiaomi_home/miot/miot_spec.py | 661 ++++++++++-------- 2 files changed, 420 insertions(+), 311 deletions(-) diff --git a/custom_components/xiaomi_home/miot/common.py b/custom_components/xiaomi_home/miot/common.py index 0ee4f1d..dec21c3 100644 --- a/custom_components/xiaomi_home/miot/common.py +++ b/custom_components/xiaomi_home/miot/common.py @@ -45,11 +45,14 @@ off Xiaomi or its affiliates' products. Common utilities. """ +import asyncio import json from os import path import random from typing import Any, Optional import hashlib +from urllib.parse import urlencode +from urllib.request import Request, urlopen from paho.mqtt.matcher import MQTTMatcher import yaml @@ -83,10 +86,12 @@ def randomize_int(value: int, ratio: float) -> int: """Randomize an integer value.""" return int(value * (1 - ratio + random.random()*2*ratio)) + def randomize_float(value: float, ratio: float) -> float: """Randomize a float value.""" return value * (1 - ratio + random.random()*2*ratio) + class MIoTMatcher(MQTTMatcher): """MIoT Pub/Sub topic matcher.""" @@ -105,3 +110,68 @@ class MIoTMatcher(MQTTMatcher): return self[topic] except KeyError: return None + + +class MIoTHttp: + """MIoT Common HTTP API.""" + @staticmethod + def get( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[str]: + full_url = url + if params: + encoded_params = urlencode(params) + full_url = f'{url}?{encoded_params}' + request = Request(full_url, method='GET', headers=headers or {}) + content: Optional[bytes] = None + with urlopen(request) as response: + content = response.read() + return str(content, 'utf-8') if content else None + + @staticmethod + def get_json( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[dict]: + response = MIoTHttp.get(url, params, headers) + return json.loads(response) if response else None + + @staticmethod + def post( + url: str, data: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[str]: + pass + + @staticmethod + def post_json( + url: str, data: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[dict]: + response = MIoTHttp.post(url, data, headers) + return json.loads(response) if response else None + + @staticmethod + async def get_async( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> Optional[str]: + # TODO: Use aiohttp + ev_loop = loop or asyncio.get_running_loop() + return await ev_loop.run_in_executor( + None, MIoTHttp.get, url, params, headers) + + @staticmethod + async def get_json_async( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> Optional[dict]: + ev_loop = loop or asyncio.get_running_loop() + return await ev_loop.run_in_executor( + None, MIoTHttp.get_json, url, params, headers) + + @ staticmethod + async def post_async( + url: str, data: Optional[dict] = None, headers: Optional[dict] = None, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> Optional[str]: + ev_loop = loop or asyncio.get_running_loop() + return await ev_loop.run_in_executor( + None, MIoTHttp.post, url, data, headers) diff --git a/custom_components/xiaomi_home/miot/miot_spec.py b/custom_components/xiaomi_home/miot/miot_spec.py index 33022a1..1122551 100644 --- a/custom_components/xiaomi_home/miot/miot_spec.py +++ b/custom_components/xiaomi_home/miot/miot_spec.py @@ -54,8 +54,10 @@ from urllib.parse import urlencode from urllib.request import Request, urlopen import logging + # pylint: disable=relative-beyond-top-level from .const import DEFAULT_INTEGRATION_LANGUAGE, SPEC_STD_LIB_EFFECTIVE_TIME +from .common import MIoTHttp from .miot_error import MIoTSpecError from .miot_storage import ( MIoTStorage, @@ -66,6 +68,291 @@ from .miot_storage import ( _LOGGER = logging.getLogger(__name__) +class _MIoTSpecValueRange: + """MIoT SPEC value range class.""" + min_: int + max_: int + step: int + + def from_list(self, value_range: list) -> None: + self.min_ = value_range[0] + self.max_ = value_range[1] + self.step = value_range[2] + + def to_list(self) -> list: + return [self.min_, self.max_, self.step] + + +class _MIoTSpecValueListItem: + """MIoT SPEC value list item class.""" + # All lower-case SPEC description. + name: str + # Value + value: Any + # Descriptions after multilingual conversion. + description: str + + def to_dict(self) -> dict: + return { + 'name': self.name, + 'value': self.value, + 'description': self.description + } + + +class _MIoTSpecValueList: + """MIoT SPEC value list class.""" + items: list[_MIoTSpecValueListItem] + + def to_map(self) -> dict: + return {item.value: item.description for item in self.items} + + def to_list(self) -> list: + return [item.to_dict() for item in self.items] + + +class _SpecStdLib: + """MIoT-Spec-V2 standard library.""" + _lang: str + _devices: dict[str, dict[str, str]] + _services: dict[str, dict[str, str]] + _properties: dict[str, dict[str, str]] + _events: dict[str, dict[str, str]] + _actions: dict[str, dict[str, str]] + _values: dict[str, dict[str, str]] + + def __init__(self, lang: str) -> None: + self._lang = lang + self._devices = {} + self._services = {} + self._properties = {} + self._events = {} + self._actions = {} + self._values = {} + + self._spec_std_lib = None + + def from_dict(self, std_lib: dict[str, dict[str, dict[str, str]]]) -> None: + if ( + not isinstance(std_lib, dict) + or 'devices' not in std_lib + or 'services' not in std_lib + or 'properties' not in std_lib + or 'events' not in std_lib + or 'actions' not in std_lib + or 'values' not in std_lib + ): + return + self._devices = std_lib['devices'] + self._services = std_lib['services'] + self._properties = std_lib['properties'] + self._events = std_lib['events'] + self._actions = std_lib['actions'] + self._values = std_lib['values'] + + def device_translate(self, key: str) -> Optional[str]: + if not self._devices or key not in self._devices: + return None + if self._lang not in self._devices[key]: + return self._devices[key].get( + DEFAULT_INTEGRATION_LANGUAGE, None) + return self._devices[key][self._lang] + + def service_translate(self, key: str) -> Optional[str]: + if not self._services or key not in self._services: + return None + if self._lang not in self._services[key]: + return self._services[key].get( + DEFAULT_INTEGRATION_LANGUAGE, None) + return self._services[key][self._lang] + + def property_translate(self, key: str) -> Optional[str]: + if not self._properties or key not in self._properties: + return None + if self._lang not in self._properties[key]: + return self._properties[key].get( + DEFAULT_INTEGRATION_LANGUAGE, None) + return self._properties[key][self._lang] + + def event_translate(self, key: str) -> Optional[str]: + if not self._events or key not in self._events: + return None + if self._lang not in self._events[key]: + return self._events[key].get( + DEFAULT_INTEGRATION_LANGUAGE, None) + return self._events[key][self._lang] + + def action_translate(self, key: str) -> Optional[str]: + if not self._actions or key not in self._actions: + return None + if self._lang not in self._actions[key]: + return self._actions[key].get( + DEFAULT_INTEGRATION_LANGUAGE, None) + return self._actions[key][self._lang] + + def value_translate(self, key: str) -> Optional[str]: + if not self._values or key not in self._values: + return None + if self._lang not in self._values[key]: + return self._values[key].get( + DEFAULT_INTEGRATION_LANGUAGE, None) + return self._values[key][self._lang] + + def dump(self) -> dict[str, dict[str, dict[str, str]]]: + return { + 'devices': self._devices, + 'services': self._services, + 'properties': self._properties, + 'events': self._events, + 'actions': self._actions, + 'values': self._values + } + + async def refresh_async(self) -> bool: + std_lib_new = await self.__request_from_cloud_async() + if std_lib_new: + self.from_dict(std_lib_new) + return True + return False + + async def __request_from_cloud_async(self) -> Optional[dict]: + std_libs: Optional[dict] = None + for index in range(3): + try: + tasks: list = [] + # Get std lib + for name in [ + 'device', 'service', 'property', 'event', 'action']: + tasks.append(self.__get_template_list( + 'https://miot-spec.org/miot-spec-v2/template/list/' + + name)) + tasks.append(self.__get_property_value()) + # Async request + results = await asyncio.gather(*tasks) + if None in results: + raise MIoTSpecError('init failed, None in result') + std_libs = { + 'devices': results[0], + 'services': results[1], + 'properties': results[2], + 'events': results[3], + 'actions': results[4], + 'values': results[5], + } + # Get external std lib, Power by LM + tasks.clear() + for name in [ + 'device', 'service', 'property', 'event', 'action', + 'property_value']: + tasks.append(MIoTHttp.get_json_async( + 'https://cdn.cnbj1.fds.api.mi-img.com/res-conf/' + f'xiaomi-home/std_ex_{name}.json')) + results = await asyncio.gather(*tasks) + if results[0]: + for key, value in results[0].items(): + if key in std_libs['devices']: + std_libs['devices'][key].update(value) + else: + std_libs['devices'][key] = value + else: + _LOGGER.error('get external std lib failed, devices') + if results[1]: + for key, value in results[1].items(): + if key in std_libs['services']: + std_libs['services'][key].update(value) + else: + std_libs['services'][key] = value + else: + _LOGGER.error('get external std lib failed, services') + if results[2]: + for key, value in results[2].items(): + if key in std_libs['properties']: + std_libs['properties'][key].update(value) + else: + std_libs['properties'][key] = value + else: + _LOGGER.error('get external std lib failed, properties') + if results[3]: + for key, value in results[3].items(): + if key in std_libs['events']: + std_libs['events'][key].update(value) + else: + std_libs['events'][key] = value + else: + _LOGGER.error('get external std lib failed, events') + if results[4]: + for key, value in results[4].items(): + if key in std_libs['actions']: + std_libs['actions'][key].update(value) + else: + std_libs['actions'][key] = value + else: + _LOGGER.error('get external std lib failed, actions') + if results[5]: + for key, value in results[5].items(): + if key in std_libs['values']: + std_libs['values'][key].update(value) + else: + std_libs['values'][key] = value + else: + _LOGGER.error( + 'get external std lib failed, values') + return std_libs + except Exception as err: # pylint: disable=broad-exception-caught + _LOGGER.error( + 'update spec std lib error, retry, %d, %s', index, err) + return None + + async def __get_property_value(self) -> dict: + reply = await MIoTHttp.get_json_async( + url='https://miot-spec.org/miot-spec-v2' + '/normalization/list/property_value') + if reply is None or 'result' not in reply: + raise MIoTSpecError('get property value failed') + result = {} + for item in reply['result']: + if ( + not isinstance(item, dict) + or 'normalization' not in item + or 'description' not in item + or 'proName' not in item + or 'urn' not in item + ): + continue + result[ + f'{item["urn"]}|{item["proName"]}|{item["normalization"]}' + ] = { + 'zh-Hans': item['description'], + 'en': item['normalization'] + } + return result + + async def __get_template_list(self, url: str) -> dict: + reply = await MIoTHttp.get_json_async(url=url) + if reply is None or 'result' not in reply: + raise MIoTSpecError(f'get service failed, {url}') + result: dict = {} + for item in reply['result']: + if ( + not isinstance(item, dict) + or 'type' not in item + or 'description' not in item + ): + continue + if 'zh_cn' in item['description']: + item['description']['zh-Hans'] = item['description'].pop( + 'zh_cn') + if 'zh_hk' in item['description']: + item['description']['zh-Hant'] = item['description'].pop( + 'zh_hk') + item['description'].pop('zh_tw', None) + elif 'zh_tw' in item['description']: + item['description']['zh-Hant'] = item['description'].pop( + 'zh_tw') + result[item['type']] = item['description'] + return result + + class MIoTSpecBase: """MIoT SPEC base class.""" iid: int @@ -77,13 +364,13 @@ class MIoTSpecBase: name: Optional[str] # External params - platform: str + platform: Optional[str] device_class: Any state_class: Any - icon: str + icon: Optional[str] external_unit: Any - spec_id: str + spec_id: int def __init__(self, spec: dict) -> None: self.iid = spec['iid'] @@ -106,7 +393,7 @@ class MIoTSpecBase: def __hash__(self) -> int: return self.spec_id - def __eq__(self, value: object) -> bool: + def __eq__(self, value) -> bool: return self.spec_id == value.spec_id @@ -114,10 +401,10 @@ class MIoTSpecProperty(MIoTSpecBase): """MIoT SPEC property class.""" format_: str precision: int - unit: str + unit: Optional[str] - value_range: list - value_list: list[dict] + value_range: Optional[list] + value_list: Optional[list[dict]] _access: list _writable: bool @@ -127,10 +414,9 @@ class MIoTSpecProperty(MIoTSpecBase): service: MIoTSpecBase def __init__( - self, spec: dict, service: MIoTSpecBase = None, - format_: str = None, access: list = None, - unit: str = None, value_range: list = None, - value_list: list[dict] = None, precision: int = 0 + self, spec: dict, service: MIoTSpecBase, format_: str, access: list, + unit: Optional[str] = None, value_range: Optional[list] = None, + value_list: Optional[list[dict]] = None, precision: int = 0 ) -> None: super().__init__(spec=spec) self.service = service @@ -203,7 +489,7 @@ class MIoTSpecEvent(MIoTSpecBase): service: MIoTSpecBase def __init__( - self, spec: dict, service: MIoTSpecBase = None, + self, spec: dict, service: MIoTSpecBase, argument: list[MIoTSpecProperty] = None ) -> None: super().__init__(spec=spec) @@ -372,86 +658,6 @@ class MIoTSpecInstance: } -class SpecStdLib: - """MIoT-Spec-V2 standard library.""" - _lang: str - _spec_std_lib: Optional[dict[str, dict[str, dict[str, str]]]] - - def __init__(self, lang: str) -> None: - self._lang = lang - self._spec_std_lib = None - - def init(self, std_lib: dict[str, dict[str, str]]) -> None: - if ( - not isinstance(std_lib, dict) - or 'devices' not in std_lib - or 'services' not in std_lib - or 'properties' not in std_lib - or 'events' not in std_lib - or 'actions' not in std_lib - or 'values' not in std_lib - ): - return - self._spec_std_lib = std_lib - - def deinit(self) -> None: - self._spec_std_lib = None - - def device_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['devices']: - return None - if self._lang not in self._spec_std_lib['devices'][key]: - return self._spec_std_lib['devices'][key].get( - DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['devices'][key][self._lang] - - def service_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['services']: - return None - if self._lang not in self._spec_std_lib['services'][key]: - return self._spec_std_lib['services'][key].get( - DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['services'][key][self._lang] - - def property_translate(self, key: str) -> Optional[str]: - if ( - not self._spec_std_lib - or key not in self._spec_std_lib['properties'] - ): - return None - if self._lang not in self._spec_std_lib['properties'][key]: - return self._spec_std_lib['properties'][key].get( - DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['properties'][key][self._lang] - - def event_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['events']: - return None - if self._lang not in self._spec_std_lib['events'][key]: - return self._spec_std_lib['events'][key].get( - DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['events'][key][self._lang] - - def action_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['actions']: - return None - if self._lang not in self._spec_std_lib['actions'][key]: - return self._spec_std_lib['actions'][key].get( - DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['actions'][key][self._lang] - - def value_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['values']: - return None - if self._lang not in self._spec_std_lib['values'][key]: - return self._spec_std_lib['values'][key].get( - DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['values'][key][self._lang] - - def dump(self) -> dict[str, dict[str, str]]: - return self._spec_std_lib - - class MIoTSpecParser: """MIoT SPEC parser.""" # pylint: disable=inconsistent-quotes @@ -464,24 +670,24 @@ class MIoTSpecParser: _init_done: bool _ram_cache: dict - _std_lib: SpecStdLib + _std_lib: _SpecStdLib _bool_trans: SpecBoolTranslation _multi_lang: SpecMultiLang _spec_filter: SpecFilter def __init__( - self, lang: str = DEFAULT_INTEGRATION_LANGUAGE, - storage: MIoTStorage = None, + self, lang: Optional[str], + storage: MIoTStorage, loop: Optional[asyncio.AbstractEventLoop] = None ) -> None: - self._lang = lang + self._lang = lang or DEFAULT_INTEGRATION_LANGUAGE self._storage = storage self._main_loop = loop or asyncio.get_running_loop() self._init_done = False self._ram_cache = {} - self._std_lib = SpecStdLib(lang=self._lang) + self._std_lib = _SpecStdLib(lang=self._lang) self._bool_trans = SpecBoolTranslation( lang=self._lang, loop=self._main_loop) self._multi_lang = SpecMultiLang(lang=self._lang, loop=self._main_loop) @@ -493,48 +699,43 @@ class MIoTSpecParser: await self._bool_trans.init_async() await self._multi_lang.init_async() await self._spec_filter.init_async() - std_lib_cache: dict = None - if self._storage: - std_lib_cache: dict = await self._storage.load_async( - domain=self.DOMAIN, name='spec_std_lib', type_=dict) - if ( - isinstance(std_lib_cache, dict) - and 'data' in std_lib_cache - and 'ts' in std_lib_cache - and isinstance(std_lib_cache['ts'], int) - and int(time.time()) - std_lib_cache['ts'] < - SPEC_STD_LIB_EFFECTIVE_TIME - ): - # Use the cache if the update time is less than 14 day - _LOGGER.debug( - 'use local spec std cache, ts->%s', std_lib_cache['ts']) - self._std_lib.init(std_lib_cache['data']) - self._init_done = True - return + std_lib_cache = await self._storage.load_async( + domain=self.DOMAIN, name='spec_std_lib', type_=dict) + if ( + isinstance(std_lib_cache, dict) + and 'data' in std_lib_cache + and 'ts' in std_lib_cache + and isinstance(std_lib_cache['ts'], int) + and int(time.time()) - std_lib_cache['ts'] < + SPEC_STD_LIB_EFFECTIVE_TIME + ): + # Use the cache if the update time is less than 14 day + _LOGGER.debug( + 'use local spec std cache, ts->%s', std_lib_cache['ts']) + self._std_lib.from_dict(std_lib_cache['data']) + self._init_done = True + return # Update spec std lib - spec_lib_new = await self.__request_spec_std_lib_async() - if spec_lib_new: - self._std_lib.init(spec_lib_new) - if self._storage: - if not await self._storage.save_async( - domain=self.DOMAIN, name='spec_std_lib', - data={ - 'data': self._std_lib.dump(), - 'ts': int(time.time()) - } - ): - _LOGGER.error('save spec std lib failed') + if await self._std_lib.refresh_async(): + if not await self._storage.save_async( + domain=self.DOMAIN, name='spec_std_lib', + data={ + 'data': self._std_lib.dump(), + 'ts': int(time.time()) + } + ): + _LOGGER.error('save spec std lib failed') else: - if std_lib_cache: - self._std_lib.init(std_lib_cache['data']) - _LOGGER.error('get spec std lib failed, use local cache') + if isinstance(std_lib_cache, dict) and 'data' in std_lib_cache: + self._std_lib.from_dict(std_lib_cache['data']) + _LOGGER.info('get spec std lib failed, use local cache') else: - _LOGGER.error('get spec std lib failed') + _LOGGER.error('load spec std lib failed') self._init_done = True async def deinit_async(self) -> None: self._init_done = False - self._std_lib.deinit() + # self._std_lib.deinit() await self._bool_trans.deinit_async() await self._multi_lang.deinit_async() await self._spec_filter.deinit_async() @@ -562,18 +763,15 @@ class MIoTSpecParser: """MUST await init first !!!""" if not urn_list: return False - spec_std_new: dict = await self.__request_spec_std_lib_async() - if spec_std_new: - self._std_lib.init(spec_std_new) - if self._storage: - if not await self._storage.save_async( - domain=self.DOMAIN, name='spec_std_lib', - data={ - 'data': self._std_lib.dump(), - 'ts': int(time.time()) - } - ): - _LOGGER.error('save spec std lib failed') + if await self._std_lib.refresh_async(): + if not await self._storage.save_async( + domain=self.DOMAIN, name='spec_std_lib', + data={ + 'data': self._std_lib.dump(), + 'ts': int(time.time()) + } + ): + _LOGGER.error('save spec std lib failed') else: raise MIoTSpecError('get spec std lib failed') success_count = 0 @@ -585,28 +783,6 @@ class MIoTSpecParser: success_count += sum(1 for result in results if result is not None) return success_count - def __http_get( - self, url: str, params: dict = None, headers: dict = None - ) -> dict: - if params: - encoded_params = urlencode(params) - full_url = f'{url}?{encoded_params}' - else: - full_url = url - request = Request(full_url, method='GET', headers=headers or {}) - content: bytes = None - with urlopen(request) as response: - content = response.read() - return ( - json.loads(str(content, 'utf-8')) - if content is not None else None) - - async def __http_get_async( - self, url: str, params: dict = None, headers: dict = None - ) -> dict: - return await self._main_loop.run_in_executor( - None, self.__http_get, url, params, headers) - async def __cache_get(self, urn: str) -> Optional[dict]: if self._storage is not None: if platform.system() == 'Windows': @@ -630,157 +806,20 @@ class MIoTSpecParser: return {'string': 'str', 'bool': 'bool', 'float': 'float'}.get( format_, 'int') - async def __request_spec_std_lib_async(self) -> Optional[SpecStdLib]: - std_libs: dict = None - for index in range(3): - try: - tasks: list = [] - # Get std lib - for name in [ - 'device', 'service', 'property', 'event', 'action']: - tasks.append(self.__get_template_list( - 'https://miot-spec.org/miot-spec-v2/template/list/' - + name)) - tasks.append(self.__get_property_value()) - # Async request - results = await asyncio.gather(*tasks) - if None in results: - raise MIoTSpecError('init failed, None in result') - std_libs = { - 'devices': results[0], - 'services': results[1], - 'properties': results[2], - 'events': results[3], - 'actions': results[4], - 'values': results[5], - } - # Get external std lib, Power by LM - tasks.clear() - for name in [ - 'device', 'service', 'property', 'event', 'action', - 'property_value']: - tasks.append(self.__http_get_async( - 'https://cdn.cnbj1.fds.api.mi-img.com/res-conf/' - f'xiaomi-home/std_ex_{name}.json')) - results = await asyncio.gather(*tasks) - if results[0]: - for key, value in results[0].items(): - if key in std_libs['devices']: - std_libs['devices'][key].update(value) - else: - std_libs['devices'][key] = value - else: - _LOGGER.error('get external std lib failed, devices') - if results[1]: - for key, value in results[1].items(): - if key in std_libs['services']: - std_libs['services'][key].update(value) - else: - std_libs['services'][key] = value - else: - _LOGGER.error('get external std lib failed, services') - if results[2]: - for key, value in results[2].items(): - if key in std_libs['properties']: - std_libs['properties'][key].update(value) - else: - std_libs['properties'][key] = value - else: - _LOGGER.error('get external std lib failed, properties') - if results[3]: - for key, value in results[3].items(): - if key in std_libs['events']: - std_libs['events'][key].update(value) - else: - std_libs['events'][key] = value - else: - _LOGGER.error('get external std lib failed, events') - if results[4]: - for key, value in results[4].items(): - if key in std_libs['actions']: - std_libs['actions'][key].update(value) - else: - std_libs['actions'][key] = value - else: - _LOGGER.error('get external std lib failed, actions') - if results[5]: - for key, value in results[5].items(): - if key in std_libs['values']: - std_libs['values'][key].update(value) - else: - std_libs['values'][key] = value - else: - _LOGGER.error( - 'get external std lib failed, values') - return std_libs - except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error( - 'update spec std lib error, retry, %d, %s', index, err) - return None - - async def __get_property_value(self) -> dict: - reply = await self.__http_get_async( - url='https://miot-spec.org/miot-spec-v2' - '/normalization/list/property_value') - if reply is None or 'result' not in reply: - raise MIoTSpecError('get property value failed') - result = {} - for item in reply['result']: - if ( - not isinstance(item, dict) - or 'normalization' not in item - or 'description' not in item - or 'proName' not in item - or 'urn' not in item - ): - continue - result[ - f'{item["urn"]}|{item["proName"]}|{item["normalization"]}' - ] = { - 'zh-Hans': item['description'], - 'en': item['normalization'] - } - return result - - async def __get_template_list(self, url: str) -> dict: - reply = await self.__http_get_async(url=url) - if reply is None or 'result' not in reply: - raise MIoTSpecError(f'get service failed, {url}') - result: dict = {} - for item in reply['result']: - if ( - not isinstance(item, dict) - or 'type' not in item - or 'description' not in item - ): - continue - if 'zh_cn' in item['description']: - item['description']['zh-Hans'] = item['description'].pop( - 'zh_cn') - if 'zh_hk' in item['description']: - item['description']['zh-Hant'] = item['description'].pop( - 'zh_hk') - item['description'].pop('zh_tw', None) - elif 'zh_tw' in item['description']: - item['description']['zh-Hant'] = item['description'].pop( - 'zh_tw') - result[item['type']] = item['description'] - return result - - async def __get_instance(self, urn: str) -> dict: - return await self.__http_get_async( + async def __get_instance(self, urn: str) -> Optional[dict]: + return await MIoTHttp.get_json_async( url='https://miot-spec.org/miot-spec-v2/instance', params={'type': urn}) - async def __get_translation(self, urn: str) -> dict: - return await self.__http_get_async( + async def __get_translation(self, urn: str) -> Optional[dict]: + return await MIoTHttp.get_json_async( url='https://miot-spec.org/instance/v2/multiLanguage', params={'urn': urn}) async def __parse(self, urn: str) -> MIoTSpecInstance: _LOGGER.debug('parse urn, %s', urn) # Load spec instance - instance: dict = await self.__get_instance(urn=urn) + instance = await self.__get_instance(urn=urn) if ( not isinstance(instance, dict) or 'type' not in instance @@ -789,6 +828,8 @@ class MIoTSpecParser: ): raise MIoTSpecError(f'invalid urn instance, {urn}') translation: dict = {} + urn_strs: list[str] = urn.split(':') + urn_key: str = ':'.join(urn_strs[:6]) try: # Load multiple language configuration. res_trans = await self.__get_translation(urn=urn) @@ -798,9 +839,7 @@ class MIoTSpecParser: or not isinstance(res_trans['data'], dict) ): raise MIoTSpecError('invalid translation data') - urn_strs: list[str] = urn.split(':') - urn_key: str = ':'.join(urn_strs[:6]) - trans_data: dict[str, str] = None + trans_data: dict[str, str] = {} if self._lang == 'zh-Hans': # Simplified Chinese trans_data = res_trans['data'].get('zh_cn', {})