diff --git a/custom_components/xiaomi_home/climate.py b/custom_components/xiaomi_home/climate.py index bd4cfe3..fb3dc45 100644 --- a/custom_components/xiaomi_home/climate.py +++ b/custom_components/xiaomi_home/climate.py @@ -156,64 +156,56 @@ class AirConditioner(MIoTServiceEntity, ClimateEntity): _LOGGER.error( 'unknown on property, %s', self.entity_id) elif prop.name == 'mode': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'invalid mode value_list, %s', self.entity_id) continue self._hvac_mode_map = {} - for item in prop.value_list: - if item['name'].lower() in {'off', 'idle'}: - self._hvac_mode_map[item['value']] = HVACMode.OFF - elif item['name'].lower() in {'auto'}: - self._hvac_mode_map[item['value']] = HVACMode.AUTO - elif item['name'].lower() in {'cool'}: - self._hvac_mode_map[item['value']] = HVACMode.COOL - elif item['name'].lower() in {'heat'}: - self._hvac_mode_map[item['value']] = HVACMode.HEAT - elif item['name'].lower() in {'dry'}: - self._hvac_mode_map[item['value']] = HVACMode.DRY - elif item['name'].lower() in {'fan'}: - self._hvac_mode_map[item['value']] = HVACMode.FAN_ONLY + for item in prop.value_list.items: + if item.name in {'off', 'idle'}: + self._hvac_mode_map[item.value] = HVACMode.OFF + elif item.name in {'auto'}: + self._hvac_mode_map[item.value] = HVACMode.AUTO + elif item.name in {'cool'}: + self._hvac_mode_map[item.value] = HVACMode.COOL + elif item.name in {'heat'}: + self._hvac_mode_map[item.value] = HVACMode.HEAT + elif item.name in {'dry'}: + self._hvac_mode_map[item.value] = HVACMode.DRY + elif item.name in {'fan'}: + self._hvac_mode_map[item.value] = HVACMode.FAN_ONLY self._attr_hvac_modes = list(self._hvac_mode_map.values()) self._prop_mode = prop elif prop.name == 'target-temperature': - if not isinstance(prop.value_range, dict): + if not prop.value_range: _LOGGER.error( 'invalid target-temperature value_range format, %s', self.entity_id) continue - self._attr_min_temp = prop.value_range['min'] - self._attr_max_temp = prop.value_range['max'] - self._attr_target_temperature_step = prop.value_range['step'] + self._attr_min_temp = prop.value_range.min_ + self._attr_max_temp = prop.value_range.max_ + self._attr_target_temperature_step = prop.value_range.step self._attr_temperature_unit = prop.external_unit self._attr_supported_features |= ( ClimateEntityFeature.TARGET_TEMPERATURE) self._prop_target_temp = prop elif prop.name == 'target-humidity': - if not isinstance(prop.value_range, dict): + if not prop.value_range: _LOGGER.error( 'invalid target-humidity value_range format, %s', self.entity_id) continue - self._attr_min_humidity = prop.value_range['min'] - self._attr_max_humidity = prop.value_range['max'] + self._attr_min_humidity = prop.value_range.min_ + self._attr_max_humidity = prop.value_range.max_ self._attr_supported_features |= ( ClimateEntityFeature.TARGET_HUMIDITY) self._prop_target_humi = prop elif prop.name == 'fan-level': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'invalid fan-level value_list, %s', self.entity_id) continue - self._fan_mode_map = { - item['value']: item['description'] - for item in prop.value_list} + self._fan_mode_map = prop.value_list.to_map() self._attr_fan_modes = list(self._fan_mode_map.values()) self._attr_supported_features |= ClimateEntityFeature.FAN_MODE self._prop_fan_level = prop @@ -269,8 +261,8 @@ class AirConditioner(MIoTServiceEntity, ClimateEntity): elif self.get_prop_value(prop=self._prop_on) is False: await self.set_property_async(prop=self._prop_on, value=True) # set mode - mode_value = self.get_map_value( - map_=self._hvac_mode_map, description=hvac_mode) + mode_value = self.get_map_key( + map_=self._hvac_mode_map, value=hvac_mode) if ( mode_value is None or not await self.set_property_async( @@ -339,8 +331,8 @@ class AirConditioner(MIoTServiceEntity, ClimateEntity): async def async_set_fan_mode(self, fan_mode): """Set new target fan mode.""" - mode_value = self.get_map_value( - map_=self._fan_mode_map, description=fan_mode) + mode_value = self.get_map_key( + map_=self._fan_mode_map, value=fan_mode) if mode_value is None or not await self.set_property_async( prop=self._prop_fan_level, value=mode_value): raise RuntimeError( @@ -376,9 +368,9 @@ class AirConditioner(MIoTServiceEntity, ClimateEntity): """Return the hvac mode. e.g., heat, cool mode.""" if self.get_prop_value(prop=self._prop_on) is False: return HVACMode.OFF - return self.get_map_description( + return self.get_map_key( map_=self._hvac_mode_map, - key=self.get_prop_value(prop=self._prop_mode)) + value=self.get_prop_value(prop=self._prop_mode)) @property def fan_mode(self) -> Optional[str]: @@ -386,7 +378,7 @@ class AirConditioner(MIoTServiceEntity, ClimateEntity): Requires ClimateEntityFeature.FAN_MODE. """ - return self.get_map_description( + return self.get_map_value( map_=self._fan_mode_map, key=self.get_prop_value(prop=self._prop_fan_level)) @@ -446,8 +438,8 @@ class AirConditioner(MIoTServiceEntity, ClimateEntity): }.get(v_ac_state['M'], None) if mode: self.set_prop_value( - prop=self._prop_mode, value=self.get_map_value( - map_=self._hvac_mode_map, description=mode)) + prop=self._prop_mode, value=self.get_map_key( + map_=self._hvac_mode_map, value=mode)) # T: target temperature if 'T' in v_ac_state and self._prop_target_temp: self.set_prop_value(prop=self._prop_target_temp, @@ -517,29 +509,24 @@ class Heater(MIoTServiceEntity, ClimateEntity): ClimateEntityFeature.TURN_OFF) self._prop_on = prop elif prop.name == 'target-temperature': - if not isinstance(prop.value_range, dict): + if not prop.value_range: _LOGGER.error( 'invalid target-temperature value_range format, %s', self.entity_id) continue - self._attr_min_temp = prop.value_range['min'] - self._attr_max_temp = prop.value_range['max'] - self._attr_target_temperature_step = prop.value_range['step'] + self._attr_min_temp = prop.value_range.min_ + self._attr_max_temp = prop.value_range.max_ + self._attr_target_temperature_step = prop.value_range.step self._attr_temperature_unit = prop.external_unit self._attr_supported_features |= ( ClimateEntityFeature.TARGET_TEMPERATURE) self._prop_target_temp = prop elif prop.name == 'heat-level': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'invalid heat-level value_list, %s', self.entity_id) continue - self._heat_level_map = { - item['value']: item['description'] - for item in prop.value_list} + self._heat_level_map = prop.value_list.to_map() self._attr_preset_modes = list(self._heat_level_map.values()) self._attr_supported_features |= ( ClimateEntityFeature.PRESET_MODE) @@ -582,8 +569,8 @@ class Heater(MIoTServiceEntity, ClimateEntity): """Set the preset mode.""" await self.set_property_async( self._prop_heat_level, - value=self.get_map_value( - map_=self._heat_level_map, description=preset_mode)) + value=self.get_map_key( + map_=self._heat_level_map, value=preset_mode)) @property def target_temperature(self) -> Optional[float]: @@ -613,7 +600,7 @@ class Heater(MIoTServiceEntity, ClimateEntity): @property def preset_mode(self) -> Optional[str]: return ( - self.get_map_description( + self.get_map_value( map_=self._heat_level_map, key=self.get_prop_value(prop=self._prop_heat_level)) if self._prop_heat_level else None) diff --git a/custom_components/xiaomi_home/cover.py b/custom_components/xiaomi_home/cover.py index d8236c7..78a6a02 100644 --- a/custom_components/xiaomi_home/cover.py +++ b/custom_components/xiaomi_home/cover.py @@ -132,53 +132,47 @@ class Cover(MIoTServiceEntity, CoverEntity): # properties for prop in entity_data.props: if prop.name == 'motor-control': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'motor-control value_list is None, %s', self.entity_id) continue - for item in prop.value_list: - if item['name'].lower() in ['open']: + for item in prop.value_list.items: + if item.name in {'open'}: self._attr_supported_features |= ( CoverEntityFeature.OPEN) - self._prop_motor_value_open = item['value'] - elif item['name'].lower() in ['close']: + self._prop_motor_value_open = item.value + elif item.name in {'close'}: self._attr_supported_features |= ( CoverEntityFeature.CLOSE) - self._prop_motor_value_close = item['value'] - elif item['name'].lower() in ['pause']: + self._prop_motor_value_close = item.value + elif item.name in {'pause'}: self._attr_supported_features |= ( CoverEntityFeature.STOP) - self._prop_motor_value_pause = item['value'] + self._prop_motor_value_pause = item.value self._prop_motor_control = prop elif prop.name == 'status': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'status value_list is None, %s', self.entity_id) continue - for item in prop.value_list: - if item['name'].lower() in ['opening', 'open']: - self._prop_status_opening = item['value'] - elif item['name'].lower() in ['closing', 'close']: - self._prop_status_closing = item['value'] - elif item['name'].lower() in ['stop', 'pause']: - self._prop_status_stop = item['value'] + for item in prop.value_list.items: + if item.name in {'opening', 'open'}: + self._prop_status_opening = item.value + elif item.name in {'closing', 'close'}: + self._prop_status_closing = item.value + elif item.name in {'stop', 'pause'}: + self._prop_status_stop = item.value self._prop_status = prop elif prop.name == 'current-position': self._prop_current_position = prop elif prop.name == 'target-position': - if not isinstance(prop.value_range, dict): + if not prop.value_range: _LOGGER.error( 'invalid target-position value_range format, %s', self.entity_id) continue - self._prop_position_value_min = prop.value_range['min'] - self._prop_position_value_max = prop.value_range['max'] + self._prop_position_value_min = prop.value_range.min_ + self._prop_position_value_max = prop.value_range.max_ self._prop_position_value_range = ( self._prop_position_value_max - self._prop_position_value_min) diff --git a/custom_components/xiaomi_home/fan.py b/custom_components/xiaomi_home/fan.py index 90220db..a28b989 100644 --- a/custom_components/xiaomi_home/fan.py +++ b/custom_components/xiaomi_home/fan.py @@ -87,7 +87,7 @@ async def async_setup_entry( class Fan(MIoTServiceEntity, FanEntity): """Fan entities for Xiaomi Home.""" # pylint: disable=unused-argument - _prop_on: Optional[MIoTSpecProperty] + _prop_on: MIoTSpecProperty _prop_fan_level: Optional[MIoTSpecProperty] _prop_mode: Optional[MIoTSpecProperty] _prop_horizontal_swing: Optional[MIoTSpecProperty] @@ -100,7 +100,7 @@ class Fan(MIoTServiceEntity, FanEntity): _speed_step: int _speed_names: Optional[list] _speed_name_map: Optional[dict[int, str]] - _mode_list: Optional[dict[Any, Any]] + _mode_map: Optional[dict[Any, Any]] def __init__( self, miot_device: MIoTDevice, entity_data: MIoTEntityData @@ -111,7 +111,7 @@ class Fan(MIoTServiceEntity, FanEntity): self._attr_current_direction = None self._attr_supported_features = FanEntityFeature(0) - self._prop_on = None + # _prop_on is required self._prop_fan_level = None self._prop_mode = None self._prop_horizontal_swing = None @@ -124,7 +124,7 @@ class Fan(MIoTServiceEntity, FanEntity): self._speed_names = [] self._speed_name_map = {} - self._mode_list = None + self._mode_map = None # properties for prop in entity_data.props: @@ -133,42 +133,34 @@ class Fan(MIoTServiceEntity, FanEntity): self._attr_supported_features |= FanEntityFeature.TURN_OFF self._prop_on = prop elif prop.name == 'fan-level': - if isinstance(prop.value_range, dict): + if prop.value_range: # Fan level with value-range - self._speed_min = prop.value_range['min'] - self._speed_max = prop.value_range['max'] - self._speed_step = prop.value_range['step'] + self._speed_min = prop.value_range.min_ + self._speed_max = prop.value_range.max_ + self._speed_step = prop.value_range.step self._attr_speed_count = int(( self._speed_max - self._speed_min)/self._speed_step)+1 self._attr_supported_features |= FanEntityFeature.SET_SPEED self._prop_fan_level = prop elif ( self._prop_fan_level is None - and isinstance(prop.value_list, list) and prop.value_list ): # Fan level with value-list # Fan level with value-range is prior to fan level with # value-list when a fan has both fan level properties. - self._speed_name_map = { - item['value']: item['description'] - for item in prop.value_list} + self._speed_name_map = prop.value_list.to_map() self._speed_names = list(self._speed_name_map.values()) - self._attr_speed_count = len(prop.value_list) + self._attr_speed_count = len(self._speed_names) self._attr_supported_features |= FanEntityFeature.SET_SPEED self._prop_fan_level = prop elif prop.name == 'mode': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'mode value_list is None, %s', self.entity_id) continue - self._mode_list = { - item['value']: item['description'] - for item in prop.value_list} - self._attr_preset_modes = list(self._mode_list.values()) + self._mode_map = prop.value_list.to_map() + self._attr_preset_modes = list(self._mode_map.values()) self._attr_supported_features |= FanEntityFeature.PRESET_MODE self._prop_mode = prop elif prop.name == 'horizontal-swing': @@ -178,16 +170,11 @@ class Fan(MIoTServiceEntity, FanEntity): if prop.format_ == 'bool': self._prop_wind_reverse_forward = False self._prop_wind_reverse_reverse = True - elif ( - isinstance(prop.value_list, list) - and prop.value_list - ): - for item in prop.value_list: - if item['name'].lower() in {'foreward'}: - self._prop_wind_reverse_forward = item['value'] - elif item['name'].lower() in { - 'reversal', 'reverse'}: - self._prop_wind_reverse_reverse = item['value'] + elif prop.value_list: + for item in prop.value_list.items: + if item.name in {'foreward'}: + self._prop_wind_reverse_forward = item.value + self._prop_wind_reverse_reverse = item.value if ( self._prop_wind_reverse_forward is None or self._prop_wind_reverse_reverse is None @@ -199,21 +186,9 @@ class Fan(MIoTServiceEntity, FanEntity): self._attr_supported_features |= FanEntityFeature.DIRECTION self._prop_wind_reverse = prop - def __get_mode_description(self, key: int) -> Optional[str]: - if self._mode_list is None: - return None - return self._mode_list.get(key, None) - - def __get_mode_value(self, description: str) -> Optional[int]: - if self._mode_list is None: - return None - for key, value in self._mode_list.items(): - if value == description: - return key - return None - async def async_turn_on( - self, percentage: int = None, preset_mode: str = None, **kwargs: Any + self, percentage: Optional[int] = None, + preset_mode: Optional[str] = None, **kwargs: Any ) -> None: """Turn the fan on. @@ -225,12 +200,12 @@ class Fan(MIoTServiceEntity, FanEntity): # percentage if percentage: if self._speed_names: - speed = percentage_to_ordered_list_item( - self._speed_names, percentage) - speed_value = self.get_map_value( - map_=self._speed_name_map, description=speed) await self.set_property_async( - prop=self._prop_fan_level, value=speed_value) + prop=self._prop_fan_level, + value=self.get_map_value( + map_=self._speed_name_map, + key=percentage_to_ordered_list_item( + self._speed_names, percentage))) else: await self.set_property_async( prop=self._prop_fan_level, @@ -241,7 +216,8 @@ class Fan(MIoTServiceEntity, FanEntity): if preset_mode: await self.set_property_async( self._prop_mode, - value=self.__get_mode_value(description=preset_mode)) + value=self.get_map_key( + map_=self._mode_map, value=preset_mode)) async def async_turn_off(self, **kwargs: Any) -> None: """Turn the fan off.""" @@ -255,12 +231,12 @@ class Fan(MIoTServiceEntity, FanEntity): """Set the percentage of the fan speed.""" if percentage > 0: if self._speed_names: - speed = percentage_to_ordered_list_item( - self._speed_names, percentage) - speed_value = self.get_map_value( - map_=self._speed_name_map, description=speed) await self.set_property_async( - prop=self._prop_fan_level, value=speed_value) + prop=self._prop_fan_level, + value=self.get_map_value( + map_=self._speed_name_map, + key=percentage_to_ordered_list_item( + self._speed_names, percentage))) else: await self.set_property_async( prop=self._prop_fan_level, @@ -277,7 +253,8 @@ class Fan(MIoTServiceEntity, FanEntity): """Set the preset mode.""" await self.set_property_async( self._prop_mode, - value=self.__get_mode_value(description=preset_mode)) + value=self.get_map_key( + map_=self._mode_map, value=preset_mode)) async def async_set_direction(self, direction: str) -> None: """Set the direction of the fan.""" @@ -306,7 +283,8 @@ class Fan(MIoTServiceEntity, FanEntity): """Return the current preset mode, e.g., auto, smart, eco, favorite.""" return ( - self.__get_mode_description( + self.get_map_value( + map_=self._mode_map, key=self.get_prop_value(prop=self._prop_mode)) if self._prop_mode else None) diff --git a/custom_components/xiaomi_home/humidifier.py b/custom_components/xiaomi_home/humidifier.py index 9739da4..1bcd5c8 100644 --- a/custom_components/xiaomi_home/humidifier.py +++ b/custom_components/xiaomi_home/humidifier.py @@ -97,7 +97,7 @@ class Humidifier(MIoTServiceEntity, HumidifierEntity): _prop_target_humidity: Optional[MIoTSpecProperty] _prop_humidity: Optional[MIoTSpecProperty] - _mode_list: dict[Any, Any] + _mode_map: dict[Any, Any] def __init__( self, miot_device: MIoTDevice, entity_data: MIoTEntityData @@ -110,7 +110,7 @@ class Humidifier(MIoTServiceEntity, HumidifierEntity): self._prop_mode = None self._prop_target_humidity = None self._prop_humidity = None - self._mode_list = None + self._mode_map = None # properties for prop in entity_data.props: @@ -119,28 +119,23 @@ class Humidifier(MIoTServiceEntity, HumidifierEntity): self._prop_on = prop # target-humidity elif prop.name == 'target-humidity': - if not isinstance(prop.value_range, dict): + if not prop.value_range: _LOGGER.error( 'invalid target-humidity value_range format, %s', self.entity_id) continue - self._attr_min_humidity = prop.value_range['min'] - self._attr_max_humidity = prop.value_range['max'] + self._attr_min_humidity = prop.value_range.min_ + self._attr_max_humidity = prop.value_range.max_ self._prop_target_humidity = prop # mode elif prop.name == 'mode': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'mode value_list is None, %s', self.entity_id) continue - self._mode_list = { - item['value']: item['description'] - for item in prop.value_list} + self._mode_map = prop.value_list.to_map() self._attr_available_modes = list( - self._mode_list.values()) + self._mode_map.values()) self._attr_supported_features |= HumidifierEntityFeature.MODES self._prop_mode = prop # relative-humidity @@ -163,7 +158,8 @@ class Humidifier(MIoTServiceEntity, HumidifierEntity): async def async_set_mode(self, mode: str) -> None: """Set new target preset mode.""" await self.set_property_async( - prop=self._prop_mode, value=self.__get_mode_value(description=mode)) + prop=self._prop_mode, + value=self.get_map_key(map_=self._mode_map, value=mode)) @property def is_on(self) -> Optional[bool]: @@ -183,20 +179,6 @@ class Humidifier(MIoTServiceEntity, HumidifierEntity): @property def mode(self) -> Optional[str]: """Return the current preset mode.""" - return self.__get_mode_description( + return self.get_map_value( + map_=self._mode_map, key=self.get_prop_value(prop=self._prop_mode)) - - def __get_mode_description(self, key: int) -> Optional[str]: - """Convert mode value to description.""" - if self._mode_list is None: - return None - return self._mode_list.get(key, None) - - def __get_mode_value(self, description: str) -> Optional[int]: - """Convert mode description to value.""" - if self._mode_list is None: - return None - for key, value in self._mode_list.items(): - if value == description: - return key - return None diff --git a/custom_components/xiaomi_home/light.py b/custom_components/xiaomi_home/light.py index 666464e..1667662 100644 --- a/custom_components/xiaomi_home/light.py +++ b/custom_components/xiaomi_home/light.py @@ -96,14 +96,14 @@ class Light(MIoTServiceEntity, LightEntity): """Light entities for Xiaomi Home.""" # pylint: disable=unused-argument _VALUE_RANGE_MODE_COUNT_MAX = 30 - _prop_on: Optional[MIoTSpecProperty] + _prop_on: MIoTSpecProperty _prop_brightness: Optional[MIoTSpecProperty] _prop_color_temp: Optional[MIoTSpecProperty] _prop_color: Optional[MIoTSpecProperty] _prop_mode: Optional[MIoTSpecProperty] _brightness_scale: Optional[tuple[int, int]] - _mode_list: Optional[dict[Any, Any]] + _mode_map: Optional[dict[Any, Any]] def __init__( self, miot_device: MIoTDevice, entity_data: MIoTEntityData @@ -122,7 +122,7 @@ class Light(MIoTServiceEntity, LightEntity): self._prop_color = None self._prop_mode = None self._brightness_scale = None - self._mode_list = None + self._mode_map = None # properties for prop in entity_data.props: @@ -131,20 +131,17 @@ class Light(MIoTServiceEntity, LightEntity): self._prop_on = prop # brightness if prop.name == 'brightness': - if isinstance(prop.value_range, dict): + if prop.value_range: self._brightness_scale = ( - prop.value_range['min'], prop.value_range['max']) + prop.value_range.min_, prop.value_range.max_) self._prop_brightness = prop elif ( - self._mode_list is None - and isinstance(prop.value_list, list) + self._mode_map is None and prop.value_list ): # For value-list brightness - self._mode_list = { - item['value']: item['description'] - for item in prop.value_list} - self._attr_effect_list = list(self._mode_list.values()) + self._mode_map = prop.value_list.to_map() + self._attr_effect_list = list(self._mode_map.values()) self._attr_supported_features |= LightEntityFeature.EFFECT self._prop_mode = prop else: @@ -153,13 +150,13 @@ class Light(MIoTServiceEntity, LightEntity): continue # color-temperature if prop.name == 'color-temperature': - if not isinstance(prop.value_range, dict): + if not prop.value_range: _LOGGER.info( 'invalid color-temperature value_range format, %s', self.entity_id) continue - self._attr_min_color_temp_kelvin = prop.value_range['min'] - self._attr_max_color_temp_kelvin = prop.value_range['max'] + self._attr_min_color_temp_kelvin = prop.value_range.min_ + self._attr_max_color_temp_kelvin = prop.value_range.max_ self._attr_supported_color_modes.add(ColorMode.COLOR_TEMP) self._attr_color_mode = ColorMode.COLOR_TEMP self._prop_color_temp = prop @@ -171,20 +168,15 @@ class Light(MIoTServiceEntity, LightEntity): # mode if prop.name == 'mode': mode_list = None - if ( - isinstance(prop.value_list, list) - and prop.value_list - ): - mode_list = { - item['value']: item['description'] - for item in prop.value_list} - elif isinstance(prop.value_range, dict): + if prop.value_list: + mode_list = prop.value_list.to_map() + elif prop.value_range: mode_list = {} if ( int(( - prop.value_range['max'] - - prop.value_range['min'] - ) / prop.value_range['step']) + prop.value_range.max_ + - prop.value_range.min_ + ) / prop.value_range.step) > self._VALUE_RANGE_MODE_COUNT_MAX ): _LOGGER.info( @@ -192,13 +184,13 @@ class Light(MIoTServiceEntity, LightEntity): self.entity_id, prop.name, prop.value_range) else: for value in range( - prop.value_range['min'], - prop.value_range['max'], - prop.value_range['step']): + prop.value_range.min_, + prop.value_range.max_, + prop.value_range.step): mode_list[value] = f'mode {value}' if mode_list: - self._mode_list = mode_list - self._attr_effect_list = list(self._mode_list.values()) + self._mode_map = mode_list + self._attr_effect_list = list(self._mode_map.values()) self._attr_supported_features |= LightEntityFeature.EFFECT self._prop_mode = prop else: @@ -213,21 +205,6 @@ class Light(MIoTServiceEntity, LightEntity): self._attr_supported_color_modes.add(ColorMode.ONOFF) self._attr_color_mode = ColorMode.ONOFF - def __get_mode_description(self, key: int) -> Optional[str]: - """Convert mode value to description.""" - if self._mode_list is None: - return None - return self._mode_list.get(key, None) - - def __get_mode_value(self, description: str) -> Optional[int]: - """Convert mode description to value.""" - if self._mode_list is None: - return None - for key, value in self._mode_list.items(): - if value == description: - return key - return None - @property def is_on(self) -> Optional[bool]: """Return if the light is on.""" @@ -264,7 +241,8 @@ class Light(MIoTServiceEntity, LightEntity): @property def effect(self) -> Optional[str]: """Return the current mode.""" - return self.__get_mode_description( + return self.get_map_value( + map_=self._mode_map, key=self.get_prop_value(prop=self._prop_mode)) async def async_turn_on(self, **kwargs) -> None: @@ -275,7 +253,7 @@ class Light(MIoTServiceEntity, LightEntity): result: bool = False # on # Dirty logic for lumi.gateway.mgl03 indicator light - value_on = True if self._prop_on.format_ == 'bool' else 1 + value_on = True if self._prop_on.format_ == bool else 1 result = await self.set_property_async( prop=self._prop_on, value=value_on) # brightness @@ -303,11 +281,12 @@ class Light(MIoTServiceEntity, LightEntity): if ATTR_EFFECT in kwargs: result = await self.set_property_async( prop=self._prop_mode, - value=self.__get_mode_value(description=kwargs[ATTR_EFFECT])) + value=self.get_map_key( + map_=self._mode_map, value=kwargs[ATTR_EFFECT])) return result async def async_turn_off(self, **kwargs) -> None: """Turn the light off.""" # Dirty logic for lumi.gateway.mgl03 indicator light - value_on = False if self._prop_on.format_ == 'bool' else 0 + value_on = False if self._prop_on.format_ == bool else 0 return await self.set_property_async(prop=self._prop_on, value=value_on) diff --git a/custom_components/xiaomi_home/miot/common.py b/custom_components/xiaomi_home/miot/common.py index 0ee4f1d..dec21c3 100644 --- a/custom_components/xiaomi_home/miot/common.py +++ b/custom_components/xiaomi_home/miot/common.py @@ -45,11 +45,14 @@ off Xiaomi or its affiliates' products. Common utilities. """ +import asyncio import json from os import path import random from typing import Any, Optional import hashlib +from urllib.parse import urlencode +from urllib.request import Request, urlopen from paho.mqtt.matcher import MQTTMatcher import yaml @@ -83,10 +86,12 @@ def randomize_int(value: int, ratio: float) -> int: """Randomize an integer value.""" return int(value * (1 - ratio + random.random()*2*ratio)) + def randomize_float(value: float, ratio: float) -> float: """Randomize a float value.""" return value * (1 - ratio + random.random()*2*ratio) + class MIoTMatcher(MQTTMatcher): """MIoT Pub/Sub topic matcher.""" @@ -105,3 +110,68 @@ class MIoTMatcher(MQTTMatcher): return self[topic] except KeyError: return None + + +class MIoTHttp: + """MIoT Common HTTP API.""" + @staticmethod + def get( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[str]: + full_url = url + if params: + encoded_params = urlencode(params) + full_url = f'{url}?{encoded_params}' + request = Request(full_url, method='GET', headers=headers or {}) + content: Optional[bytes] = None + with urlopen(request) as response: + content = response.read() + return str(content, 'utf-8') if content else None + + @staticmethod + def get_json( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[dict]: + response = MIoTHttp.get(url, params, headers) + return json.loads(response) if response else None + + @staticmethod + def post( + url: str, data: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[str]: + pass + + @staticmethod + def post_json( + url: str, data: Optional[dict] = None, headers: Optional[dict] = None + ) -> Optional[dict]: + response = MIoTHttp.post(url, data, headers) + return json.loads(response) if response else None + + @staticmethod + async def get_async( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> Optional[str]: + # TODO: Use aiohttp + ev_loop = loop or asyncio.get_running_loop() + return await ev_loop.run_in_executor( + None, MIoTHttp.get, url, params, headers) + + @staticmethod + async def get_json_async( + url: str, params: Optional[dict] = None, headers: Optional[dict] = None, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> Optional[dict]: + ev_loop = loop or asyncio.get_running_loop() + return await ev_loop.run_in_executor( + None, MIoTHttp.get_json, url, params, headers) + + @ staticmethod + async def post_async( + url: str, data: Optional[dict] = None, headers: Optional[dict] = None, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> Optional[str]: + ev_loop = loop or asyncio.get_running_loop() + return await ev_loop.run_in_executor( + None, MIoTHttp.post, url, data, headers) diff --git a/custom_components/xiaomi_home/miot/miot_cloud.py b/custom_components/xiaomi_home/miot/miot_cloud.py index 4c076fe..bb26f37 100644 --- a/custom_components/xiaomi_home/miot/miot_cloud.py +++ b/custom_components/xiaomi_home/miot/miot_cloud.py @@ -735,7 +735,7 @@ class MIoTHttpClient: prop_obj['fut'].set_result(None) if props_req: _LOGGER.info( - 'get prop from cloud failed, %s, %s', len(key), props_req) + 'get prop from cloud failed, %s', props_req) if self._get_prop_list: self._get_prop_timer = self._main_loop.call_later( diff --git a/custom_components/xiaomi_home/miot/miot_device.py b/custom_components/xiaomi_home/miot/miot_device.py index 353b28f..f3a9361 100644 --- a/custom_components/xiaomi_home/miot/miot_device.py +++ b/custom_components/xiaomi_home/miot/miot_device.py @@ -94,7 +94,9 @@ from .miot_spec import ( MIoTSpecEvent, MIoTSpecInstance, MIoTSpecProperty, - MIoTSpecService + MIoTSpecService, + MIoTSpecValueList, + MIoTSpecValueRange ) _LOGGER = logging.getLogger(__name__) @@ -142,7 +144,7 @@ class MIoTDevice: _room_id: str _room_name: str - _suggested_area: str + _suggested_area: Optional[str] _device_state_sub_list: dict[str, Callable[[str, MIoTDeviceState], None]] @@ -153,7 +155,7 @@ class MIoTDevice: def __init__( self, miot_client: MIoTClient, - device_info: dict[str, str], + device_info: dict[str, Any], spec_instance: MIoTSpecInstance ) -> None: self.miot_client = miot_client @@ -243,25 +245,29 @@ class MIoTDevice: return True def sub_property( - self, handler: Callable[[dict, Any], None], siid: int = None, - piid: int = None, handler_ctx: Any = None + self, handler: Callable[[dict, Any], None], siid: Optional[int] = None, + piid: Optional[int] = None, handler_ctx: Any = None ) -> bool: return self.miot_client.sub_prop( did=self._did, handler=handler, siid=siid, piid=piid, handler_ctx=handler_ctx) - def unsub_property(self, siid: int = None, piid: int = None) -> bool: + def unsub_property( + self, siid: Optional[int] = None, piid: Optional[int] = None + ) -> bool: return self.miot_client.unsub_prop(did=self._did, siid=siid, piid=piid) def sub_event( - self, handler: Callable[[dict, Any], None], siid: int = None, - eiid: int = None, handler_ctx: Any = None + self, handler: Callable[[dict, Any], None], siid: Optional[int] = None, + eiid: Optional[int] = None, handler_ctx: Any = None ) -> bool: return self.miot_client.sub_event( did=self._did, handler=handler, siid=siid, eiid=eiid, handler_ctx=handler_ctx) - def unsub_event(self, siid: int = None, eiid: int = None) -> bool: + def unsub_event( + self, siid: Optional[int] = None, eiid: Optional[int] = None + ) -> bool: return self.miot_client.unsub_event( did=self._did, siid=siid, eiid=eiid) @@ -507,7 +513,7 @@ class MIoTDevice: if prop_access != (SPEC_PROP_TRANS_MAP[ 'entities'][platform]['access']): return None - if prop.format_ not in SPEC_PROP_TRANS_MAP[ + if prop.format_.__name__ not in SPEC_PROP_TRANS_MAP[ 'entities'][platform]['format']: return None if prop.unit: @@ -560,9 +566,9 @@ class MIoTDevice: # general conversion if not prop.platform: if prop.writable: - if prop.format_ == 'str': + if prop.format_ == str: prop.platform = 'text' - elif prop.format_ == 'bool': + elif prop.format_ == bool: prop.platform = 'switch' prop.device_class = SwitchDeviceClass.SWITCH elif prop.value_list: @@ -703,7 +709,7 @@ class MIoTDevice: def __on_device_state_changed( self, did: str, state: MIoTDeviceState, ctx: Any ) -> None: - self._online = state + self._online = state == MIoTDeviceState.ONLINE for key, handler in self._device_state_sub_list.items(): self.miot_client.main_loop.call_soon_threadsafe( handler, key, state) @@ -719,7 +725,8 @@ class MIoTServiceEntity(Entity): _main_loop: asyncio.AbstractEventLoop _prop_value_map: dict[MIoTSpecProperty, Any] - _event_occurred_handler: Callable[[MIoTSpecEvent, dict], None] + _event_occurred_handler: Optional[ + Callable[[MIoTSpecEvent, dict], None]] _prop_changed_subs: dict[ MIoTSpecProperty, Callable[[MIoTSpecProperty, Any], None]] @@ -763,7 +770,9 @@ class MIoTServiceEntity(Entity): self.entity_id) @property - def event_occurred_handler(self) -> Callable[[MIoTSpecEvent, dict], None]: + def event_occurred_handler( + self + ) -> Optional[Callable[[MIoTSpecEvent, dict], None]]: return self._event_occurred_handler @event_occurred_handler.setter @@ -784,7 +793,7 @@ class MIoTServiceEntity(Entity): self._prop_changed_subs.pop(prop, None) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -829,18 +838,20 @@ class MIoTServiceEntity(Entity): self.miot_device.unsub_event( siid=event.service.iid, eiid=event.iid) - def get_map_description(self, map_: dict[int, Any], key: int) -> Any: + def get_map_value( + self, map_: dict[int, Any], key: int + ) -> Any: if map_ is None: return None return map_.get(key, None) - def get_map_value( - self, map_: dict[int, Any], description: Any + def get_map_key( + self, map_: dict[int, Any], value: Any ) -> Optional[int]: if map_ is None: return None - for key, value in map_.items(): - if value == description: + for key, value_ in map_.items(): + if value_ == value: return key return None @@ -999,10 +1010,9 @@ class MIoTPropertyEntity(Entity): service: MIoTSpecService _main_loop: asyncio.AbstractEventLoop - # {'min':int, 'max':int, 'step': int} - _value_range: dict[str, int] + _value_range: Optional[MIoTSpecValueRange] # {Any: Any} - _value_list: dict[Any, Any] + _value_list: Optional[MIoTSpecValueList] _value: Any _pending_write_ha_state_timer: Optional[asyncio.TimerHandle] @@ -1015,11 +1025,7 @@ class MIoTPropertyEntity(Entity): self.service = spec.service self._main_loop = miot_device.miot_client.main_loop self._value_range = spec.value_range - if spec.value_list: - self._value_list = { - item['value']: item['description'] for item in spec.value_list} - else: - self._value_list = None + self._value_list = spec.value_list self._value = None self._pending_write_ha_state_timer = None # Gen entity_id @@ -1042,7 +1048,7 @@ class MIoTPropertyEntity(Entity): self._value_list) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -1067,18 +1073,15 @@ class MIoTPropertyEntity(Entity): self.miot_device.unsub_property( siid=self.service.iid, piid=self.spec.iid) - def get_vlist_description(self, value: Any) -> str: + def get_vlist_description(self, value: Any) -> Optional[str]: if not self._value_list: return None - return self._value_list.get(value, None) + return self._value_list.get_description_by_value(value) def get_vlist_value(self, description: str) -> Any: if not self._value_list: return None - for key, value in self._value_list.items(): - if value == description: - return key - return None + return self._value_list.get_value_by_description(description) async def set_property_async(self, value: Any) -> bool: if not self.spec.writable: @@ -1184,7 +1187,7 @@ class MIoTEventEntity(Entity): spec.device_class, self.entity_id) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -1286,7 +1289,7 @@ class MIoTActionEntity(Entity): spec.device_class, self.entity_id) @property - def device_info(self) -> dict: + def device_info(self) -> Optional[DeviceInfo]: return self.miot_device.device_info async def async_added_to_hass(self) -> None: @@ -1298,7 +1301,9 @@ class MIoTActionEntity(Entity): self.miot_device.unsub_device_state( key=f'{self.action_platform}.{ self.service.iid}.{self.spec.iid}') - async def action_async(self, in_list: list = None) -> Optional[list]: + async def action_async( + self, in_list: Optional[list] = None + ) -> Optional[list]: try: return await self.miot_device.miot_client.action_async( did=self.miot_device.did, diff --git a/custom_components/xiaomi_home/miot/miot_spec.py b/custom_components/xiaomi_home/miot/miot_spec.py index 33022a1..5eaac98 100644 --- a/custom_components/xiaomi_home/miot/miot_spec.py +++ b/custom_components/xiaomi_home/miot/miot_spec.py @@ -46,342 +46,200 @@ off Xiaomi or its affiliates' products. MIoT-Spec-V2 parser. """ import asyncio -import json import platform import time -from typing import Any, Optional -from urllib.parse import urlencode -from urllib.request import Request, urlopen +from typing import Any, Optional, Type, Union import logging +from slugify import slugify + # pylint: disable=relative-beyond-top-level from .const import DEFAULT_INTEGRATION_LANGUAGE, SPEC_STD_LIB_EFFECTIVE_TIME +from .common import MIoTHttp from .miot_error import MIoTSpecError from .miot_storage import ( MIoTStorage, SpecBoolTranslation, - SpecFilter, - SpecMultiLang) + SpecFilter) _LOGGER = logging.getLogger(__name__) -class MIoTSpecBase: - """MIoT SPEC base class.""" - iid: int - type_: str - description: str - description_trans: Optional[str] - proprietary: bool - need_filter: bool - name: Optional[str] +class MIoTSpecValueRange: + """MIoT SPEC value range class.""" + min_: int + max_: int + step: int - # External params - platform: str - device_class: Any - state_class: Any - icon: str - external_unit: Any + def __init__(self, value_range: Union[dict, list]) -> None: + if isinstance(value_range, dict): + self.load(value_range) + elif isinstance(value_range, list): + self.from_spec(value_range) + else: + raise MIoTSpecError('invalid value range format') - spec_id: str + def load(self, value_range: dict) -> None: + if ( + 'min' not in value_range + or 'max' not in value_range + or 'step' not in value_range + ): + raise MIoTSpecError('invalid value range') + self.min_ = value_range['min'] + self.max_ = value_range['max'] + self.step = value_range['step'] - def __init__(self, spec: dict) -> None: - self.iid = spec['iid'] - self.type_ = spec['type'] - self.description = spec['description'] - - self.description_trans = spec.get('description_trans', None) - self.proprietary = spec.get('proprietary', False) - self.need_filter = spec.get('need_filter', False) - self.name = spec.get('name', None) - - self.platform = None - self.device_class = None - self.state_class = None - self.icon = None - self.external_unit = None - - self.spec_id = hash(f'{self.type_}.{self.iid}') - - def __hash__(self) -> int: - return self.spec_id - - def __eq__(self, value: object) -> bool: - return self.spec_id == value.spec_id - - -class MIoTSpecProperty(MIoTSpecBase): - """MIoT SPEC property class.""" - format_: str - precision: int - unit: str - - value_range: list - value_list: list[dict] - - _access: list - _writable: bool - _readable: bool - _notifiable: bool - - service: MIoTSpecBase - - def __init__( - self, spec: dict, service: MIoTSpecBase = None, - format_: str = None, access: list = None, - unit: str = None, value_range: list = None, - value_list: list[dict] = None, precision: int = 0 - ) -> None: - super().__init__(spec=spec) - self.service = service - self.format_ = format_ - self.access = access - self.unit = unit - self.value_range = value_range - self.value_list = value_list - self.precision = precision - - self.spec_id = hash( - f'p.{self.name}.{self.service.iid}.{self.iid}') - - @property - def access(self) -> list: - return self._access - - @access.setter - def access(self, value: list) -> None: - self._access = value - if isinstance(value, list): - self._writable = 'write' in value - self._readable = 'read' in value - self._notifiable = 'notify' in value - - @property - def writable(self) -> bool: - return self._writable - - @property - def readable(self) -> bool: - return self._readable - - @property - def notifiable(self): - return self._notifiable - - def value_format(self, value: Any) -> Any: - if value is None: - return None - if self.format_ == 'int': - return int(value) - if self.format_ == 'float': - return round(value, self.precision) - if self.format_ == 'bool': - return bool(value in [True, 1, 'true', '1']) - return value + def from_spec(self, value_range: list) -> None: + if len(value_range) != 3: + raise MIoTSpecError('invalid value range') + self.min_ = value_range[0] + self.max_ = value_range[1] + self.step = value_range[2] def dump(self) -> dict: return { - 'type': self.type_, - 'name': self.name, - 'iid': self.iid, - 'description': self.description, - 'description_trans': self.description_trans, - 'proprietary': self.proprietary, - 'need_filter': self.need_filter, - 'format': self.format_, - 'access': self._access, - 'unit': self.unit, - 'value_range': self.value_range, - 'value_list': self.value_list, - 'precision': self.precision + 'min': self.min_, + 'max': self.max_, + 'step': self.step } - -class MIoTSpecEvent(MIoTSpecBase): - """MIoT SPEC event class.""" - argument: list[MIoTSpecProperty] - service: MIoTSpecBase - - def __init__( - self, spec: dict, service: MIoTSpecBase = None, - argument: list[MIoTSpecProperty] = None - ) -> None: - super().__init__(spec=spec) - self.argument = argument - self.service = service - - self.spec_id = hash( - f'e.{self.name}.{self.service.iid}.{self.iid}') - - def dump(self) -> dict: - return { - 'type': self.type_, - 'name': self.name, - 'iid': self.iid, - 'description': self.description, - 'description_trans': self.description_trans, - 'proprietary': self.proprietary, - 'need_filter': self.need_filter, - 'argument': [prop.iid for prop in self.argument], - } + def __str__(self) -> str: + return f'[{self.min_}, {self.max_}, {self.step}' -class MIoTSpecAction(MIoTSpecBase): - """MIoT SPEC action class.""" - in_: list[MIoTSpecProperty] - out: list[MIoTSpecProperty] - service: MIoTSpecBase - - def __init__( - self, spec: dict, service: MIoTSpecBase = None, - in_: list[MIoTSpecProperty] = None, - out: list[MIoTSpecProperty] = None - ) -> None: - super().__init__(spec=spec) - self.in_ = in_ - self.out = out - self.service = service - - self.spec_id = hash( - f'a.{self.name}.{self.service.iid}.{self.iid}') - - def dump(self) -> dict: - return { - 'type': self.type_, - 'name': self.name, - 'iid': self.iid, - 'description': self.description, - 'description_trans': self.description_trans, - 'proprietary': self.proprietary, - 'need_filter': self.need_filter, - 'in': [prop.iid for prop in self.in_], - 'out': [prop.iid for prop in self.out] - } - - -class MIoTSpecService(MIoTSpecBase): - """MIoT SPEC service class.""" - properties: list[MIoTSpecProperty] - events: list[MIoTSpecEvent] - actions: list[MIoTSpecAction] - - def __init__(self, spec: dict) -> None: - super().__init__(spec=spec) - self.properties = [] - self.events = [] - self.actions = [] - - def dump(self) -> dict: - return { - 'type': self.type_, - 'name': self.name, - 'iid': self.iid, - 'description': self.description, - 'description_trans': self.description_trans, - 'proprietary': self.proprietary, - 'properties': [prop.dump() for prop in self.properties], - 'need_filter': self.need_filter, - 'events': [event.dump() for event in self.events], - 'actions': [action.dump() for action in self.actions], - } - - -# @dataclass -class MIoTSpecInstance: - """MIoT SPEC instance class.""" - urn: str +class MIoTSpecValueListItem: + """MIoT SPEC value list item class.""" + # NOTICE: bool type without name name: str - # urn_name: str + # Value + value: Any + # Descriptions after multilingual conversion. description: str - description_trans: str - services: list[MIoTSpecService] - # External params - platform: str - device_class: Any - icon: str + def __init__(self, item: dict) -> None: + self.load(item) - def __init__( - self, urn: str = None, name: str = None, - description: str = None, description_trans: str = None - ) -> None: - self.urn = urn - self.name = name - self.description = description - self.description_trans = description_trans - self.services = [] + def load(self, item: dict) -> None: + if 'value' not in item or 'description' not in item: + raise MIoTSpecError('invalid value list item, %s') - def load(self, specs: dict) -> 'MIoTSpecInstance': - self.urn = specs['urn'] - self.name = specs['name'] - self.description = specs['description'] - self.description_trans = specs['description_trans'] - self.services = [] - for service in specs['services']: - spec_service = MIoTSpecService(spec=service) - for prop in service['properties']: - spec_prop = MIoTSpecProperty( - spec=prop, - service=spec_service, - format_=prop['format'], - access=prop['access'], - unit=prop['unit'], - value_range=prop['value_range'], - value_list=prop['value_list'], - precision=prop.get('precision', 0)) - spec_service.properties.append(spec_prop) - for event in service['events']: - spec_event = MIoTSpecEvent( - spec=event, service=spec_service) - arg_list: list[MIoTSpecProperty] = [] - for piid in event['argument']: - for prop in spec_service.properties: - if prop.iid == piid: - arg_list.append(prop) - break - spec_event.argument = arg_list - spec_service.events.append(spec_event) - for action in service['actions']: - spec_action = MIoTSpecAction( - spec=action, service=spec_service, in_=action['in']) - in_list: list[MIoTSpecProperty] = [] - for piid in action['in']: - for prop in spec_service.properties: - if prop.iid == piid: - in_list.append(prop) - break - spec_action.in_ = in_list - out_list: list[MIoTSpecProperty] = [] - for piid in action['out']: - for prop in spec_service.properties: - if prop.iid == piid: - out_list.append(prop) - break - spec_action.out = out_list - spec_service.actions.append(spec_action) - self.services.append(spec_service) - return self + self.name = item.get('name', None) + self.value = item['value'] + self.description = item['description'] + + @staticmethod + def from_spec(item: dict) -> 'MIoTSpecValueListItem': + if ( + 'name' not in item + or 'value' not in item + or 'description' not in item + ): + raise MIoTSpecError('invalid value list item, %s') + # Slugify name and convert to lower-case. + cache = { + 'name': slugify(text=item['name'], separator='_').lower(), + 'value': item['value'], + 'description': item['description'] + } + return MIoTSpecValueListItem(cache) def dump(self) -> dict: return { - 'urn': self.urn, 'name': self.name, - 'description': self.description, - 'description_trans': self.description_trans, - 'services': [service.dump() for service in self.services] + 'value': self.value, + 'description': self.description } + def __str__(self) -> str: + return f'{self.name}: {self.value} - {self.description}' -class SpecStdLib: + +class MIoTSpecValueList: + """MIoT SPEC value list class.""" + items: list[MIoTSpecValueListItem] + + def __init__(self, value_list: list[dict]) -> None: + if not isinstance(value_list, list): + raise MIoTSpecError('invalid value list format') + self.items = [] + self.load(value_list) + + @property + def names(self) -> list[str]: + return [item.name for item in self.items] + + @property + def values(self) -> list[Any]: + return [item.value for item in self.items] + + @property + def descriptions(self) -> list[str]: + return [item.description for item in self.items] + + @staticmethod + def from_spec(value_list: list[dict]) -> 'MIoTSpecValueList': + result = MIoTSpecValueList([]) + dup_desc: dict[str, int] = {} + for item in value_list: + # Handle duplicate descriptions. + count = 0 + if item['description'] in dup_desc: + count = dup_desc[item['description']] + count += 1 + dup_desc[item['description']] = count + if count > 1: + item['name'] = f'{item["name"]}_{count}' + item['description'] = f'{item["description"]}_{count}' + + result.items.append(MIoTSpecValueListItem.from_spec(item)) + return result + + def load(self, value_list: list[dict]) -> None: + for item in value_list: + self.items.append(MIoTSpecValueListItem(item)) + + def to_map(self) -> dict: + return {item.value: item.description for item in self.items} + + def get_value_by_description(self, description: str) -> Any: + for item in self.items: + if item.description == description: + return item.value + return None + + def get_description_by_value(self, value: Any) -> Optional[str]: + for item in self.items: + if item.value == value: + return item.description + return None + + def dump(self) -> list: + return [item.dump() for item in self.items] + + +class _SpecStdLib: """MIoT-Spec-V2 standard library.""" _lang: str - _spec_std_lib: Optional[dict[str, dict[str, dict[str, str]]]] + _devices: dict[str, dict[str, str]] + _services: dict[str, dict[str, str]] + _properties: dict[str, dict[str, str]] + _events: dict[str, dict[str, str]] + _actions: dict[str, dict[str, str]] + _values: dict[str, dict[str, str]] def __init__(self, lang: str) -> None: self._lang = lang + self._devices = {} + self._services = {} + self._properties = {} + self._events = {} + self._actions = {} + self._values = {} + self._spec_std_lib = None - def init(self, std_lib: dict[str, dict[str, str]]) -> None: + def load(self, std_lib: dict[str, dict[str, dict[str, str]]]) -> None: if ( not isinstance(std_lib, dict) or 'devices' not in std_lib @@ -392,246 +250,80 @@ class SpecStdLib: or 'values' not in std_lib ): return - self._spec_std_lib = std_lib - - def deinit(self) -> None: - self._spec_std_lib = None + self._devices = std_lib['devices'] + self._services = std_lib['services'] + self._properties = std_lib['properties'] + self._events = std_lib['events'] + self._actions = std_lib['actions'] + self._values = std_lib['values'] def device_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['devices']: + if not self._devices or key not in self._devices: return None - if self._lang not in self._spec_std_lib['devices'][key]: - return self._spec_std_lib['devices'][key].get( + if self._lang not in self._devices[key]: + return self._devices[key].get( DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['devices'][key][self._lang] + return self._devices[key][self._lang] def service_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['services']: + if not self._services or key not in self._services: return None - if self._lang not in self._spec_std_lib['services'][key]: - return self._spec_std_lib['services'][key].get( + if self._lang not in self._services[key]: + return self._services[key].get( DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['services'][key][self._lang] + return self._services[key][self._lang] def property_translate(self, key: str) -> Optional[str]: - if ( - not self._spec_std_lib - or key not in self._spec_std_lib['properties'] - ): + if not self._properties or key not in self._properties: return None - if self._lang not in self._spec_std_lib['properties'][key]: - return self._spec_std_lib['properties'][key].get( + if self._lang not in self._properties[key]: + return self._properties[key].get( DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['properties'][key][self._lang] + return self._properties[key][self._lang] def event_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['events']: + if not self._events or key not in self._events: return None - if self._lang not in self._spec_std_lib['events'][key]: - return self._spec_std_lib['events'][key].get( + if self._lang not in self._events[key]: + return self._events[key].get( DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['events'][key][self._lang] + return self._events[key][self._lang] def action_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['actions']: + if not self._actions or key not in self._actions: return None - if self._lang not in self._spec_std_lib['actions'][key]: - return self._spec_std_lib['actions'][key].get( + if self._lang not in self._actions[key]: + return self._actions[key].get( DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['actions'][key][self._lang] + return self._actions[key][self._lang] def value_translate(self, key: str) -> Optional[str]: - if not self._spec_std_lib or key not in self._spec_std_lib['values']: + if not self._values or key not in self._values: return None - if self._lang not in self._spec_std_lib['values'][key]: - return self._spec_std_lib['values'][key].get( + if self._lang not in self._values[key]: + return self._values[key].get( DEFAULT_INTEGRATION_LANGUAGE, None) - return self._spec_std_lib['values'][key][self._lang] + return self._values[key][self._lang] - def dump(self) -> dict[str, dict[str, str]]: - return self._spec_std_lib + def dump(self) -> dict[str, dict[str, dict[str, str]]]: + return { + 'devices': self._devices, + 'services': self._services, + 'properties': self._properties, + 'events': self._events, + 'actions': self._actions, + 'values': self._values + } + async def refresh_async(self) -> bool: + std_lib_new = await self.__request_from_cloud_async() + if std_lib_new: + self.load(std_lib_new) + return True + return False -class MIoTSpecParser: - """MIoT SPEC parser.""" - # pylint: disable=inconsistent-quotes - VERSION: int = 1 - DOMAIN: str = 'miot_specs' - _lang: str - _storage: MIoTStorage - _main_loop: asyncio.AbstractEventLoop - - _init_done: bool - _ram_cache: dict - - _std_lib: SpecStdLib - _bool_trans: SpecBoolTranslation - _multi_lang: SpecMultiLang - _spec_filter: SpecFilter - - def __init__( - self, lang: str = DEFAULT_INTEGRATION_LANGUAGE, - storage: MIoTStorage = None, - loop: Optional[asyncio.AbstractEventLoop] = None - ) -> None: - self._lang = lang - self._storage = storage - self._main_loop = loop or asyncio.get_running_loop() - - self._init_done = False - self._ram_cache = {} - - self._std_lib = SpecStdLib(lang=self._lang) - self._bool_trans = SpecBoolTranslation( - lang=self._lang, loop=self._main_loop) - self._multi_lang = SpecMultiLang(lang=self._lang, loop=self._main_loop) - self._spec_filter = SpecFilter(loop=self._main_loop) - - async def init_async(self) -> None: - if self._init_done is True: - return - await self._bool_trans.init_async() - await self._multi_lang.init_async() - await self._spec_filter.init_async() - std_lib_cache: dict = None - if self._storage: - std_lib_cache: dict = await self._storage.load_async( - domain=self.DOMAIN, name='spec_std_lib', type_=dict) - if ( - isinstance(std_lib_cache, dict) - and 'data' in std_lib_cache - and 'ts' in std_lib_cache - and isinstance(std_lib_cache['ts'], int) - and int(time.time()) - std_lib_cache['ts'] < - SPEC_STD_LIB_EFFECTIVE_TIME - ): - # Use the cache if the update time is less than 14 day - _LOGGER.debug( - 'use local spec std cache, ts->%s', std_lib_cache['ts']) - self._std_lib.init(std_lib_cache['data']) - self._init_done = True - return - # Update spec std lib - spec_lib_new = await self.__request_spec_std_lib_async() - if spec_lib_new: - self._std_lib.init(spec_lib_new) - if self._storage: - if not await self._storage.save_async( - domain=self.DOMAIN, name='spec_std_lib', - data={ - 'data': self._std_lib.dump(), - 'ts': int(time.time()) - } - ): - _LOGGER.error('save spec std lib failed') - else: - if std_lib_cache: - self._std_lib.init(std_lib_cache['data']) - _LOGGER.error('get spec std lib failed, use local cache') - else: - _LOGGER.error('get spec std lib failed') - self._init_done = True - - async def deinit_async(self) -> None: - self._init_done = False - self._std_lib.deinit() - await self._bool_trans.deinit_async() - await self._multi_lang.deinit_async() - await self._spec_filter.deinit_async() - self._ram_cache.clear() - - async def parse( - self, urn: str, skip_cache: bool = False, - ) -> MIoTSpecInstance: - """MUST await init first !!!""" - if not skip_cache: - cache_result = await self.__cache_get(urn=urn) - if isinstance(cache_result, dict): - _LOGGER.debug('get from cache, %s', urn) - return MIoTSpecInstance().load(specs=cache_result) - # Retry three times - for index in range(3): - try: - return await self.__parse(urn=urn) - except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error( - 'parse error, retry, %d, %s, %s', index, urn, err) - return None - - async def refresh_async(self, urn_list: list[str]) -> int: - """MUST await init first !!!""" - if not urn_list: - return False - spec_std_new: dict = await self.__request_spec_std_lib_async() - if spec_std_new: - self._std_lib.init(spec_std_new) - if self._storage: - if not await self._storage.save_async( - domain=self.DOMAIN, name='spec_std_lib', - data={ - 'data': self._std_lib.dump(), - 'ts': int(time.time()) - } - ): - _LOGGER.error('save spec std lib failed') - else: - raise MIoTSpecError('get spec std lib failed') - success_count = 0 - for index in range(0, len(urn_list), 5): - batch = urn_list[index:index+5] - task_list = [self._main_loop.create_task( - self.parse(urn=urn, skip_cache=True)) for urn in batch] - results = await asyncio.gather(*task_list) - success_count += sum(1 for result in results if result is not None) - return success_count - - def __http_get( - self, url: str, params: dict = None, headers: dict = None - ) -> dict: - if params: - encoded_params = urlencode(params) - full_url = f'{url}?{encoded_params}' - else: - full_url = url - request = Request(full_url, method='GET', headers=headers or {}) - content: bytes = None - with urlopen(request) as response: - content = response.read() - return ( - json.loads(str(content, 'utf-8')) - if content is not None else None) - - async def __http_get_async( - self, url: str, params: dict = None, headers: dict = None - ) -> dict: - return await self._main_loop.run_in_executor( - None, self.__http_get, url, params, headers) - - async def __cache_get(self, urn: str) -> Optional[dict]: - if self._storage is not None: - if platform.system() == 'Windows': - urn = urn.replace(':', '_') - return await self._storage.load_async( - domain=self.DOMAIN, name=f'{urn}_{self._lang}', type_=dict) - return self._ram_cache.get(urn, None) - - async def __cache_set(self, urn: str, data: dict) -> bool: - if self._storage is not None: - if platform.system() == 'Windows': - urn = urn.replace(':', '_') - return await self._storage.save_async( - domain=self.DOMAIN, name=f'{urn}_{self._lang}', data=data) - self._ram_cache[urn] = data - return True - - def __spec_format2dtype(self, format_: str) -> str: - # 'string'|'bool'|'uint8'|'uint16'|'uint32'| - # 'int8'|'int16'|'int32'|'int64'|'float' - return {'string': 'str', 'bool': 'bool', 'float': 'float'}.get( - format_, 'int') - - async def __request_spec_std_lib_async(self) -> Optional[SpecStdLib]: - std_libs: dict = None + async def __request_from_cloud_async(self) -> Optional[dict]: + std_libs: Optional[dict] = None for index in range(3): try: tasks: list = [] @@ -659,7 +351,7 @@ class MIoTSpecParser: for name in [ 'device', 'service', 'property', 'event', 'action', 'property_value']: - tasks.append(self.__http_get_async( + tasks.append(MIoTHttp.get_json_async( 'https://cdn.cnbj1.fds.api.mi-img.com/res-conf/' f'xiaomi-home/std_ex_{name}.json')) results = await asyncio.gather(*tasks) @@ -719,7 +411,7 @@ class MIoTSpecParser: return None async def __get_property_value(self) -> dict: - reply = await self.__http_get_async( + reply = await MIoTHttp.get_json_async( url='https://miot-spec.org/miot-spec-v2' '/normalization/list/property_value') if reply is None or 'result' not in reply: @@ -743,7 +435,7 @@ class MIoTSpecParser: return result async def __get_template_list(self, url: str) -> dict: - reply = await self.__http_get_async(url=url) + reply = await MIoTHttp.get_json_async(url=url) if reply is None or 'result' not in reply: raise MIoTSpecError(f'get service failed, {url}') result: dict = {} @@ -767,20 +459,617 @@ class MIoTSpecParser: result[item['type']] = item['description'] return result - async def __get_instance(self, urn: str) -> dict: - return await self.__http_get_async( - url='https://miot-spec.org/miot-spec-v2/instance', - params={'type': urn}) - async def __get_translation(self, urn: str) -> dict: - return await self.__http_get_async( +class _MIoTSpecBase: + """MIoT SPEC base class.""" + iid: int + type_: str + description: str + description_trans: Optional[str] + proprietary: bool + need_filter: bool + name: Optional[str] + + # External params + platform: Optional[str] + device_class: Any + state_class: Any + icon: Optional[str] + external_unit: Any + + spec_id: int + + def __init__(self, spec: dict) -> None: + self.iid = spec['iid'] + self.type_ = spec['type'] + self.description = spec['description'] + + self.description_trans = spec.get('description_trans', None) + self.proprietary = spec.get('proprietary', False) + self.need_filter = spec.get('need_filter', False) + self.name = spec.get('name', None) + + self.platform = None + self.device_class = None + self.state_class = None + self.icon = None + self.external_unit = None + + self.spec_id = hash(f'{self.type_}.{self.iid}') + + def __hash__(self) -> int: + return self.spec_id + + def __eq__(self, value) -> bool: + return self.spec_id == value.spec_id + + +class MIoTSpecProperty(_MIoTSpecBase): + """MIoT SPEC property class.""" + unit: Optional[str] + precision: int + + _format_: Type + _value_range: Optional[MIoTSpecValueRange] + _value_list: Optional[MIoTSpecValueList] + + _access: list + _writable: bool + _readable: bool + _notifiable: bool + + service: 'MIoTSpecService' + + def __init__( + self, + spec: dict, + service: 'MIoTSpecService', + format_: str, + access: list, + unit: Optional[str] = None, + value_range: Optional[dict] = None, + value_list: Optional[list[dict]] = None, + precision: Optional[int] = None + ) -> None: + super().__init__(spec=spec) + self.service = service + self.format_ = format_ + self.access = access + self.unit = unit + self.value_range = value_range + self.value_list = value_list + self.precision = precision or 1 + + self.spec_id = hash( + f'p.{self.name}.{self.service.iid}.{self.iid}') + + @property + def format_(self) -> Type: + return self._format_ + + @format_.setter + def format_(self, value: str) -> None: + self._format_ = { + 'string': str, + 'str': str, + 'bool': bool, + 'float': float}.get( + value, int) + + @property + def access(self) -> list: + return self._access + + @access.setter + def access(self, value: list) -> None: + self._access = value + if isinstance(value, list): + self._writable = 'write' in value + self._readable = 'read' in value + self._notifiable = 'notify' in value + + @property + def writable(self) -> bool: + return self._writable + + @property + def readable(self) -> bool: + return self._readable + + @property + def notifiable(self): + return self._notifiable + + @property + def value_range(self) -> Optional[MIoTSpecValueRange]: + return self._value_range + + @value_range.setter + def value_range(self, value: Union[dict, list, None]) -> None: + """Set value-range, precision.""" + if not value: + self._value_range = None + return + self._value_range = MIoTSpecValueRange(value_range=value) + if isinstance(value, list): + self.precision = len(str(value[2]).split( + '.')[1].rstrip('0')) if '.' in str(value[2]) else 0 + + @property + def value_list(self) -> Optional[MIoTSpecValueList]: + return self._value_list + + @value_list.setter + def value_list( + self, value: Union[list[dict], MIoTSpecValueList, None] + ) -> None: + if not value: + self._value_list = None + return + if isinstance(value, list): + self._value_list = MIoTSpecValueList(value_list=value) + elif isinstance(value, MIoTSpecValueList): + self._value_list = value + + def value_format(self, value: Any) -> Any: + if value is None: + return None + if self.format_ == int: + return int(value) + if self.format_ == float: + return round(value, self.precision) + if self.format_ == bool: + return bool(value in [True, 1, 'True', 'true', '1']) + return value + + def dump(self) -> dict: + return { + 'type': self.type_, + 'name': self.name, + 'iid': self.iid, + 'description': self.description, + 'description_trans': self.description_trans, + 'proprietary': self.proprietary, + 'need_filter': self.need_filter, + 'format': self.format_.__name__, + 'access': self._access, + 'unit': self.unit, + 'value_range': ( + self._value_range.dump() if self._value_range else None), + 'value_list': self._value_list.dump() if self._value_list else None, + 'precision': self.precision + } + + +class MIoTSpecEvent(_MIoTSpecBase): + """MIoT SPEC event class.""" + argument: list[MIoTSpecProperty] + service: 'MIoTSpecService' + + def __init__( + self, spec: dict, service: 'MIoTSpecService', + argument: Optional[list[MIoTSpecProperty]] = None + ) -> None: + super().__init__(spec=spec) + self.argument = argument or [] + self.service = service + + self.spec_id = hash( + f'e.{self.name}.{self.service.iid}.{self.iid}') + + def dump(self) -> dict: + return { + 'type': self.type_, + 'name': self.name, + 'iid': self.iid, + 'description': self.description, + 'description_trans': self.description_trans, + 'proprietary': self.proprietary, + 'argument': [prop.iid for prop in self.argument], + 'need_filter': self.need_filter + } + + +class MIoTSpecAction(_MIoTSpecBase): + """MIoT SPEC action class.""" + in_: list[MIoTSpecProperty] + out: list[MIoTSpecProperty] + service: 'MIoTSpecService' + + def __init__( + self, spec: dict, service: 'MIoTSpecService', + in_: Optional[list[MIoTSpecProperty]] = None, + out: Optional[list[MIoTSpecProperty]] = None + ) -> None: + super().__init__(spec=spec) + self.in_ = in_ or [] + self.out = out or [] + self.service = service + + self.spec_id = hash( + f'a.{self.name}.{self.service.iid}.{self.iid}') + + def dump(self) -> dict: + return { + 'type': self.type_, + 'name': self.name, + 'iid': self.iid, + 'description': self.description, + 'description_trans': self.description_trans, + 'in': [prop.iid for prop in self.in_], + 'out': [prop.iid for prop in self.out], + 'proprietary': self.proprietary, + 'need_filter': self.need_filter + } + + +class MIoTSpecService(_MIoTSpecBase): + """MIoT SPEC service class.""" + properties: list[MIoTSpecProperty] + events: list[MIoTSpecEvent] + actions: list[MIoTSpecAction] + + def __init__(self, spec: dict) -> None: + super().__init__(spec=spec) + self.properties = [] + self.events = [] + self.actions = [] + + def dump(self) -> dict: + return { + 'type': self.type_, + 'name': self.name, + 'iid': self.iid, + 'description': self.description, + 'description_trans': self.description_trans, + 'proprietary': self.proprietary, + 'properties': [prop.dump() for prop in self.properties], + 'events': [event.dump() for event in self.events], + 'actions': [action.dump() for action in self.actions], + 'need_filter': self.need_filter + } + + +# @dataclass +class MIoTSpecInstance: + """MIoT SPEC instance class.""" + urn: str + name: str + # urn_name: str + description: str + description_trans: str + services: list[MIoTSpecService] + + # External params + platform: str + device_class: Any + icon: str + + def __init__( + self, urn: str, name: str, description: str, description_trans: str + ) -> None: + self.urn = urn + self.name = name + self.description = description + self.description_trans = description_trans + self.services = [] + + @staticmethod + def load(specs: dict) -> 'MIoTSpecInstance': + instance = MIoTSpecInstance( + urn=specs['urn'], + name=specs['name'], + description=specs['description'], + description_trans=specs['description_trans']) + for service in specs['services']: + spec_service = MIoTSpecService(spec=service) + for prop in service['properties']: + spec_prop = MIoTSpecProperty( + spec=prop, + service=spec_service, + format_=prop['format'], + access=prop['access'], + unit=prop['unit'], + value_range=prop['value_range'], + value_list=prop['value_list'], + precision=prop.get('precision', None)) + spec_service.properties.append(spec_prop) + for event in service['events']: + spec_event = MIoTSpecEvent( + spec=event, service=spec_service) + arg_list: list[MIoTSpecProperty] = [] + for piid in event['argument']: + for prop in spec_service.properties: + if prop.iid == piid: + arg_list.append(prop) + break + spec_event.argument = arg_list + spec_service.events.append(spec_event) + for action in service['actions']: + spec_action = MIoTSpecAction( + spec=action, service=spec_service, in_=action['in']) + in_list: list[MIoTSpecProperty] = [] + for piid in action['in']: + for prop in spec_service.properties: + if prop.iid == piid: + in_list.append(prop) + break + spec_action.in_ = in_list + out_list: list[MIoTSpecProperty] = [] + for piid in action['out']: + for prop in spec_service.properties: + if prop.iid == piid: + out_list.append(prop) + break + spec_action.out = out_list + spec_service.actions.append(spec_action) + instance.services.append(spec_service) + return instance + + def dump(self) -> dict: + return { + 'urn': self.urn, + 'name': self.name, + 'description': self.description, + 'description_trans': self.description_trans, + 'services': [service.dump() for service in self.services] + } + + +class _MIoTSpecMultiLang: + """MIoT SPEC multi lang class.""" + # pylint: disable=broad-exception-caught + _DOMAIN: str = 'miot_specs_multi_lang' + _lang: str + _storage: MIoTStorage + _main_loop: asyncio.AbstractEventLoop + + _custom_cache: dict[str, dict] + _current_data: Optional[dict[str, str]] + + def __init__( + self, lang: Optional[str], + storage: MIoTStorage, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> None: + self._lang = lang or DEFAULT_INTEGRATION_LANGUAGE + self._storage = storage + self._main_loop = loop or asyncio.get_running_loop() + + self._custom_cache = {} + self._current_data = None + + async def set_spec_async(self, urn: str) -> None: + if urn in self._custom_cache: + self._current_data = self._custom_cache[urn] + return + + trans_cache: dict[str, str] = {} + trans_cloud: dict = {} + trans_local: dict = {} + # Get multi lang from cloud + try: + trans_cloud = await self.__get_multi_lang_async(urn) + if self._lang == 'zh-Hans': + # Simplified Chinese + trans_cache = trans_cloud.get('zh_cn', {}) + elif self._lang == 'zh-Hant': + # Traditional Chinese, zh_hk or zh_tw + trans_cache = trans_cloud.get('zh_hk', {}) + if not trans_cache: + trans_cache = trans_cloud.get('zh_tw', {}) + else: + trans_cache = trans_cloud.get(self._lang, {}) + except Exception as err: + trans_cloud = {} + _LOGGER.info('get multi lang from cloud failed, %s, %s', urn, err) + # Get multi lang from local + try: + trans_local = await self._storage.load_async( + domain=self._DOMAIN, name=urn, type_=dict) # type: ignore + if ( + isinstance(trans_local, dict) + and self._lang in trans_local + ): + trans_cache.update(trans_local[self._lang]) + except Exception as err: + trans_local = {} + _LOGGER.info('get multi lang from local failed, %s, %s', urn, err) + # Default language + if not trans_cache: + if trans_cloud and DEFAULT_INTEGRATION_LANGUAGE in trans_cloud: + trans_cache = trans_cloud[DEFAULT_INTEGRATION_LANGUAGE] + if trans_local and DEFAULT_INTEGRATION_LANGUAGE in trans_local: + trans_cache.update( + trans_local[DEFAULT_INTEGRATION_LANGUAGE]) + trans_data: dict[str, str] = {} + for tag, value in trans_cache.items(): + if value is None or value.strip() == '': + continue + # The dict key is like: + # 'service:002:property:001:valuelist:000' or + # 'service:002:property:001' or 'service:002' + strs: list = tag.split(':') + strs_len = len(strs) + if strs_len == 2: + trans_data[f's:{int(strs[1])}'] = value + elif strs_len == 4: + type_ = 'p' if strs[2] == 'property' else ( + 'a' if strs[2] == 'action' else 'e') + trans_data[ + f'{type_}:{int(strs[1])}:{int(strs[3])}' + ] = value + elif strs_len == 6: + trans_data[ + f'v:{int(strs[1])}:{int(strs[3])}:{int(strs[5])}' + ] = value + + self._custom_cache[urn] = trans_data + self._current_data = trans_data + + def translate(self, key: str) -> Optional[str]: + if not self._current_data: + return None + return self._current_data.get(key, None) + + async def __get_multi_lang_async(self, urn: str) -> dict: + res_trans = await MIoTHttp.get_json_async( url='https://miot-spec.org/instance/v2/multiLanguage', params={'urn': urn}) + if ( + not isinstance(res_trans, dict) + or 'data' not in res_trans + or not isinstance(res_trans['data'], dict) + ): + raise MIoTSpecError('invalid translation data') + return res_trans['data'] + + +class MIoTSpecParser: + """MIoT SPEC parser.""" + # pylint: disable=inconsistent-quotes + VERSION: int = 1 + _DOMAIN: str = 'miot_specs' + _lang: str + _storage: MIoTStorage + _main_loop: asyncio.AbstractEventLoop + + _std_lib: _SpecStdLib + _multi_lang: _MIoTSpecMultiLang + + _init_done: bool + + _bool_trans: SpecBoolTranslation + _spec_filter: SpecFilter + + def __init__( + self, lang: Optional[str], + storage: MIoTStorage, + loop: Optional[asyncio.AbstractEventLoop] = None + ) -> None: + self._lang = lang or DEFAULT_INTEGRATION_LANGUAGE + self._storage = storage + self._main_loop = loop or asyncio.get_running_loop() + self._std_lib = _SpecStdLib(lang=self._lang) + self._multi_lang = _MIoTSpecMultiLang( + lang=self._lang, storage=self._storage, loop=self._main_loop) + + self._init_done = False + + self._bool_trans = SpecBoolTranslation( + lang=self._lang, loop=self._main_loop) + self._spec_filter = SpecFilter(loop=self._main_loop) + + async def init_async(self) -> None: + if self._init_done is True: + return + await self._bool_trans.init_async() + await self._spec_filter.init_async() + std_lib_cache = await self._storage.load_async( + domain=self._DOMAIN, name='spec_std_lib', type_=dict) + if ( + isinstance(std_lib_cache, dict) + and 'data' in std_lib_cache + and 'ts' in std_lib_cache + and isinstance(std_lib_cache['ts'], int) + and int(time.time()) - std_lib_cache['ts'] < + SPEC_STD_LIB_EFFECTIVE_TIME + ): + # Use the cache if the update time is less than 14 day + _LOGGER.debug( + 'use local spec std cache, ts->%s', std_lib_cache['ts']) + self._std_lib.load(std_lib_cache['data']) + self._init_done = True + return + # Update spec std lib + if await self._std_lib.refresh_async(): + if not await self._storage.save_async( + domain=self._DOMAIN, name='spec_std_lib', + data={ + 'data': self._std_lib.dump(), + 'ts': int(time.time()) + } + ): + _LOGGER.error('save spec std lib failed') + else: + if isinstance(std_lib_cache, dict) and 'data' in std_lib_cache: + self._std_lib.load(std_lib_cache['data']) + _LOGGER.info('get spec std lib failed, use local cache') + else: + _LOGGER.error('load spec std lib failed') + self._init_done = True + + async def deinit_async(self) -> None: + self._init_done = False + # self._std_lib.deinit() + await self._bool_trans.deinit_async() + await self._spec_filter.deinit_async() + + async def parse( + self, urn: str, skip_cache: bool = False, + ) -> Optional[MIoTSpecInstance]: + """MUST await init first !!!""" + if not skip_cache: + cache_result = await self.__cache_get(urn=urn) + if isinstance(cache_result, dict): + _LOGGER.debug('get from cache, %s', urn) + return MIoTSpecInstance.load(specs=cache_result) + # Retry three times + for index in range(3): + try: + return await self.__parse(urn=urn) + except Exception as err: # pylint: disable=broad-exception-caught + _LOGGER.error( + 'parse error, retry, %d, %s, %s', index, urn, err) + return None + + async def refresh_async(self, urn_list: list[str]) -> int: + """MUST await init first !!!""" + if not urn_list: + return False + if await self._std_lib.refresh_async(): + if not await self._storage.save_async( + domain=self._DOMAIN, name='spec_std_lib', + data={ + 'data': self._std_lib.dump(), + 'ts': int(time.time()) + } + ): + _LOGGER.error('save spec std lib failed') + else: + raise MIoTSpecError('get spec std lib failed') + success_count = 0 + for index in range(0, len(urn_list), 5): + batch = urn_list[index:index+5] + task_list = [self._main_loop.create_task( + self.parse(urn=urn, skip_cache=True)) for urn in batch] + results = await asyncio.gather(*task_list) + success_count += sum(1 for result in results if result is not None) + return success_count + + async def __cache_get(self, urn: str) -> Optional[dict]: + if platform.system() == 'Windows': + urn = urn.replace(':', '_') + return await self._storage.load_async( + domain=self._DOMAIN, + name=f'{urn}_{self._lang}', + type_=dict) # type: ignore + + async def __cache_set(self, urn: str, data: dict) -> bool: + if platform.system() == 'Windows': + urn = urn.replace(':', '_') + return await self._storage.save_async( + domain=self._DOMAIN, name=f'{urn}_{self._lang}', data=data) + + async def __get_instance(self, urn: str) -> Optional[dict]: + return await MIoTHttp.get_json_async( + url='https://miot-spec.org/miot-spec-v2/instance', + params={'type': urn}) async def __parse(self, urn: str) -> MIoTSpecInstance: _LOGGER.debug('parse urn, %s', urn) # Load spec instance - instance: dict = await self.__get_instance(urn=urn) + instance = await self.__get_instance(urn=urn) if ( not isinstance(instance, dict) or 'type' not in instance @@ -788,68 +1077,11 @@ class MIoTSpecParser: or 'services' not in instance ): raise MIoTSpecError(f'invalid urn instance, {urn}') - translation: dict = {} - try: - # Load multiple language configuration. - res_trans = await self.__get_translation(urn=urn) - if ( - not isinstance(res_trans, dict) - or 'data' not in res_trans - or not isinstance(res_trans['data'], dict) - ): - raise MIoTSpecError('invalid translation data') - urn_strs: list[str] = urn.split(':') - urn_key: str = ':'.join(urn_strs[:6]) - trans_data: dict[str, str] = None - if self._lang == 'zh-Hans': - # Simplified Chinese - trans_data = res_trans['data'].get('zh_cn', {}) - elif self._lang == 'zh-Hant': - # Traditional Chinese, zh_hk or zh_tw - trans_data = res_trans['data'].get('zh_hk', {}) - if not trans_data: - trans_data = res_trans['data'].get('zh_tw', {}) - else: - trans_data = res_trans['data'].get(self._lang, {}) - # Load local multiple language configuration. - multi_lang: dict = await self._multi_lang.translate_async( - urn_key=urn_key) - if multi_lang: - trans_data.update(multi_lang) - if not trans_data: - trans_data = res_trans['data'].get( - DEFAULT_INTEGRATION_LANGUAGE, {}) - if not trans_data: - raise MIoTSpecError( - f'the language is not supported, {self._lang}') - else: - _LOGGER.error( - 'the language is not supported, %s, try using the ' - 'default language, %s, %s', - self._lang, DEFAULT_INTEGRATION_LANGUAGE, urn) - for tag, value in trans_data.items(): - if value is None or value.strip() == '': - continue - # The dict key is like: - # 'service:002:property:001:valuelist:000' or - # 'service:002:property:001' or 'service:002' - strs: list = tag.split(':') - strs_len = len(strs) - if strs_len == 2: - translation[f's:{int(strs[1])}'] = value - elif strs_len == 4: - type_ = 'p' if strs[2] == 'property' else ( - 'a' if strs[2] == 'action' else 'e') - translation[ - f'{type_}:{int(strs[1])}:{int(strs[3])}' - ] = value - elif strs_len == 6: - translation[ - f'v:{int(strs[1])}:{int(strs[3])}:{int(strs[5])}' - ] = value - except MIoTSpecError as e: - _LOGGER.error('get translation error, %s, %s', urn, e) - # Spec filter + urn_strs: list[str] = urn.split(':') + urn_key: str = ':'.join(urn_strs[:6]) + # Set translation cache + await self._multi_lang.set_spec_async(urn=urn_key) + # Set spec filter self._spec_filter.filter_spec(urn_key=urn_key) # Parse device type spec_instance: MIoTSpecInstance = MIoTSpecInstance( @@ -880,7 +1112,7 @@ class MIoTSpecParser: if type_strs[1] != 'miot-spec-v2': spec_service.proprietary = True spec_service.description_trans = ( - translation.get(f's:{service["iid"]}', None) + self._multi_lang.translate(f's:{service["iid"]}') or self._std_lib.service_translate(key=':'.join(type_strs[:5])) or service['description'] or spec_service.name @@ -899,7 +1131,7 @@ class MIoTSpecParser: spec_prop: MIoTSpecProperty = MIoTSpecProperty( spec=property_, service=spec_service, - format_=self.__spec_format2dtype(property_['format']), + format_=property_['format'], access=property_['access'], unit=property_.get('unit', None)) spec_prop.name = p_type_strs[3] @@ -911,41 +1143,33 @@ class MIoTSpecParser: if p_type_strs[1] != 'miot-spec-v2': spec_prop.proprietary = spec_service.proprietary or True spec_prop.description_trans = ( - translation.get( - f'p:{service["iid"]}:{property_["iid"]}', None) + self._multi_lang.translate( + f'p:{service["iid"]}:{property_["iid"]}') or self._std_lib.property_translate( key=':'.join(p_type_strs[:5])) or property_['description'] or spec_prop.name) if 'value-range' in property_: - spec_prop.value_range = { - 'min': property_['value-range'][0], - 'max': property_['value-range'][1], - 'step': property_['value-range'][2] - } - spec_prop.precision = len(str( - property_['value-range'][2]).split( - '.')[1].rstrip('0')) if '.' in str( - property_['value-range'][2]) else 0 + spec_prop.value_range = property_['value-range'] elif 'value-list' in property_: v_list: list[dict] = property_['value-list'] for index, v in enumerate(v_list): v['name'] = v['description'] v['description'] = ( - translation.get( + self._multi_lang.translate( f'v:{service["iid"]}:{property_["iid"]}:' - f'{index}', None) + f'{index}') or self._std_lib.value_translate( key=f'{type_strs[:5]}|{p_type_strs[3]}|' f'{v["description"]}') - or v['name'] - ) - spec_prop.value_list = v_list + or v['name']) + spec_prop.value_list = MIoTSpecValueList.from_spec(v_list) elif property_['format'] == 'bool': v_tag = ':'.join(p_type_strs[:5]) - v_descriptions: dict = ( + v_descriptions = ( await self._bool_trans.translate_async(urn=v_tag)) if v_descriptions: + # bool without value-list.name spec_prop.value_list = v_descriptions spec_service.properties.append(spec_prop) # Parse service event @@ -969,8 +1193,8 @@ class MIoTSpecParser: if e_type_strs[1] != 'miot-spec-v2': spec_event.proprietary = spec_service.proprietary or True spec_event.description_trans = ( - translation.get( - f'e:{service["iid"]}:{event["iid"]}', None) + self._multi_lang.translate( + f'e:{service["iid"]}:{event["iid"]}') or self._std_lib.event_translate( key=':'.join(e_type_strs[:5])) or event['description'] @@ -1005,8 +1229,8 @@ class MIoTSpecParser: if a_type_strs[1] != 'miot-spec-v2': spec_action.proprietary = spec_service.proprietary or True spec_action.description_trans = ( - translation.get( - f'a:{service["iid"]}:{action["iid"]}', None) + self._multi_lang.translate( + f'a:{service["iid"]}:{action["iid"]}') or self._std_lib.action_translate( key=':'.join(a_type_strs[:5])) or action['description'] diff --git a/custom_components/xiaomi_home/miot/miot_storage.py b/custom_components/xiaomi_home/miot/miot_storage.py index 85b25c9..de02199 100644 --- a/custom_components/xiaomi_home/miot/miot_storage.py +++ b/custom_components/xiaomi_home/miot/miot_storage.py @@ -719,60 +719,6 @@ class MIoTCert: return binascii.hexlify(sha1_hash.finalize()).decode('utf-8') -class SpecMultiLang: - """ - MIoT-Spec-V2 multi-language for entities. - """ - MULTI_LANG_FILE = 'specs/multi_lang.json' - _main_loop: asyncio.AbstractEventLoop - _lang: str - _data: Optional[dict[str, dict]] - - def __init__( - self, lang: str, loop: Optional[asyncio.AbstractEventLoop] = None - ) -> None: - self._main_loop = loop or asyncio.get_event_loop() - self._lang = lang - self._data = None - - async def init_async(self) -> None: - if isinstance(self._data, dict): - return - multi_lang_data = None - self._data = {} - try: - multi_lang_data = await self._main_loop.run_in_executor( - None, load_json_file, - os.path.join( - os.path.dirname(os.path.abspath(__file__)), - self.MULTI_LANG_FILE)) - except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.error('multi lang, load file error, %s', err) - return - # Check if the file is a valid JSON file - if not isinstance(multi_lang_data, dict): - _LOGGER.error('multi lang, invalid file data') - return - for lang_data in multi_lang_data.values(): - if not isinstance(lang_data, dict): - _LOGGER.error('multi lang, invalid lang data') - return - for data in lang_data.values(): - if not isinstance(data, dict): - _LOGGER.error('multi lang, invalid lang data item') - return - self._data = multi_lang_data - - async def deinit_async(self) -> str: - self._data = None - - async def translate_async(self, urn_key: str) -> dict[str, str]: - """MUST call init_async() first.""" - if urn_key in self._data: - return self._data[urn_key].get(self._lang, {}) - return {} - - class SpecBoolTranslation: """ Boolean value translation. diff --git a/custom_components/xiaomi_home/notify.py b/custom_components/xiaomi_home/notify.py index ba0844a..5cf3fd8 100644 --- a/custom_components/xiaomi_home/notify.py +++ b/custom_components/xiaomi_home/notify.py @@ -90,7 +90,7 @@ class Notify(MIoTActionEntity, NotifyEntity): super().__init__(miot_device=miot_device, spec=spec) self._attr_extra_state_attributes = {} action_in: str = ', '.join([ - f'{prop.description_trans}({prop.format_})' + f'{prop.description_trans}({prop.format_.__name__})' for prop in self.spec.in_]) self._attr_extra_state_attributes['action params'] = f'[{action_in}]' @@ -122,24 +122,24 @@ class Notify(MIoTActionEntity, NotifyEntity): return in_value: list[dict] = [] for index, prop in enumerate(self.spec.in_): - if prop.format_ == 'str': + if prop.format_ == str: if isinstance(in_list[index], (bool, int, float, str)): in_value.append( {'piid': prop.iid, 'value': str(in_list[index])}) continue - elif prop.format_ == 'bool': + elif prop.format_ == bool: if isinstance(in_list[index], (bool, int)): # yes, no, on, off, true, false and other bool types # will also be parsed as 0 and 1 of int. in_value.append( {'piid': prop.iid, 'value': bool(in_list[index])}) continue - elif prop.format_ == 'float': + elif prop.format_ == float: if isinstance(in_list[index], (int, float)): in_value.append( {'piid': prop.iid, 'value': in_list[index]}) continue - elif prop.format_ == 'int': + elif prop.format_ == int: if isinstance(in_list[index], int): in_value.append( {'piid': prop.iid, 'value': in_list[index]}) diff --git a/custom_components/xiaomi_home/number.py b/custom_components/xiaomi_home/number.py index 53bc09c..29bd6b7 100644 --- a/custom_components/xiaomi_home/number.py +++ b/custom_components/xiaomi_home/number.py @@ -92,9 +92,9 @@ class Number(MIoTPropertyEntity, NumberEntity): self._attr_icon = self.spec.icon # Set value range if self._value_range: - self._attr_native_min_value = self._value_range['min'] - self._attr_native_max_value = self._value_range['max'] - self._attr_native_step = self._value_range['step'] + self._attr_native_min_value = self._value_range.min_ + self._attr_native_max_value = self._value_range.max_ + self._attr_native_step = self._value_range.step @property def native_value(self) -> Optional[float]: diff --git a/custom_components/xiaomi_home/select.py b/custom_components/xiaomi_home/select.py index 4c9bad3..21b5e78 100644 --- a/custom_components/xiaomi_home/select.py +++ b/custom_components/xiaomi_home/select.py @@ -82,7 +82,8 @@ class Select(MIoTPropertyEntity, SelectEntity): def __init__(self, miot_device: MIoTDevice, spec: MIoTSpecProperty) -> None: """Initialize the Select.""" super().__init__(miot_device=miot_device, spec=spec) - self._attr_options = list(self._value_list.values()) + if self._value_list: + self._attr_options = self._value_list.descriptions async def async_select_option(self, option: str) -> None: """Change the selected option.""" diff --git a/custom_components/xiaomi_home/sensor.py b/custom_components/xiaomi_home/sensor.py index 39b3bdb..9d2bc5c 100644 --- a/custom_components/xiaomi_home/sensor.py +++ b/custom_components/xiaomi_home/sensor.py @@ -91,7 +91,7 @@ class Sensor(MIoTPropertyEntity, SensorEntity): self._attr_device_class = SensorDeviceClass.ENUM self._attr_icon = 'mdi:message-text' self._attr_native_unit_of_measurement = None - self._attr_options = list(self._value_list.values()) + self._attr_options = self._value_list.descriptions else: self._attr_device_class = spec.device_class if spec.external_unit: @@ -115,14 +115,14 @@ class Sensor(MIoTPropertyEntity, SensorEntity): """Return the current value of the sensor.""" if self._value_range and isinstance(self._value, (int, float)): if ( - self._value < self._value_range['min'] - or self._value > self._value_range['max'] + self._value < self._value_range.min_ + or self._value > self._value_range.max_ ): _LOGGER.info( '%s, data exception, out of range, %s, %s', self.entity_id, self._value, self._value_range) if self._value_list: - return self._value_list.get(self._value, None) + return self.get_vlist_description(self._value) if isinstance(self._value, str): return self._value[:255] return self._value diff --git a/custom_components/xiaomi_home/text.py b/custom_components/xiaomi_home/text.py index 8a6b9ae..cb57f2c 100644 --- a/custom_components/xiaomi_home/text.py +++ b/custom_components/xiaomi_home/text.py @@ -111,7 +111,7 @@ class ActionText(MIoTActionEntity, TextEntity): self._attr_extra_state_attributes = {} self._attr_native_value = '' action_in: str = ', '.join([ - f'{prop.description_trans}({prop.format_})' + f'{prop.description_trans}({prop.format_.__name__})' for prop in self.spec.in_]) self._attr_extra_state_attributes['action params'] = f'[{action_in}]' # For action debug @@ -141,24 +141,24 @@ class ActionText(MIoTActionEntity, TextEntity): f'invalid action params, {value}') in_value: list[dict] = [] for index, prop in enumerate(self.spec.in_): - if prop.format_ == 'str': + if prop.format_ == str: if isinstance(in_list[index], (bool, int, float, str)): in_value.append( {'piid': prop.iid, 'value': str(in_list[index])}) continue - elif prop.format_ == 'bool': + elif prop.format_ == bool: if isinstance(in_list[index], (bool, int)): # yes, no, on, off, true, false and other bool types # will also be parsed as 0 and 1 of int. in_value.append( {'piid': prop.iid, 'value': bool(in_list[index])}) continue - elif prop.format_ == 'float': + elif prop.format_ == float: if isinstance(in_list[index], (int, float)): in_value.append( {'piid': prop.iid, 'value': in_list[index]}) continue - elif prop.format_ == 'int': + elif prop.format_ == int: if isinstance(in_list[index], int): in_value.append( {'piid': prop.iid, 'value': in_list[index]}) diff --git a/custom_components/xiaomi_home/vacuum.py b/custom_components/xiaomi_home/vacuum.py index fda2d5a..232e676 100644 --- a/custom_components/xiaomi_home/vacuum.py +++ b/custom_components/xiaomi_home/vacuum.py @@ -120,28 +120,18 @@ class Vacuum(MIoTServiceEntity, StateVacuumEntity): # properties for prop in entity_data.props: if prop.name == 'status': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'invalid status value_list, %s', self.entity_id) continue - self._status_map = { - item['value']: item['description'] - for item in prop.value_list} + self._status_map = prop.value_list.to_map() self._prop_status = prop elif prop.name == 'fan-level': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'invalid fan-level value_list, %s', self.entity_id) continue - self._fan_level_map = { - item['value']: item['description'] - for item in prop.value_list} + self._fan_level_map = prop.value_list.to_map() self._attr_fan_speed_list = list(self._fan_level_map.values()) self._attr_supported_features |= VacuumEntityFeature.FAN_SPEED self._prop_fan_level = prop @@ -202,7 +192,7 @@ class Vacuum(MIoTServiceEntity, StateVacuumEntity): @property def state(self) -> Optional[str]: """Return the current state of the vacuum cleaner.""" - return self.get_map_description( + return self.get_map_value( map_=self._status_map, key=self.get_prop_value(prop=self._prop_status)) @@ -214,6 +204,6 @@ class Vacuum(MIoTServiceEntity, StateVacuumEntity): @property def fan_speed(self) -> Optional[str]: """Return the current fan speed of the vacuum cleaner.""" - return self.get_map_description( + return self.get_map_value( map_=self._fan_level_map, key=self.get_prop_value(prop=self._prop_fan_level)) diff --git a/custom_components/xiaomi_home/water_heater.py b/custom_components/xiaomi_home/water_heater.py index aa7fe67..aba6093 100644 --- a/custom_components/xiaomi_home/water_heater.py +++ b/custom_components/xiaomi_home/water_heater.py @@ -93,7 +93,7 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity): _prop_target_temp: Optional[MIoTSpecProperty] _prop_mode: Optional[MIoTSpecProperty] - _mode_list: Optional[dict[Any, Any]] + _mode_map: Optional[dict[Any, Any]] def __init__( self, miot_device: MIoTDevice, entity_data: MIoTEntityData @@ -106,7 +106,7 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity): self._prop_temp = None self._prop_target_temp = None self._prop_mode = None - self._mode_list = None + self._mode_map = None # properties for prop in entity_data.props: @@ -115,7 +115,7 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity): self._prop_on = prop # temperature if prop.name == 'temperature': - if isinstance(prop.value_range, dict): + if prop.value_range: if ( self._attr_temperature_unit is None and prop.external_unit @@ -128,9 +128,14 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity): self.entity_id) # target-temperature if prop.name == 'target-temperature': - self._attr_min_temp = prop.value_range['min'] - self._attr_max_temp = prop.value_range['max'] - self._attr_precision = prop.value_range['step'] + if not prop.value_range: + _LOGGER.error( + 'invalid target-temperature value_range format, %s', + self.entity_id) + continue + self._attr_min_temp = prop.value_range.min_ + self._attr_max_temp = prop.value_range.max_ + self._attr_precision = prop.value_range.step if self._attr_temperature_unit is None and prop.external_unit: self._attr_temperature_unit = prop.external_unit self._attr_supported_features |= ( @@ -138,17 +143,12 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity): self._prop_target_temp = prop # mode if prop.name == 'mode': - if ( - not isinstance(prop.value_list, list) - or not prop.value_list - ): + if not prop.value_list: _LOGGER.error( 'mode value_list is None, %s', self.entity_id) continue - self._mode_list = { - item['value']: item['description'] - for item in prop.value_list} - self._attr_operation_list = list(self._mode_list.values()) + self._mode_map = prop.value_list.to_map() + self._attr_operation_list = list(self._mode_map.values()) self._attr_supported_features |= ( WaterHeaterEntityFeature.OPERATION_MODE) self._prop_mode = prop @@ -184,7 +184,9 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity): prop=self._prop_on, value=True, update=False) await self.set_property_async( prop=self._prop_mode, - value=self.__get_mode_value(description=operation_mode)) + value=self.get_map_key( + map_=self._mode_map, + value=operation_mode)) async def async_turn_away_mode_on(self) -> None: """Set the water heater to away mode.""" @@ -207,20 +209,6 @@ class WaterHeater(MIoTServiceEntity, WaterHeaterEntity): return STATE_OFF if not self._prop_mode and self.get_prop_value(prop=self._prop_on): return STATE_ON - return self.__get_mode_description( + return self.get_map_value( + map_=self._mode_map, key=self.get_prop_value(prop=self._prop_mode)) - - def __get_mode_description(self, key: int) -> Optional[str]: - """Convert mode value to description.""" - if self._mode_list is None: - return None - return self._mode_list.get(key, None) - - def __get_mode_value(self, description: str) -> Optional[int]: - """Convert mode description to value.""" - if self._mode_list is None: - return None - for key, value in self._mode_list.items(): - if value == description: - return key - return None