fomatted code

This commit is contained in:
GavinIves 2025-06-04 08:52:48 +00:00
parent b72ec85ae9
commit 83899f8084
7 changed files with 1538 additions and 1497 deletions

View File

@ -77,14 +77,14 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up a config entry."""
device_list: list[MIoTDevice] = hass.data[DOMAIN]["devices"][config_entry.entry_id]
device_list: list[MIoTDevice] = hass.data[DOMAIN]["devices"][
config_entry.entry_id]
new_entities = []
for miot_device in device_list:
for data in miot_device.entity_list.get("light", []):
new_entities.append(
Light(miot_device=miot_device, entity_data=data, hass=hass)
)
Light(miot_device=miot_device, entity_data=data, hass=hass))
if new_entities:
async_add_entities(new_entities)
@ -104,9 +104,8 @@ class Light(MIoTServiceEntity, LightEntity):
_brightness_scale: Optional[tuple[int, int]]
_mode_map: Optional[dict[Any, Any]]
def __init__(
self, miot_device: MIoTDevice, entity_data: MIoTEntityData, hass: HomeAssistant
) -> None:
def __init__(self, miot_device: MIoTDevice, entity_data: MIoTEntityData,
hass: HomeAssistant) -> None:
"""Initialize the Light."""
super().__init__(miot_device=miot_device, entity_data=entity_data)
self.hass = hass
@ -145,7 +144,8 @@ class Light(MIoTServiceEntity, LightEntity):
self._attr_supported_features |= LightEntityFeature.EFFECT
self._prop_mode = prop
else:
_LOGGER.info("invalid brightness format, %s", self.entity_id)
_LOGGER.info("invalid brightness format, %s",
self.entity_id)
continue
# color-temperature
if prop.name == "color-temperature":
@ -172,13 +172,9 @@ class Light(MIoTServiceEntity, LightEntity):
mode_list = prop.value_list.to_map()
elif prop.value_range:
mode_list = {}
if (
int(
(prop.value_range.max_ - prop.value_range.min_)
/ prop.value_range.step
)
> self._VALUE_RANGE_MODE_COUNT_MAX
):
if (int((prop.value_range.max_ - prop.value_range.min_) /
prop.value_range.step)
> self._VALUE_RANGE_MODE_COUNT_MAX):
_LOGGER.error(
"too many mode values, %s, %s, %s",
self.entity_id,
@ -187,9 +183,9 @@ class Light(MIoTServiceEntity, LightEntity):
)
else:
for value in range(
prop.value_range.min_,
prop.value_range.max_,
prop.value_range.step,
prop.value_range.min_,
prop.value_range.max_,
prop.value_range.step,
):
mode_list[value] = f"mode {value}"
if mode_list:
@ -245,9 +241,8 @@ class Light(MIoTServiceEntity, LightEntity):
@property
def effect(self) -> Optional[str]:
"""Return the current mode."""
return self.get_map_value(
map_=self._mode_map, key=self.get_prop_value(prop=self._prop_mode)
)
return self.get_map_value(map_=self._mode_map,
key=self.get_prop_value(prop=self._prop_mode))
async def async_turn_on(self, **kwargs) -> None:
"""Turn the light on.
@ -263,23 +258,24 @@ class Light(MIoTServiceEntity, LightEntity):
set_properties_list: List[Dict[str, Any]] = []
if self._prop_on:
value_on = True if self._prop_on.format_ == bool else 1 # noqa: E721
set_properties_list.append({"prop": self._prop_on, "value": value_on})
set_properties_list.append({
"prop": self._prop_on,
"value": value_on
})
# brightness
if ATTR_BRIGHTNESS in kwargs:
brightness = brightness_to_value(
self._brightness_scale, kwargs[ATTR_BRIGHTNESS]
)
set_properties_list.append(
{"prop": self._prop_brightness, "value": brightness}
)
brightness = brightness_to_value(self._brightness_scale,
kwargs[ATTR_BRIGHTNESS])
set_properties_list.append({
"prop": self._prop_brightness,
"value": brightness
})
# color-temperature
if ATTR_COLOR_TEMP_KELVIN in kwargs:
set_properties_list.append(
{
"prop": self._prop_color_temp,
"value": kwargs[ATTR_COLOR_TEMP_KELVIN],
}
)
set_properties_list.append({
"prop": self._prop_color_temp,
"value": kwargs[ATTR_COLOR_TEMP_KELVIN],
})
self._attr_color_mode = ColorMode.COLOR_TEMP
# rgb color
if ATTR_RGB_COLOR in kwargs:
@ -287,42 +283,46 @@ class Light(MIoTServiceEntity, LightEntity):
g = kwargs[ATTR_RGB_COLOR][1]
b = kwargs[ATTR_RGB_COLOR][2]
rgb = (r << 16) | (g << 8) | b
set_properties_list.append({"prop": self._prop_color, "value": rgb})
set_properties_list.append({
"prop": self._prop_color,
"value": rgb
})
self._attr_color_mode = ColorMode.RGB
# mode
if ATTR_EFFECT in kwargs:
set_properties_list.append(
{
"prop": self._prop_mode,
"value": self.get_map_key(
map_=self._mode_map, value=kwargs[ATTR_EFFECT]
),
}
)
set_properties_list.append({
"prop":
self._prop_mode,
"value":
self.get_map_key(map_=self._mode_map,
value=kwargs[ATTR_EFFECT]),
})
await self.set_properties_async(set_properties_list)
self.async_write_ha_state()
elif command_send_mode and command_send_mode.state == "Send Turn On First":
set_properties_list: List[Dict[str, Any]] = []
if self._prop_on:
value_on = True if self._prop_on.format_ == bool else 1 # noqa: E721
set_properties_list.append({"prop": self._prop_on, "value": value_on})
await self.set_property_async(prop=self._prop_on, value=value_on)
set_properties_list.append({
"prop": self._prop_on,
"value": value_on
})
await self.set_property_async(prop=self._prop_on,
value=value_on)
# brightness
if ATTR_BRIGHTNESS in kwargs:
brightness = brightness_to_value(
self._brightness_scale, kwargs[ATTR_BRIGHTNESS]
)
set_properties_list.append(
{"prop": self._prop_brightness, "value": brightness}
)
brightness = brightness_to_value(self._brightness_scale,
kwargs[ATTR_BRIGHTNESS])
set_properties_list.append({
"prop": self._prop_brightness,
"value": brightness
})
# color-temperature
if ATTR_COLOR_TEMP_KELVIN in kwargs:
set_properties_list.append(
{
"prop": self._prop_color_temp,
"value": kwargs[ATTR_COLOR_TEMP_KELVIN],
}
)
set_properties_list.append({
"prop": self._prop_color_temp,
"value": kwargs[ATTR_COLOR_TEMP_KELVIN],
})
self._attr_color_mode = ColorMode.COLOR_TEMP
# rgb color
if ATTR_RGB_COLOR in kwargs:
@ -330,33 +330,35 @@ class Light(MIoTServiceEntity, LightEntity):
g = kwargs[ATTR_RGB_COLOR][1]
b = kwargs[ATTR_RGB_COLOR][2]
rgb = (r << 16) | (g << 8) | b
set_properties_list.append({"prop": self._prop_color, "value": rgb})
set_properties_list.append({
"prop": self._prop_color,
"value": rgb
})
self._attr_color_mode = ColorMode.RGB
# mode
if ATTR_EFFECT in kwargs:
set_properties_list.append(
{
"prop": self._prop_mode,
"value": self.get_map_key(
map_=self._mode_map, value=kwargs[ATTR_EFFECT]
),
}
)
set_properties_list.append({
"prop":
self._prop_mode,
"value":
self.get_map_key(map_=self._mode_map,
value=kwargs[ATTR_EFFECT]),
})
await self.set_properties_async(set_properties_list)
self.async_write_ha_state()
else:
if self._prop_on:
value_on = True if self._prop_on.format_ == bool else 1 # noqa: E721
await self.set_property_async(prop=self._prop_on, value=value_on)
await self.set_property_async(prop=self._prop_on,
value=value_on)
# brightness
if ATTR_BRIGHTNESS in kwargs:
brightness = brightness_to_value(
self._brightness_scale, kwargs[ATTR_BRIGHTNESS]
)
await self.set_property_async(
prop=self._prop_brightness, value=brightness, write_ha_state=False
)
brightness = brightness_to_value(self._brightness_scale,
kwargs[ATTR_BRIGHTNESS])
await self.set_property_async(prop=self._prop_brightness,
value=brightness,
write_ha_state=False)
# color-temperature
if ATTR_COLOR_TEMP_KELVIN in kwargs:
await self.set_property_async(
@ -371,17 +373,16 @@ class Light(MIoTServiceEntity, LightEntity):
g = kwargs[ATTR_RGB_COLOR][1]
b = kwargs[ATTR_RGB_COLOR][2]
rgb = (r << 16) | (g << 8) | b
await self.set_property_async(
prop=self._prop_color, value=rgb, write_ha_state=False
)
await self.set_property_async(prop=self._prop_color,
value=rgb,
write_ha_state=False)
self._attr_color_mode = ColorMode.RGB
# mode
if ATTR_EFFECT in kwargs:
await self.set_property_async(
prop=self._prop_mode,
value=self.get_map_key(
map_=self._mode_map, value=kwargs[ATTR_EFFECT]
),
value=self.get_map_key(map_=self._mode_map,
value=kwargs[ATTR_EFFECT]),
write_ha_state=False,
)
self.async_write_ha_state()

File diff suppressed because it is too large Load Diff

View File

@ -103,7 +103,8 @@ class MIoTOauthClient:
else:
self._oauth_host = f"{cloud_server}.{DEFAULT_OAUTH2_API_HOST}"
self._device_id = f"ha.{uuid}"
self._state = hashlib.sha1(f"d={self._device_id}".encode("utf-8")).hexdigest()
self._state = hashlib.sha1(
f"d={self._device_id}".encode("utf-8")).hexdigest()
self._session = aiohttp.ClientSession(loop=self._main_loop)
@property
@ -166,31 +167,24 @@ class MIoTOauthClient:
timeout=MIHOME_HTTP_API_TIMEOUT,
)
if http_res.status == 401:
raise MIoTOauthError(
"unauthorized(401)", MIoTErrorCode.CODE_OAUTH_UNAUTHORIZED
)
raise MIoTOauthError("unauthorized(401)",
MIoTErrorCode.CODE_OAUTH_UNAUTHORIZED)
if http_res.status != 200:
raise MIoTOauthError(f"invalid http status code, {http_res.status}")
res_str = await http_res.text()
res_obj = json.loads(res_str)
if (
not res_obj
or res_obj.get("code", None) != 0
or "result" not in res_obj
or not all(
key in res_obj["result"]
for key in ["access_token", "refresh_token", "expires_in"]
)
):
if (not res_obj or res_obj.get("code", None) != 0 or
"result" not in res_obj or
not all(key in res_obj["result"] for key in
["access_token", "refresh_token", "expires_in"])):
raise MIoTOauthError(f"invalid http response, {res_str}")
return {
**res_obj["result"],
"expires_ts": int(
time.time()
+ (res_obj["result"].get("expires_in", 0) * TOKEN_EXPIRES_TS_RATIO)
),
"expires_ts":
int(time.time() + (res_obj["result"].get("expires_in", 0) *
TOKEN_EXPIRES_TS_RATIO)),
}
async def get_access_token_async(self, code: str) -> dict:
@ -211,8 +205,7 @@ class MIoTOauthClient:
"redirect_uri": self._redirect_url,
"code": code,
"device_id": self._device_id,
}
)
})
async def refresh_access_token_async(self, refresh_token: str) -> dict:
"""get access token by refresh token.
@ -231,8 +224,7 @@ class MIoTOauthClient:
"client_id": self._client_id,
"redirect_uri": self._redirect_url,
"refresh_token": refresh_token,
}
)
})
class MIoTHttpClient:
@ -267,16 +259,14 @@ class MIoTHttpClient:
self._get_prop_timer = None
self._get_prop_list = {}
if (
not isinstance(cloud_server, str)
or not isinstance(client_id, str)
or not isinstance(access_token, str)
):
if (not isinstance(cloud_server, str) or
not isinstance(client_id, str) or
not isinstance(access_token, str)):
raise MIoTHttpError("invalid params")
self.update_http_header(
cloud_server=cloud_server, client_id=client_id, access_token=access_token
)
self.update_http_header(cloud_server=cloud_server,
client_id=client_id,
access_token=access_token)
self._session = aiohttp.ClientSession(loop=self._main_loop)
@ -319,8 +309,10 @@ class MIoTHttpClient:
# pylint: disable=unused-private-member
async def __mihome_api_get_async(
self, url_path: str, params: dict, timeout: int = MIHOME_HTTP_API_TIMEOUT
) -> dict:
self,
url_path: str,
params: dict,
timeout: int = MIHOME_HTTP_API_TIMEOUT) -> dict:
http_res = await self._session.get(
url=f"{self._base_url}{url_path}",
params=params,
@ -341,16 +333,16 @@ class MIoTHttpClient:
if res_obj.get("code", None) != 0:
raise MIoTHttpError(
f"invalid response code, {res_obj.get('code', None)}, "
f"{res_obj.get('message', '')}"
)
_LOGGER.debug(
"mihome api get, %s%s, %s -> %s", self._base_url, url_path, params, res_obj
)
f"{res_obj.get('message', '')}")
_LOGGER.debug("mihome api get, %s%s, %s -> %s", self._base_url,
url_path, params, res_obj)
return res_obj
async def __mihome_api_post_async(
self, url_path: str, data: dict, timeout: int = MIHOME_HTTP_API_TIMEOUT
) -> dict:
self,
url_path: str,
data: dict,
timeout: int = MIHOME_HTTP_API_TIMEOUT) -> dict:
http_res = await self._session.post(
url=f"{self._base_url}{url_path}",
json=data,
@ -371,29 +363,26 @@ class MIoTHttpClient:
if res_obj.get("code", None) != 0:
raise MIoTHttpError(
f"invalid response code, {res_obj.get('code', None)}, "
f"{res_obj.get('message', '')}"
)
_LOGGER.debug(
"mihome api post, %s%s, %s -> %s", self._base_url, url_path, data, res_obj
)
f"{res_obj.get('message', '')}")
_LOGGER.debug("mihome api post, %s%s, %s -> %s", self._base_url,
url_path, data, res_obj)
return res_obj
async def get_user_info_async(self) -> dict:
http_res = await self._session.get(
url="https://open.account.xiaomi.com/user/profile",
params={"clientId": self._client_id, "token": self._access_token},
params={
"clientId": self._client_id,
"token": self._access_token
},
headers={"content-type": "application/x-www-form-urlencoded"},
timeout=MIHOME_HTTP_API_TIMEOUT,
)
res_str = await http_res.text()
res_obj = json.loads(res_str)
if (
not res_obj
or res_obj.get("code", None) != 0
or "data" not in res_obj
or "miliaoNick" not in res_obj["data"]
):
if (not res_obj or res_obj.get("code", None) != 0 or
"data" not in res_obj or "miliaoNick" not in res_obj["data"]):
raise MIoTOauthError(f"invalid http response, {http_res.text}")
return res_obj["data"]
@ -414,7 +403,8 @@ class MIoTHttpClient:
return cert
async def __get_dev_room_page_async(self, max_id: Optional[str] = None) -> dict:
async def __get_dev_room_page_async(self,
max_id: Optional[str] = None) -> dict:
res_obj = await self.__mihome_api_post_async(
url_path="/app/v2/homeroom/get_dev_room_page",
data={
@ -435,39 +425,34 @@ class MIoTHttpClient:
}
for room in home.get("roomlist", []):
if "id" not in room:
_LOGGER.error("get dev room page error, invalid room, %s", room)
_LOGGER.error("get dev room page error, invalid room, %s",
room)
continue
home_list[str(home["id"])]["room_info"][str(room["id"])] = {
"dids": room.get("dids", None) or []
}
if res_obj["result"].get("has_more", False) and isinstance(
res_obj["result"].get("max_id", None), str
):
res_obj["result"].get("max_id", None), str):
next_list = await self.__get_dev_room_page_async(
max_id=res_obj["result"]["max_id"]
)
max_id=res_obj["result"]["max_id"])
for home_id, info in next_list.items():
home_list.setdefault(home_id, {"dids": [], "room_info": {}})
home_list[home_id]["dids"].extend(info["dids"])
for room_id, info in info["room_info"].items():
home_list[home_id]["room_info"].setdefault(room_id, {"dids": []})
home_list[home_id]["room_info"].setdefault(
room_id, {"dids": []})
home_list[home_id]["room_info"][room_id]["dids"].extend(
info["dids"]
)
info["dids"])
return home_list
async def get_separated_shared_devices_async(self) -> dict[str, dict]:
separated_shared_devices: dict = {}
device_list: dict[str, dict] = await self.__get_device_list_page_async(
dids=[], start_did=None
)
dids=[], start_did=None)
for did, value in device_list.items():
if (
value["owner"] is not None
and ("userid" in value["owner"])
and ("nickname" in value["owner"])
):
if (value["owner"] is not None and ("userid" in value["owner"]) and
("nickname" in value["owner"])):
separated_shared_devices.setdefault(did, value["owner"])
return separated_shared_devices
@ -495,45 +480,53 @@ class MIoTHttpClient:
if uid is None and device_source == "homelist":
uid = str(home["uid"])
home_infos[device_source][home["id"]] = {
"home_id": home["id"],
"home_name": home["name"],
"city_id": home.get("city_id", None),
"longitude": home.get("longitude", None),
"latitude": home.get("latitude", None),
"address": home.get("address", None),
"dids": home.get("dids", []),
"home_id":
home["id"],
"home_name":
home["name"],
"city_id":
home.get("city_id", None),
"longitude":
home.get("longitude", None),
"latitude":
home.get("latitude", None),
"address":
home.get("address", None),
"dids":
home.get("dids", []),
"room_info": {
room["id"]: {
"room_id": room["id"],
"room_name": room["name"],
"dids": room.get("dids", []),
}
for room in home.get("roomlist", [])
if "id" in room
} for room in home.get("roomlist", []) if "id" in room
},
"group_id": calc_group_id(uid=home["uid"], home_id=home["id"]),
"uid": str(home["uid"]),
"group_id":
calc_group_id(uid=home["uid"], home_id=home["id"]),
"uid":
str(home["uid"]),
}
home_infos["uid"] = uid
if res_obj["result"].get("has_more", False) and isinstance(
res_obj["result"].get("max_id", None), str
):
res_obj["result"].get("max_id", None), str):
more_list = await self.__get_dev_room_page_async(
max_id=res_obj["result"]["max_id"]
)
max_id=res_obj["result"]["max_id"])
for device_source in ["homelist", "share_home_list"]:
for home_id, info in more_list.items():
if home_id not in home_infos[device_source]:
_LOGGER.info("unknown home, %s, %s", home_id, info)
continue
home_infos[device_source][home_id]["dids"].extend(info["dids"])
home_infos[device_source][home_id]["dids"].extend(
info["dids"])
for room_id, info in info["room_info"].items():
home_infos[device_source][home_id]["room_info"].setdefault(
room_id, {"room_id": room_id, "room_name": "", "dids": []}
)
home_infos[device_source][home_id]["room_info"][room_id][
"dids"
].extend(info["dids"])
home_infos[device_source][home_id][
"room_info"].setdefault(room_id, {
"room_id": room_id,
"room_name": "",
"dids": []
})
home_infos[device_source][home_id]["room_info"][
room_id]["dids"].extend(info["dids"])
return {
"uid": uid,
@ -545,15 +538,15 @@ class MIoTHttpClient:
return (await self.get_homeinfos_async()).get("uid", None)
async def __get_device_list_page_async(
self, dids: list[str], start_did: Optional[str] = None
) -> dict[str, dict]:
self,
dids: list[str],
start_did: Optional[str] = None) -> dict[str, dict]:
req_data: dict = {"limit": 200, "get_split_device": True, "dids": dids}
if start_did:
req_data["start_did"] = start_did
device_infos: dict = {}
res_obj = await self.__mihome_api_post_async(
url_path="/app/v2/home/device_list_page", data=req_data
)
url_path="/app/v2/home/device_list_page", data=req_data)
if "result" not in res_obj:
raise MIoTHttpError("invalid response result")
res_obj = res_obj["result"]
@ -575,56 +568,69 @@ class MIoTHttpClient:
_LOGGER.info("ignore miwifi.* device, cloud, %s", did)
continue
device_infos[did] = {
"did": did,
"uid": device.get("uid", None),
"name": name,
"urn": urn,
"model": model,
"connect_type": device.get("pid", -1),
"token": device.get("token", None),
"online": device.get("isOnline", False),
"icon": device.get("icon", None),
"parent_id": device.get("parent_id", None),
"manufacturer": model.split(".")[0],
"did":
did,
"uid":
device.get("uid", None),
"name":
name,
"urn":
urn,
"model":
model,
"connect_type":
device.get("pid", -1),
"token":
device.get("token", None),
"online":
device.get("isOnline", False),
"icon":
device.get("icon", None),
"parent_id":
device.get("parent_id", None),
"manufacturer":
model.split(".")[0],
# 2: xiao-ai, 1: general speaker
"voice_ctrl": device.get("voice_ctrl", 0),
"rssi": device.get("rssi", None),
"owner": device.get("owner", None),
"pid": device.get("pid", None),
"local_ip": device.get("local_ip", None),
"ssid": device.get("ssid", None),
"bssid": device.get("bssid", None),
"order_time": device.get("orderTime", 0),
"fw_version": device.get("extra", {}).get("fw_version", "unknown"),
"voice_ctrl":
device.get("voice_ctrl", 0),
"rssi":
device.get("rssi", None),
"owner":
device.get("owner", None),
"pid":
device.get("pid", None),
"local_ip":
device.get("local_ip", None),
"ssid":
device.get("ssid", None),
"bssid":
device.get("bssid", None),
"order_time":
device.get("orderTime", 0),
"fw_version":
device.get("extra", {}).get("fw_version", "unknown"),
}
if isinstance(device.get("extra", None), dict) and device["extra"]:
device_infos[did]["fw_version"] = device["extra"].get(
"fw_version", None
)
"fw_version", None)
device_infos[did]["mcu_version"] = device["extra"].get(
"mcu_version", None
)
device_infos[did]["platform"] = device["extra"].get("platform", None)
"mcu_version", None)
device_infos[did]["platform"] = device["extra"].get(
"platform", None)
next_start_did = res_obj.get("next_start_did", None)
if res_obj.get("has_more", False) and next_start_did:
device_infos.update(
await self.__get_device_list_page_async(
dids=dids, start_did=next_start_did
)
)
device_infos.update(await self.__get_device_list_page_async(
dids=dids, start_did=next_start_did))
return device_infos
async def get_devices_with_dids_async(
self, dids: list[str]
) -> Optional[dict[str, dict]]:
results: list[dict[str, dict]] = await asyncio.gather(
*[
self.__get_device_list_page_async(dids=dids[index : index + 150])
for index in range(0, len(dids), 150)
]
)
self, dids: list[str]) -> Optional[dict[str, dict]]:
results: list[dict[str, dict]] = await asyncio.gather(*[
self.__get_device_list_page_async(dids=dids[index:index + 150])
for index in range(0, len(dids), 150)
])
devices = {}
for result in results:
if result is None:
@ -632,15 +638,16 @@ class MIoTHttpClient:
devices.update(result)
return devices
async def get_devices_async(
self, home_ids: Optional[list[str]] = None
) -> dict[str, dict]:
async def get_devices_async(self,
home_ids: Optional[list[str]] = None
) -> dict[str, dict]:
homeinfos = await self.get_homeinfos_async()
homes: dict[str, dict[str, Any]] = {}
devices: dict[str, dict] = {}
for device_type in ["home_list", "share_home_list"]:
homes.setdefault(device_type, {})
for home_id, home_info in (homeinfos.get(device_type, None) or {}).items():
for home_id, home_info in (homeinfos.get(device_type, None) or
{}).items():
if isinstance(home_ids, list) and home_id not in home_ids:
continue
home_name: str = home_info["home_name"]
@ -654,34 +661,30 @@ class MIoTHttpClient:
"room_info": {},
},
)
devices.update(
{
devices.update({
did: {
"home_id": home_id,
"home_name": home_name,
"room_id": home_id,
"room_name": home_name,
"group_id": group_id,
} for did in home_info.get("dids", [])
})
for room_id, room_info in home_info.get("room_info").items():
room_name: str = room_info.get("room_name", "")
homes[device_type][home_id]["room_info"][
room_id] = room_name
devices.update({
did: {
"home_id": home_id,
"home_name": home_name,
"room_id": home_id,
"room_name": home_name,
"room_id": room_id,
"room_name": room_name,
"group_id": group_id,
}
for did in home_info.get("dids", [])
}
)
for room_id, room_info in home_info.get("room_info").items():
room_name: str = room_info.get("room_name", "")
homes[device_type][home_id]["room_info"][room_id] = room_name
devices.update(
{
did: {
"home_id": home_id,
"home_name": home_name,
"room_id": room_id,
"room_name": room_name,
"group_id": group_id,
}
for did in room_info.get("dids", [])
}
)
separated_shared_devices: dict = await self.get_separated_shared_devices_async()
} for did in room_info.get("dids", [])
})
separated_shared_devices: dict = await self.get_separated_shared_devices_async(
)
if separated_shared_devices:
homes.setdefault("separated_shared_list", {})
for did, owner in separated_shared_devices.items():
@ -692,20 +695,20 @@ class MIoTHttpClient:
"home_name": owner["nickname"],
"uid": owner_id,
"group_id": "NotSupport",
"room_info": {"shared_device": "shared_device"},
"room_info": {
"shared_device": "shared_device"
},
},
)
devices.update(
{
did: {
"home_id": owner_id,
"home_name": owner["nickname"],
"room_id": "shared_device",
"room_name": "shared_device",
"group_id": "NotSupport",
}
devices.update({
did: {
"home_id": owner_id,
"home_name": owner["nickname"],
"room_id": "shared_device",
"room_name": "shared_device",
"group_id": "NotSupport",
}
)
})
dids = sorted(list(devices.keys()))
results = await self.get_devices_with_dids_async(dids=dids)
if results is None:
@ -724,7 +727,8 @@ class MIoTHttpClient:
parent_did = did.replace(match_str.group(), "")
if parent_did in devices:
devices[parent_did].setdefault("sub_devices", {})
devices[parent_did]["sub_devices"][match_str.group()[1:]] = device
devices[parent_did]["sub_devices"][match_str.group()
[1:]] = device
else:
_LOGGER.error("unknown sub devices, %s, %s", did, parent_did)
return {"uid": homeinfos["uid"], "homes": homes, "devices": devices}
@ -736,16 +740,21 @@ class MIoTHttpClient:
"""
res_obj = await self.__mihome_api_post_async(
url_path="/app/v2/miotspec/prop/get",
data={"datasource": 1, "params": params},
data={
"datasource": 1,
"params": params
},
)
if "result" not in res_obj:
raise MIoTHttpError("invalid response result")
return res_obj["result"]
async def __get_prop_async(self, did: str, siid: int, piid: int) -> Any:
results = await self.get_props_async(
params=[{"did": did, "siid": siid, "piid": piid}]
)
results = await self.get_props_async(params=[{
"did": did,
"siid": siid,
"piid": piid
}])
if not results:
return None
result = results[0]
@ -773,7 +782,8 @@ class MIoTHttpClient:
results = await self.get_props_async(props_buffer)
for result in results:
if not all(key in result for key in ["did", "siid", "piid", "value"]):
if not all(
key in result for key in ["did", "siid", "piid", "value"]):
continue
key = f"{result['did']}.{result['siid']}.{result['piid']}"
prop_obj = self._get_prop_list.pop(key, None)
@ -800,9 +810,11 @@ class MIoTHttpClient:
self._get_prop_timer = None
return True
async def get_prop_async(
self, did: str, siid: int, piid: int, immediately: bool = False
) -> Any:
async def get_prop_async(self,
did: str,
siid: int,
piid: int,
immediately: bool = False) -> Any:
if immediately:
return await self.__get_prop_async(did, siid, piid)
key: str = f"{did}.{siid}.{piid}"
@ -811,7 +823,11 @@ class MIoTHttpClient:
return await prop_obj["fut"]
fut = self._main_loop.create_future()
self._get_prop_list[key] = {
"param": {"did": did, "siid": siid, "piid": piid},
"param": {
"did": did,
"siid": siid,
"piid": piid
},
"fut": fut,
}
if self._get_prop_timer is None:
@ -827,8 +843,9 @@ class MIoTHttpClient:
params = [{"did": "xxxx", "siid": 2, "piid": 1, "value": False}]
"""
res_obj = await self.__mihome_api_post_async(
url_path="/app/v2/miotspec/prop/set", data={"params": params}, timeout=15
)
url_path="/app/v2/miotspec/prop/set",
data={"params": params},
timeout=15)
if "result" not in res_obj:
raise MIoTHttpError("invalid response result")
@ -839,16 +856,16 @@ class MIoTHttpClient:
params = [{"did": "xxxx", "siid": 2, "piid": 1, "value": False}]
"""
res_obj = await self.__mihome_api_post_async(
url_path="/app/v2/miotspec/prop/set", data={"params": params}, timeout=15
)
url_path="/app/v2/miotspec/prop/set",
data={"params": params},
timeout=15)
if "result" not in res_obj:
raise MIoTHttpError("invalid response result")
return res_obj["result"]
async def action_async(
self, did: str, siid: int, aiid: int, in_list: list[dict]
) -> dict:
async def action_async(self, did: str, siid: int, aiid: int,
in_list: list[dict]) -> dict:
"""
params = {"did": "xxxx", "siid": 2, "aiid": 1, "in": []}
"""

View File

@ -79,7 +79,6 @@ from homeassistant.const import (
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.components.switch import SwitchDeviceClass
# pylint: disable=relative-beyond-top-level
from .specs.specv2entity import (
SPEC_ACTION_TRANS_MAP,
@ -117,7 +116,8 @@ class MIoTEntityData:
events: set[MIoTSpecEvent]
actions: set[MIoTSpecAction]
def __init__(self, platform: str, spec: MIoTSpecInstance | MIoTSpecService) -> None:
def __init__(self, platform: str,
spec: MIoTSpecInstance | MIoTSpecService) -> None:
self.platform = platform
self.spec = spec
self.device_class = None
@ -151,7 +151,8 @@ class MIoTDevice:
_suggested_area: Optional[str]
_sub_id: int
_device_state_sub_list: dict[str, dict[str, Callable[[str, MIoTDeviceState], None]]]
_device_state_sub_list: dict[str, dict[str, Callable[[str, MIoTDeviceState],
None]]]
_value_sub_list: dict[str, dict[str, Callable[[dict, Any], None]]]
_entity_list: dict[str, list[MIoTEntityData]]
@ -183,7 +184,8 @@ class MIoTDevice:
self._room_name = device_info.get("room_name", None)
match self.miot_client.area_name_rule:
case "home_room":
self._suggested_area = f"{self._home_name} {self._room_name}".strip()
self._suggested_area = f"{self._home_name} {self._room_name}".strip(
)
case "home":
self._suggested_area = self._home_name.strip()
case "room":
@ -206,15 +208,14 @@ class MIoTDevice:
sub_info = sub_devices.get(f"s{service.iid}", None)
if sub_info is None:
continue
_LOGGER.debug(
"miot device, update service sub info, %s, %s", self.did, sub_info
)
_LOGGER.debug("miot device, update service sub info, %s, %s",
self.did, sub_info)
service.description_trans = sub_info.get(
"name", service.description_trans
)
"name", service.description_trans)
# Sub device state
self.miot_client.sub_device_state(self._did, self.__on_device_state_changed)
self.miot_client.sub_device_state(self._did,
self.__on_device_state_changed)
_LOGGER.debug("miot device init %s", device_info)
@ -239,13 +240,14 @@ class MIoTDevice:
return self._action_list
async def action_async(self, siid: int, aiid: int, in_list: list) -> list:
return await self.miot_client.action_async(
did=self._did, siid=siid, aiid=aiid, in_list=in_list
)
return await self.miot_client.action_async(did=self._did,
siid=siid,
aiid=aiid,
in_list=in_list)
def sub_device_state(
self, key: str, handler: Callable[[str, MIoTDeviceState], None]
) -> int:
self, key: str, handler: Callable[[str, MIoTDeviceState],
None]) -> int:
sub_id = self.__gen_sub_id()
if key in self._device_state_sub_list:
self._device_state_sub_list[key][str(sub_id)] = handler
@ -260,9 +262,8 @@ class MIoTDevice:
if not sub_list:
self._device_state_sub_list.pop(key, None)
def sub_property(
self, handler: Callable[[dict, Any], None], siid: int, piid: int
) -> int:
def sub_property(self, handler: Callable[[dict, Any], None], siid: int,
piid: int) -> int:
key: str = f"p.{siid}.{piid}"
def _on_prop_changed(params: dict, ctx: Any) -> None:
@ -274,9 +275,10 @@ class MIoTDevice:
self._value_sub_list[key][str(sub_id)] = handler
else:
self._value_sub_list[key] = {str(sub_id): handler}
self.miot_client.sub_prop(
did=self._did, handler=_on_prop_changed, siid=siid, piid=piid
)
self.miot_client.sub_prop(did=self._did,
handler=_on_prop_changed,
siid=siid,
piid=piid)
return sub_id
def unsub_property(self, siid: int, piid: int, sub_id: int) -> None:
@ -289,9 +291,8 @@ class MIoTDevice:
self.miot_client.unsub_prop(did=self._did, siid=siid, piid=piid)
self._value_sub_list.pop(key, None)
def sub_event(
self, handler: Callable[[dict, Any], None], siid: int, eiid: int
) -> int:
def sub_event(self, handler: Callable[[dict, Any], None], siid: int,
eiid: int) -> int:
key: str = f"e.{siid}.{eiid}"
def _on_event_occurred(params: dict, ctx: Any) -> None:
@ -303,9 +304,10 @@ class MIoTDevice:
self._value_sub_list[key][str(sub_id)] = handler
else:
self._value_sub_list[key] = {str(sub_id): handler}
self.miot_client.sub_event(
did=self._did, handler=_on_event_occurred, siid=siid, eiid=eiid
)
self.miot_client.sub_event(did=self._did,
handler=_on_event_occurred,
siid=siid,
eiid=eiid)
return sub_id
def unsub_event(self, siid: int, eiid: int, sub_id: int) -> None:
@ -330,8 +332,7 @@ class MIoTDevice:
suggested_area=self._suggested_area,
configuration_url=(
f"https://home.mi.com/webapp/content/baike/product/index.html?"
f"model={self._model}"
),
f"model={self._model}"),
)
@property
@ -341,46 +342,35 @@ class MIoTDevice:
@property
def did_tag(self) -> str:
return slugify_did(cloud_server=self.miot_client.cloud_server, did=self._did)
return slugify_did(cloud_server=self.miot_client.cloud_server,
did=self._did)
def gen_device_entity_id(self, ha_domain: str) -> str:
return (
f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}"
)
return (f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}")
def gen_service_entity_id(self, ha_domain: str, siid: int, description: str) -> str:
return (
f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_s_{siid}_{description}"
)
def gen_service_entity_id(self, ha_domain: str, siid: int,
description: str) -> str:
return (f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_s_{siid}_{description}")
def gen_prop_entity_id(
self, ha_domain: str, spec_name: str, siid: int, piid: int
) -> str:
return (
f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_{slugify_name(spec_name)}"
f"_p_{siid}_{piid}"
)
def gen_prop_entity_id(self, ha_domain: str, spec_name: str, siid: int,
piid: int) -> str:
return (f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_{slugify_name(spec_name)}"
f"_p_{siid}_{piid}")
def gen_event_entity_id(
self, ha_domain: str, spec_name: str, siid: int, eiid: int
) -> str:
return (
f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_{slugify_name(spec_name)}"
f"_e_{siid}_{eiid}"
)
def gen_event_entity_id(self, ha_domain: str, spec_name: str, siid: int,
eiid: int) -> str:
return (f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_{slugify_name(spec_name)}"
f"_e_{siid}_{eiid}")
def gen_action_entity_id(
self, ha_domain: str, spec_name: str, siid: int, aiid: int
) -> str:
return (
f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_{slugify_name(spec_name)}"
f"_a_{siid}_{aiid}"
)
def gen_action_entity_id(self, ha_domain: str, spec_name: str, siid: int,
aiid: int) -> str:
return (f"{ha_domain}.{self._model_strs[0][:9]}_{self.did_tag}_"
f"{self._model_strs[-1][:20]}_{slugify_name(spec_name)}"
f"_a_{siid}_{aiid}")
@property
def name(self) -> str:
@ -417,8 +407,7 @@ class MIoTDevice:
self._action_list[action.platform].append(action)
def parse_miot_device_entity(
self, spec_instance: MIoTSpecInstance
) -> Optional[MIoTEntityData]:
self, spec_instance: MIoTSpecInstance) -> Optional[MIoTEntityData]:
if spec_instance.name not in SPEC_DEVICE_TRANS_MAP:
return None
spec_name: str = spec_instance.name
@ -428,9 +417,8 @@ class MIoTDevice:
return None
# 1. The device shall have all required services.
required_services = SPEC_DEVICE_TRANS_MAP[spec_name]["required"].keys()
if not {service.name for service in spec_instance.services}.issuperset(
required_services
):
if not {service.name for service in spec_instance.services
}.issuperset(required_services):
return None
optional_services = SPEC_DEVICE_TRANS_MAP[spec_name]["optional"].keys()
@ -446,74 +434,56 @@ class MIoTDevice:
# 2. The service shall have all required properties, actions.
if service.name in required_services:
required_properties = (
SPEC_DEVICE_TRANS_MAP[spec_name]["required"]
.get(service.name, {})
.get("required", {})
.get("properties", {})
)
SPEC_DEVICE_TRANS_MAP[spec_name]["required"].get(
service.name, {}).get("required",
{}).get("properties", {}))
optional_properties = (
SPEC_DEVICE_TRANS_MAP[spec_name]["required"]
.get(service.name, {})
.get("optional", {})
.get("properties", set({}))
)
SPEC_DEVICE_TRANS_MAP[spec_name]["required"].get(
service.name, {}).get("optional",
{}).get("properties", set({})))
required_actions = (
SPEC_DEVICE_TRANS_MAP[spec_name]["required"]
.get(service.name, {})
.get("required", {})
.get("actions", set({}))
)
SPEC_DEVICE_TRANS_MAP[spec_name]["required"].get(
service.name, {}).get("required",
{}).get("actions", set({})))
optional_actions = (
SPEC_DEVICE_TRANS_MAP[spec_name]["required"]
.get(service.name, {})
.get("optional", {})
.get("actions", set({}))
)
SPEC_DEVICE_TRANS_MAP[spec_name]["required"].get(
service.name, {}).get("optional",
{}).get("actions", set({})))
elif service.name in optional_services:
required_properties = (
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"]
.get(service.name, {})
.get("required", {})
.get("properties", {})
)
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"].get(
service.name, {}).get("required",
{}).get("properties", {}))
optional_properties = (
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"]
.get(service.name, {})
.get("optional", {})
.get("properties", set({}))
)
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"].get(
service.name, {}).get("optional",
{}).get("properties", set({})))
required_actions = (
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"]
.get(service.name, {})
.get("required", {})
.get("actions", set({}))
)
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"].get(
service.name, {}).get("required",
{}).get("actions", set({})))
optional_actions = (
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"]
.get(service.name, {})
.get("optional", {})
.get("actions", set({}))
)
SPEC_DEVICE_TRANS_MAP[spec_name]["optional"].get(
service.name, {}).get("optional",
{}).get("actions", set({})))
else:
continue
if not {prop.name for prop in service.properties if prop.access}.issuperset(
set(required_properties.keys())
):
if not {prop.name for prop in service.properties if prop.access
}.issuperset(set(required_properties.keys())):
return None
if not {action.name for action in service.actions}.issuperset(
required_actions
):
if not {action.name for action in service.actions
}.issuperset(required_actions):
return None
# 3. The required property shall have all required access mode.
for prop in service.properties:
if prop.name in required_properties:
if not set(prop.access).issuperset(required_properties[prop.name]):
if not set(prop.access).issuperset(
required_properties[prop.name]):
return None
# property
for prop in service.properties:
if prop.name in set.union(
set(required_properties.keys()), optional_properties
):
if prop.name in set.union(set(required_properties.keys()),
optional_properties):
if prop.unit:
prop.external_unit = self.unit_convert(prop.unit)
# prop.icon = self.icon_convert(prop.unit)
@ -530,8 +500,7 @@ class MIoTDevice:
return entity_data
def parse_miot_service_entity(
self, miot_service: MIoTSpecService
) -> Optional[MIoTEntityData]:
self, miot_service: MIoTSpecService) -> Optional[MIoTEntityData]:
if miot_service.platform or miot_service.name not in SPEC_SERVICE_TRANS_MAP:
return None
service_name = miot_service.name
@ -541,28 +510,25 @@ class MIoTDevice:
return None
# Required properties, required access mode
required_properties: dict = SPEC_SERVICE_TRANS_MAP[service_name][
"required"
].get("properties", {})
if not {
prop.name for prop in miot_service.properties if prop.access
}.issuperset(set(required_properties.keys())):
"required"].get("properties", {})
if not {prop.name for prop in miot_service.properties if prop.access
}.issuperset(set(required_properties.keys())):
return None
for prop in miot_service.properties:
if prop.name in required_properties:
if not set(prop.access).issuperset(required_properties[prop.name]):
if not set(prop.access).issuperset(
required_properties[prop.name]):
return None
# Required actions
# Required events
platform = SPEC_SERVICE_TRANS_MAP[service_name]["entity"]
entity_data = MIoTEntityData(platform=platform, spec=miot_service)
# Optional properties
optional_properties = SPEC_SERVICE_TRANS_MAP[service_name]["optional"].get(
"properties", set({})
)
optional_properties = SPEC_SERVICE_TRANS_MAP[service_name][
"optional"].get("properties", set({}))
for prop in miot_service.properties:
if prop.name in set.union(
set(required_properties.keys()), optional_properties
):
if prop.name in set.union(set(required_properties.keys()),
optional_properties):
if prop.unit:
prop.external_unit = self.unit_convert(prop.unit)
# prop.icon = self.icon_convert(prop.unit)
@ -573,16 +539,13 @@ class MIoTDevice:
miot_service.platform = platform
# entity_category
if entity_category := SPEC_SERVICE_TRANS_MAP[service_name].get(
"entity_category", None
):
"entity_category", None):
miot_service.entity_category = entity_category
return entity_data
def parse_miot_property_entity(self, miot_prop: MIoTSpecProperty) -> bool:
if (
miot_prop.platform
or miot_prop.name not in SPEC_PROP_TRANS_MAP["properties"]
):
if (miot_prop.platform or
miot_prop.name not in SPEC_PROP_TRANS_MAP["properties"]):
return False
prop_name = miot_prop.name
if isinstance(SPEC_PROP_TRANS_MAP["properties"][prop_name], str):
@ -596,27 +559,20 @@ class MIoTDevice:
prop_access.add("write")
if prop_access != (SPEC_PROP_TRANS_MAP["entities"][platform]["access"]):
return False
if (
miot_prop.format_.__name__
not in SPEC_PROP_TRANS_MAP["entities"][platform]["format"]
):
if (miot_prop.format_.__name__
not in SPEC_PROP_TRANS_MAP["entities"][platform]["format"]):
return False
miot_prop.device_class = SPEC_PROP_TRANS_MAP["properties"][prop_name][
"device_class"
]
"device_class"]
# Optional params
if "state_class" in SPEC_PROP_TRANS_MAP["properties"][prop_name]:
miot_prop.state_class = SPEC_PROP_TRANS_MAP["properties"][prop_name][
"state_class"
]
if (
not miot_prop.external_unit
and "unit_of_measurement" in SPEC_PROP_TRANS_MAP["properties"][prop_name]
):
miot_prop.state_class = SPEC_PROP_TRANS_MAP["properties"][
prop_name]["state_class"]
if (not miot_prop.external_unit and "unit_of_measurement"
in SPEC_PROP_TRANS_MAP["properties"][prop_name]):
# Priority: spec_modify.unit > unit_convert > specv2entity.unit
miot_prop.external_unit = SPEC_PROP_TRANS_MAP["properties"][prop_name][
"unit_of_measurement"
]
miot_prop.external_unit = SPEC_PROP_TRANS_MAP["properties"][
prop_name]["unit_of_measurement"]
# Priority: default.icon when device_class is set > spec_modify.icon
# > icon_convert
miot_prop.platform = platform
@ -625,12 +581,14 @@ class MIoTDevice:
def spec_transform(self) -> None:
"""Parse service, property, event, action from device spec."""
# STEP 1: device conversion
device_entity = self.parse_miot_device_entity(spec_instance=self.spec_instance)
device_entity = self.parse_miot_device_entity(
spec_instance=self.spec_instance)
if device_entity:
self.append_entity(entity_data=device_entity)
# STEP 2: service conversion
for service in self.spec_instance.services:
service_entity = self.parse_miot_service_entity(miot_service=service)
service_entity = self.parse_miot_service_entity(
miot_service=service)
if service_entity:
self.append_entity(entity_data=service_entity)
# STEP 3.1: property conversion
@ -814,14 +772,14 @@ class MIoTDevice:
if spec_unit in {"percentage"}:
return "mdi:percent"
if spec_unit in {
"weeks",
"days",
"hour",
"hours",
"minutes",
"seconds",
"ms",
"μs",
"weeks",
"days",
"hour",
"hours",
"minutes",
"seconds",
"ms",
"μs",
}:
return "mdi:clock"
if spec_unit in {"celsius"}:
@ -878,13 +836,13 @@ class MIoTDevice:
self._sub_id += 1
return self._sub_id
def __on_device_state_changed(
self, did: str, state: MIoTDeviceState, ctx: Any
) -> None:
def __on_device_state_changed(self, did: str, state: MIoTDeviceState,
ctx: Any) -> None:
self._online = state == MIoTDeviceState.ONLINE
for key, sub_list in self._device_state_sub_list.items():
for handler in sub_list.values():
self.miot_client.main_loop.call_soon_threadsafe(handler, key, state)
self.miot_client.main_loop.call_soon_threadsafe(
handler, key, state)
class MIoTServiceEntity(Entity):
@ -901,11 +859,13 @@ class MIoTServiceEntity(Entity):
_value_sub_ids: dict[str, int]
_event_occurred_handler: Optional[Callable[[MIoTSpecEvent, dict], None]]
_prop_changed_subs: dict[MIoTSpecProperty, Callable[[MIoTSpecProperty, Any], None]]
_prop_changed_subs: dict[MIoTSpecProperty, Callable[[MIoTSpecProperty, Any],
None]]
_pending_write_ha_state_timer: Optional[asyncio.TimerHandle]
def __init__(self, miot_device: MIoTDevice, entity_data: MIoTEntityData) -> None:
def __init__(self, miot_device: MIoTDevice,
entity_data: MIoTEntityData) -> None:
if miot_device is None or entity_data is None or entity_data.spec is None:
raise MIoTDeviceError("init error, invalid params")
self.miot_device = miot_device
@ -926,8 +886,7 @@ class MIoTServiceEntity(Entity):
)
self._attr_name = (
f"{'* ' if self.entity_data.spec.proprietary else ' '}"
f"{self.entity_data.spec.description_trans}"
)
f"{self.entity_data.spec.description_trans}")
self._attr_entity_category = entity_data.spec.entity_category
# Set entity attr
self._attr_unique_id = self.entity_id
@ -947,7 +906,8 @@ class MIoTServiceEntity(Entity):
)
@property
def event_occurred_handler(self) -> Optional[Callable[[MIoTSpecEvent, dict], None]]:
def event_occurred_handler(
self) -> Optional[Callable[[MIoTSpecEvent, dict], None]]:
return self._event_occurred_handler
@event_occurred_handler.setter
@ -955,8 +915,8 @@ class MIoTServiceEntity(Entity):
self._event_occurred_handler = func
def sub_prop_changed(
self, prop: MIoTSpecProperty, handler: Callable[[MIoTSpecProperty, Any], None]
) -> None:
self, prop: MIoTSpecProperty,
handler: Callable[[MIoTSpecProperty, Any], None]) -> None:
if not prop or not handler:
_LOGGER.error("sub_prop_changed error, invalid prop/handler")
return
@ -974,8 +934,7 @@ class MIoTServiceEntity(Entity):
if isinstance(self.entity_data.spec, MIoTSpecService):
state_id = f"s.{self.entity_data.spec.iid}"
self._state_sub_id = self.miot_device.sub_device_state(
key=state_id, handler=self.__on_device_state_changed
)
key=state_id, handler=self.__on_device_state_changed)
# Sub prop
for prop in self.entity_data.props:
if not prop.notifiable and not prop.readable:
@ -990,8 +949,9 @@ class MIoTServiceEntity(Entity):
for event in self.entity_data.events:
key = f"e.{event.service.iid}.{event.iid}"
self._value_sub_ids[key] = self.miot_device.sub_event(
handler=self.__on_event_occurred, siid=event.service.iid, eiid=event.iid
)
handler=self.__on_event_occurred,
siid=event.service.iid,
eiid=event.iid)
# Refresh value
if self._attr_available:
@ -1004,30 +964,34 @@ class MIoTServiceEntity(Entity):
state_id = "s.0"
if isinstance(self.entity_data.spec, MIoTSpecService):
state_id = f"s.{self.entity_data.spec.iid}"
self.miot_device.unsub_device_state(key=state_id, sub_id=self._state_sub_id)
self.miot_device.unsub_device_state(key=state_id,
sub_id=self._state_sub_id)
# Unsub prop
for prop in self.entity_data.props:
if not prop.notifiable and not prop.readable:
continue
sub_id = self._value_sub_ids.pop(f"p.{prop.service.iid}.{prop.iid}", None)
sub_id = self._value_sub_ids.pop(f"p.{prop.service.iid}.{prop.iid}",
None)
if sub_id:
self.miot_device.unsub_property(
siid=prop.service.iid, piid=prop.iid, sub_id=sub_id
)
self.miot_device.unsub_property(siid=prop.service.iid,
piid=prop.iid,
sub_id=sub_id)
# Unsub event
for event in self.entity_data.events:
sub_id = self._value_sub_ids.pop(f"e.{event.service.iid}.{event.iid}", None)
sub_id = self._value_sub_ids.pop(
f"e.{event.service.iid}.{event.iid}", None)
if sub_id:
self.miot_device.unsub_event(
siid=event.service.iid, eiid=event.iid, sub_id=sub_id
)
self.miot_device.unsub_event(siid=event.service.iid,
eiid=event.iid,
sub_id=sub_id)
def get_map_value(self, map_: Optional[dict[int, Any]], key: int) -> Any:
if map_ is None:
return None
return map_.get(key, None)
def get_map_key(self, map_: Optional[dict[int, Any]], value: Any) -> Optional[int]:
def get_map_key(self, map_: Optional[dict[int, Any]],
value: Any) -> Optional[int]:
if map_ is None:
return None
for key, value_ in map_.items():
@ -1045,7 +1009,8 @@ class MIoTServiceEntity(Entity):
return None
return self._prop_value_map.get(prop, None)
def set_prop_value(self, prop: Optional[MIoTSpecProperty], value: Any) -> None:
def set_prop_value(self, prop: Optional[MIoTSpecProperty],
value: Any) -> None:
if not prop:
_LOGGER.error(
"set_prop_value error, property is None, %s, %s",
@ -1068,15 +1033,11 @@ class MIoTServiceEntity(Entity):
)
value = prop.value_format(value)
if prop not in self.entity_data.props:
raise RuntimeError(
f"set property failed, unknown property, "
f"{self.entity_id}, {self.name}, {prop.name}"
)
raise RuntimeError(f"set property failed, unknown property, "
f"{self.entity_id}, {self.name}, {prop.name}")
if not prop.writable:
raise RuntimeError(
f"set property failed, not writable, "
f"{self.entity_id}, {self.name}, {prop.name}"
)
raise RuntimeError(f"set property failed, not writable, "
f"{self.entity_id}, {self.name}, {prop.name}")
try:
await self.miot_device.miot_client.set_prop_async(
did=self.miot_device.did,
@ -1086,8 +1047,7 @@ class MIoTServiceEntity(Entity):
)
except MIoTClientError as e:
raise RuntimeError(
f"{e}, {self.entity_id}, {self.name}, {prop.name}"
) from e
f"{e}, {self.entity_id}, {self.name}, {prop.name}") from e
if update_value:
self._prop_value_map[prop] = value
if write_ha_state:
@ -1104,40 +1064,32 @@ class MIoTServiceEntity(Entity):
prop = set_property.get("prop")
value = set_property.get("value")
if not prop:
raise RuntimeError(
f"set property failed, property is None, "
f"{self.entity_id}, {self.name}"
)
raise RuntimeError(f"set property failed, property is None, "
f"{self.entity_id}, {self.name}")
set_property["value"] = prop.value_format(value)
if prop not in self.entity_data.props:
raise RuntimeError(
f"set property failed, unknown property, "
f"{self.entity_id}, {self.name}, {prop.name}"
)
f"{self.entity_id}, {self.name}, {prop.name}")
if not prop.writable:
raise RuntimeError(
f"set property failed, not writable, "
f"{self.entity_id}, {self.name}, {prop.name}"
)
f"{self.entity_id}, {self.name}, {prop.name}")
try:
await self.miot_device.miot_client.set_props_async(
[
{
"did": self.miot_device.did,
"siid": set_property["prop"].service.iid,
"piid": set_property["prop"].iid,
"value": set_property["value"],
}
for set_property in set_properties_list
]
)
await self.miot_device.miot_client.set_props_async([{
"did": self.miot_device.did,
"siid": set_property["prop"].service.iid,
"piid": set_property["prop"].iid,
"value": set_property["value"],
} for set_property in set_properties_list])
except MIoTClientError as e:
raise RuntimeError(
f"{e}, {self.entity_id}, {self.name}, {'/'.join([set_property['prop'].name for set_property in set_properties_list])}"
) from e
if update_value:
for set_property in set_properties_list:
self._prop_value_map[set_property["prop"]] = set_property["value"]
self._prop_value_map[
set_property["prop"]] = set_property["value"]
if write_ha_state:
self.async_write_ha_state()
return True
@ -1166,23 +1118,22 @@ class MIoTServiceEntity(Entity):
prop.name,
)
return None
result = prop.value_format(
await self.miot_device.miot_client.get_prop_async(
did=self.miot_device.did, siid=prop.service.iid, piid=prop.iid
)
)
result = prop.value_format(await
self.miot_device.miot_client.get_prop_async(
did=self.miot_device.did,
siid=prop.service.iid,
piid=prop.iid))
if result != self._prop_value_map[prop]:
self._prop_value_map[prop] = result
self.async_write_ha_state()
return result
async def action_async(
self, action: MIoTSpecAction, in_list: Optional[list] = None
) -> bool:
async def action_async(self,
action: MIoTSpecAction,
in_list: Optional[list] = None) -> bool:
if not action:
raise RuntimeError(
f"action failed, action is None, {self.entity_id}, {self.name}"
)
f"action failed, action is None, {self.entity_id}, {self.name}")
try:
await self.miot_device.miot_client.action_async(
did=self.miot_device.did,
@ -1192,8 +1143,7 @@ class MIoTServiceEntity(Entity):
)
except MIoTClientError as e:
raise RuntimeError(
f"{e}, {self.entity_id}, {self.name}, {action.name}"
) from e
f"{e}, {self.entity_id}, {self.name}, {action.name}") from e
return True
def __on_properties_changed(self, params: dict, ctx: Any) -> None:
@ -1214,7 +1164,8 @@ class MIoTServiceEntity(Entity):
if self._event_occurred_handler is None:
return
for event in self.entity_data.events:
if event.iid != params["eiid"] or event.service.iid != params["siid"]:
if event.iid != params["eiid"] or event.service.iid != params[
"siid"]:
continue
trans_arg = {}
for item in params["arguments"]:
@ -1225,7 +1176,8 @@ class MIoTServiceEntity(Entity):
self._event_occurred_handler(event, trans_arg)
break
def __on_device_state_changed(self, key: str, state: MIoTDeviceState) -> None:
def __on_device_state_changed(self, key: str,
state: MIoTDeviceState) -> None:
state_new = state == MIoTDeviceState.ONLINE
if state_new == self._attr_available:
return
@ -1240,13 +1192,11 @@ class MIoTServiceEntity(Entity):
if not prop.readable:
continue
self.miot_device.miot_client.request_refresh_prop(
did=self.miot_device.did, siid=prop.service.iid, piid=prop.iid
)
did=self.miot_device.did, siid=prop.service.iid, piid=prop.iid)
if self._pending_write_ha_state_timer:
self._pending_write_ha_state_timer.cancel()
self._pending_write_ha_state_timer = self._main_loop.call_later(
1, self.__write_ha_state_handler
)
1, self.__write_ha_state_handler)
def __write_ha_state_handler(self) -> None:
self._pending_write_ha_state_timer = None
@ -1287,16 +1237,17 @@ class MIoTPropertyEntity(Entity):
self._pending_write_ha_state_timer = None
# Gen entity_id
self.entity_id = self.miot_device.gen_prop_entity_id(
ha_domain=DOMAIN, spec_name=spec.name, siid=spec.service.iid, piid=spec.iid
)
ha_domain=DOMAIN,
spec_name=spec.name,
siid=spec.service.iid,
piid=spec.iid)
# Set entity attr
self._attr_unique_id = self.entity_id
self._attr_should_poll = False
self._attr_has_entity_name = True
self._attr_name = (
f"{'* ' if self.spec.proprietary else ' '}"
f"{self.service.description_trans} {spec.description_trans}"
)
f"{self.service.description_trans} {spec.description_trans}")
self._attr_available = miot_device.online
_LOGGER.info(
@ -1322,8 +1273,9 @@ class MIoTPropertyEntity(Entity):
)
# Sub value changed
self._value_sub_id = self.miot_device.sub_property(
handler=self.__on_value_changed, siid=self.service.iid, piid=self.spec.iid
)
handler=self.__on_value_changed,
siid=self.service.iid,
piid=self.spec.iid)
# Refresh value
if self._attr_available:
self.__request_refresh_prop()
@ -1333,11 +1285,11 @@ class MIoTPropertyEntity(Entity):
self._pending_write_ha_state_timer.cancel()
self._pending_write_ha_state_timer = None
self.miot_device.unsub_device_state(
key=f"{self.service.iid}.{self.spec.iid}", sub_id=self._state_sub_id
)
self.miot_device.unsub_property(
siid=self.service.iid, piid=self.spec.iid, sub_id=self._value_sub_id
)
key=f"{self.service.iid}.{self.spec.iid}",
sub_id=self._state_sub_id)
self.miot_device.unsub_property(siid=self.service.iid,
piid=self.spec.iid,
sub_id=self._value_sub_id)
def get_vlist_description(self, value: Any) -> Optional[str]:
if not self._value_list:
@ -1370,15 +1322,14 @@ class MIoTPropertyEntity(Entity):
async def get_property_async(self) -> Any:
if not self.spec.readable:
_LOGGER.error(
"get property failed, not readable, %s, %s", self.entity_id, self.name
)
_LOGGER.error("get property failed, not readable, %s, %s",
self.entity_id, self.name)
return None
return self.spec.value_format(
await self.miot_device.miot_client.get_prop_async(
did=self.miot_device.did, siid=self.spec.service.iid, piid=self.spec.iid
)
)
did=self.miot_device.did,
siid=self.spec.service.iid,
piid=self.spec.iid))
def __on_value_changed(self, params: dict, ctx: Any) -> None:
_LOGGER.debug("property changed, %s", params)
@ -1387,7 +1338,8 @@ class MIoTPropertyEntity(Entity):
if not self._pending_write_ha_state_timer:
self.async_write_ha_state()
def __on_device_state_changed(self, key: str, state: MIoTDeviceState) -> None:
def __on_device_state_changed(self, key: str,
state: MIoTDeviceState) -> None:
self._attr_available = state == MIoTDeviceState.ONLINE
if not self._attr_available:
self.async_write_ha_state()
@ -1398,13 +1350,13 @@ class MIoTPropertyEntity(Entity):
def __request_refresh_prop(self) -> None:
if self.spec.readable:
self.miot_device.miot_client.request_refresh_prop(
did=self.miot_device.did, siid=self.service.iid, piid=self.spec.iid
)
did=self.miot_device.did,
siid=self.service.iid,
piid=self.spec.iid)
if self._pending_write_ha_state_timer:
self._pending_write_ha_state_timer.cancel()
self._pending_write_ha_state_timer = self._main_loop.call_later(
1, self.__write_ha_state_handler
)
1, self.__write_ha_state_handler)
def __write_ha_state_handler(self) -> None:
self._pending_write_ha_state_timer = None
@ -1435,16 +1387,17 @@ class MIoTEventEntity(Entity):
self._main_loop = miot_device.miot_client.main_loop
# Gen entity_id
self.entity_id = self.miot_device.gen_event_entity_id(
ha_domain=DOMAIN, spec_name=spec.name, siid=spec.service.iid, eiid=spec.iid
)
ha_domain=DOMAIN,
spec_name=spec.name,
siid=spec.service.iid,
eiid=spec.iid)
# Set entity attr
self._attr_unique_id = self.entity_id
self._attr_should_poll = False
self._attr_has_entity_name = True
self._attr_name = (
f"{'* ' if self.spec.proprietary else ' '}"
f"{self.service.description_trans} {spec.description_trans}"
)
f"{self.service.description_trans} {spec.description_trans}")
self._attr_available = miot_device.online
self._attr_event_types = [spec.description_trans]
@ -1475,21 +1428,23 @@ class MIoTEventEntity(Entity):
)
# Sub value changed
self._value_sub_id = self.miot_device.sub_event(
handler=self.__on_event_occurred, siid=self.service.iid, eiid=self.spec.iid
)
handler=self.__on_event_occurred,
siid=self.service.iid,
eiid=self.spec.iid)
async def async_will_remove_from_hass(self) -> None:
self.miot_device.unsub_device_state(
key=f"event.{self.service.iid}.{self.spec.iid}", sub_id=self._state_sub_id
)
self.miot_device.unsub_event(
siid=self.service.iid, eiid=self.spec.iid, sub_id=self._value_sub_id
)
key=f"event.{self.service.iid}.{self.spec.iid}",
sub_id=self._state_sub_id)
self.miot_device.unsub_event(siid=self.service.iid,
eiid=self.spec.iid,
sub_id=self._value_sub_id)
@abstractmethod
def on_event_occurred(
self, name: str, arguments: dict[str, Any] | None = None
) -> None: ...
def on_event_occurred(self,
name: str,
arguments: dict[str, Any] | None = None) -> None:
...
def __on_event_occurred(self, params: dict, ctx: Any) -> None:
_LOGGER.debug("event occurred, %s", params)
@ -1500,9 +1455,8 @@ class MIoTEventEntity(Entity):
continue
if "piid" in item:
trans_arg[self._arguments_map[item["piid"]]] = item["value"]
elif isinstance(item["value"], list) and len(item["value"]) == len(
self.spec.argument
):
elif isinstance(item["value"], list) and len(
item["value"]) == len(self.spec.argument):
# Dirty fix for cloud multi-arguments
trans_arg = {
prop.description_trans: item["value"][index]
@ -1516,10 +1470,12 @@ class MIoTEventEntity(Entity):
params,
error,
)
self.on_event_occurred(name=self.spec.description_trans, arguments=trans_arg)
self.on_event_occurred(name=self.spec.description_trans,
arguments=trans_arg)
self.async_write_ha_state()
def __on_device_state_changed(self, key: str, state: MIoTDeviceState) -> None:
def __on_device_state_changed(self, key: str,
state: MIoTDeviceState) -> None:
state_new = state == MIoTDeviceState.ONLINE
if state_new == self._attr_available:
return
@ -1551,16 +1507,17 @@ class MIoTActionEntity(Entity):
self._state_sub_id = 0
# Gen entity_id
self.entity_id = self.miot_device.gen_action_entity_id(
ha_domain=DOMAIN, spec_name=spec.name, siid=spec.service.iid, aiid=spec.iid
)
ha_domain=DOMAIN,
spec_name=spec.name,
siid=spec.service.iid,
aiid=spec.iid)
# Set entity attr
self._attr_unique_id = self.entity_id
self._attr_should_poll = False
self._attr_has_entity_name = True
self._attr_name = (
f"{'* ' if self.spec.proprietary else ' '}"
f"{self.service.description_trans} {spec.description_trans}"
)
f"{self.service.description_trans} {spec.description_trans}")
self._attr_available = miot_device.online
_LOGGER.debug(
@ -1584,10 +1541,11 @@ class MIoTActionEntity(Entity):
async def async_will_remove_from_hass(self) -> None:
self.miot_device.unsub_device_state(
key=f"a.{self.service.iid}.{self.spec.iid}", sub_id=self._state_sub_id
)
key=f"a.{self.service.iid}.{self.spec.iid}",
sub_id=self._state_sub_id)
async def action_async(self, in_list: Optional[list] = None) -> Optional[list]:
async def action_async(self,
in_list: Optional[list] = None) -> Optional[list]:
try:
return await self.miot_device.miot_client.action_async(
did=self.miot_device.did,
@ -1598,7 +1556,8 @@ class MIoTActionEntity(Entity):
except MIoTClientError as e:
raise RuntimeError(f"{e}, {self.entity_id}, {self.name}") from e
def __on_device_state_changed(self, key: str, state: MIoTDeviceState) -> None:
def __on_device_state_changed(self, key: str,
state: MIoTDeviceState) -> None:
state_new = state == MIoTDeviceState.ONLINE
if state_new == self._attr_available:
return

View File

@ -69,7 +69,6 @@ from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo
from .miot_mdns import MipsService, MipsServiceState
from .common import randomize_float, load_yaml_file, gen_absolute_path, MIoTMatcher
_LOGGER = logging.getLogger(__name__)
@ -163,17 +162,18 @@ class _MIoTLanDevice:
# All functions SHOULD be called from the internal loop
def __init__(
self, manager: "MIoTLan", did: str, token: str, ip: Optional[str] = None
) -> None:
def __init__(self,
manager: "MIoTLan",
did: str,
token: str,
ip: Optional[str] = None) -> None:
self._manager: MIoTLan = manager
self.did = did
self.token = bytes.fromhex(token)
aes_key: bytes = self.__md5(self.token)
aex_iv: bytes = self.__md5(aes_key + self.token)
self.cipher = Cipher(
algorithms.AES128(aes_key), modes.CBC(aex_iv), default_backend()
)
self.cipher = Cipher(algorithms.AES128(aes_key), modes.CBC(aex_iv),
default_backend())
self.ip = ip
self.offset = 0
self.subscribed = False
@ -200,7 +200,8 @@ class _MIoTLanDevice:
self.ip = ip
if self._if_name != if_name:
self._if_name = if_name
_LOGGER.info("device if_name change, %s, %s", self._if_name, self.did)
_LOGGER.info("device if_name change, %s, %s", self._if_name,
self.did)
self.__update_keep_alive(state=_MIoTLanDeviceState.FRESH)
@property
@ -214,16 +215,18 @@ class _MIoTLanDevice:
self._online = online
self._manager.broadcast_device_state(
did=self.did,
state={"online": self._online, "push_available": self.subscribed},
state={
"online": self._online,
"push_available": self.subscribed
},
)
@property
def if_name(self) -> Optional[str]:
return self._if_name
def gen_packet(
self, out_buffer: bytearray, clear_data: dict, did: str, offset: int
) -> int:
def gen_packet(self, out_buffer: bytearray, clear_data: dict, did: str,
offset: int) -> int:
clear_bytes = json.dumps(clear_data, ensure_ascii=False).encode("utf-8")
padder = padding.PKCS7(algorithms.AES128.block_size).padder()
padded_data = padder.update(clear_bytes) + padder.finalize()
@ -232,9 +235,8 @@ class _MIoTLanDevice:
encryptor = self.cipher.encryptor()
encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
data_len: int = len(encrypted_data) + self.OT_HEADER_LEN
out_buffer[:32] = struct.pack(
">HHQI16s", self.OT_HEADER, data_len, int(did), offset, self.token
)
out_buffer[:32] = struct.pack(">HHQI16s", self.OT_HEADER, data_len,
int(did), offset, self.token)
out_buffer[32:data_len] = encrypted_data
msg_md5: bytes = self.__md5(out_buffer[0:data_len])
out_buffer[16:32] = msg_md5
@ -248,11 +250,11 @@ class _MIoTLanDevice:
if md5_orig != md5_calc:
raise ValueError(f"invalid md5, {md5_orig}, {md5_calc}")
decryptor = self.cipher.decryptor()
decrypted_padded_data = (
decryptor.update(encrypted_data[32:data_len]) + decryptor.finalize()
)
decrypted_padded_data = (decryptor.update(encrypted_data[32:data_len]) +
decryptor.finalize())
unpadder = padding.PKCS7(algorithms.AES128.block_size).unpadder()
decrypted_data = unpadder.update(decrypted_padded_data) + unpadder.finalize()
decrypted_data = unpadder.update(
decrypted_padded_data) + unpadder.finalize()
# Some device will add a redundant \0 at the end of JSON string
decrypted_data = decrypted_data.rstrip(b"\x00")
return json.loads(decrypted_data)
@ -303,7 +305,10 @@ class _MIoTLanDevice:
self.subscribed = False
self._manager.broadcast_device_state(
did=self.did,
state={"online": self._online, "push_available": self.subscribed},
state={
"online": self._online,
"push_available": self.subscribed
},
)
def on_delete(self) -> None:
@ -316,42 +321,35 @@ class _MIoTLanDevice:
_LOGGER.debug("miot lan device delete, %s", self.did)
def update_info(self, info: dict) -> None:
if (
"token" in info
and len(info["token"]) == 32
and info["token"].upper() != self.token.hex().upper()
):
if ("token" in info and len(info["token"]) == 32 and
info["token"].upper() != self.token.hex().upper()):
# Update token
self.token = bytes.fromhex(info["token"])
aes_key: bytes = self.__md5(self.token)
aex_iv: bytes = self.__md5(aes_key + self.token)
self.cipher = Cipher(
algorithms.AES128(aes_key), modes.CBC(aex_iv), default_backend()
)
self.cipher = Cipher(algorithms.AES128(aes_key), modes.CBC(aex_iv),
default_backend())
_LOGGER.debug("update token, %s", self.did)
def __subscribe_handler(self, msg: dict, sub_ts: int) -> None:
if (
"result" not in msg
or "code" not in msg["result"]
or msg["result"]["code"] != 0
):
if ("result" not in msg or "code" not in msg["result"] or
msg["result"]["code"] != 0):
_LOGGER.error("subscribe device error, %s, %s", self.did, msg)
return
self.subscribed = True
self.sub_ts = sub_ts
self._manager.broadcast_device_state(
did=self.did,
state={"online": self._online, "push_available": self.subscribed},
state={
"online": self._online,
"push_available": self.subscribed
},
)
_LOGGER.info("subscribe success, %s, %s", self._if_name, self.did)
def __unsubscribe_handler(self, msg: dict, ctx: Any) -> None:
if (
"result" not in msg
or "code" not in msg["result"]
or msg["result"]["code"] != 0
):
if ("result" not in msg or "code" not in msg["result"] or
msg["result"]["code"] != 0):
_LOGGER.error("unsubscribe device error, %s, %s", self.did, msg)
return
_LOGGER.info("unsubscribe success, %s, %s", self._if_name, self.did)
@ -374,11 +372,8 @@ class _MIoTLanDevice:
self.__update_keep_alive,
_MIoTLanDeviceState.PING1,
)
case (
_MIoTLanDeviceState.PING1
| _MIoTLanDeviceState.PING2
| _MIoTLanDeviceState.PING3
):
case (_MIoTLanDeviceState.PING1 | _MIoTLanDeviceState.PING2 |
_MIoTLanDeviceState.PING3):
# Set the timer first to avoid Any early returns
self._ka_timer = self._manager.internal_loop.call_later(
self.FAST_PING_INTERVAL,
@ -416,16 +411,16 @@ class _MIoTLanDevice:
if not online:
self.online = False
else:
if len(self._online_offline_history) < self.NETWORK_UNSTABLE_CNT_TH or (
ts_now - self._online_offline_history[0]["ts"]
> self.NETWORK_UNSTABLE_TIME_TH
):
if len(self._online_offline_history
) < self.NETWORK_UNSTABLE_CNT_TH or (
ts_now - self._online_offline_history[0]["ts"]
> self.NETWORK_UNSTABLE_TIME_TH):
self.online = True
else:
_LOGGER.info("unstable device detected, %s", self.did)
self._online_offline_timer = self._manager.internal_loop.call_later(
self.NETWORK_UNSTABLE_RESUME_TH, self.__online_resume_handler
)
self.NETWORK_UNSTABLE_RESUME_TH,
self.__online_resume_handler)
def __online_resume_handler(self) -> None:
_LOGGER.info("unstable resume threshold past, %s", self.did)
@ -505,21 +500,19 @@ class MIoTLan:
self._net_ifs = set(net_ifs)
self._network = network
self._network.sub_network_info(
key="miot_lan", handler=self.__on_network_info_change_external_async
)
key="miot_lan",
handler=self.__on_network_info_change_external_async)
self._mips_service = mips_service
self._mips_service.sub_service_change(
key="miot_lan", group_id="*", handler=self.__on_mips_service_change
)
key="miot_lan", group_id="*", handler=self.__on_mips_service_change)
self._enable_subscribe = enable_subscribe
self._virtual_did = (
str(virtual_did) if (virtual_did is not None) else str(secrets.randbits(64))
)
self._virtual_did = (str(virtual_did) if
(virtual_did is not None) else str(
secrets.randbits(64)))
# Init socket probe message
probe_bytes = bytearray(self.OT_PROBE_LEN)
probe_bytes[:20] = (
b"!1\x00\x20\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xffMDID"
)
b"!1\x00\x20\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xffMDID")
probe_bytes[20:28] = struct.pack(">Q", int(self._virtual_did))
probe_bytes[28:32] = b"\x00\x00\x00\x00"
self._probe_msg = bytes(probe_bytes)
@ -544,17 +537,16 @@ class MIoTLan:
self._init_lock = asyncio.Lock()
self._init_done = False
if len(self._mips_service.get_services()) == 0 and len(self._net_ifs) > 0:
if len(self._mips_service.get_services()) == 0 and len(
self._net_ifs) > 0:
_LOGGER.info("no central hub gateway service, init miot lan")
self._main_loop.call_later(
0, lambda: self._main_loop.create_task(self.init_async())
)
0, lambda: self._main_loop.create_task(self.init_async()))
def __assert_service_ready(self) -> None:
if not self._init_done:
raise MIoTLanError(
"MIoT lan is not ready", MIoTErrorCode.CODE_LAN_UNAVAILABLE
)
raise MIoTLanError("MIoT lan is not ready",
MIoTErrorCode.CODE_LAN_UNAVAILABLE)
@property
def virtual_did(self) -> str:
@ -593,8 +585,8 @@ class MIoTLan:
return
try:
self._profile_models = await self._main_loop.run_in_executor(
None, load_yaml_file, gen_absolute_path(self.PROFILE_MODELS_FILE)
)
None, load_yaml_file,
gen_absolute_path(self.PROFILE_MODELS_FILE))
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error("load profile models error, %s", err)
self._profile_models = {}
@ -607,16 +599,14 @@ class MIoTLan:
self._init_done = True
for handler in list(self._lan_state_sub_map.values()):
self._main_loop.create_task(handler(True))
_LOGGER.info(
"miot lan init, %s ,%s", self._net_ifs, self._available_net_ifs
)
_LOGGER.info("miot lan init, %s ,%s", self._net_ifs,
self._available_net_ifs)
def __internal_loop_thread(self) -> None:
_LOGGER.info("miot lan thread start")
self.__init_socket()
self._scan_timer = self._internal_loop.call_later(
int(3 * random.random()), self.__scan_devices
)
int(3 * random.random()), self.__scan_devices)
self._internal_loop.run_forever()
_LOGGER.info("miot lan thread exit")
@ -683,8 +673,8 @@ class MIoTLan:
self._enable_subscribe = enable_subscribe
return
self._internal_loop.call_soon_threadsafe(
self.__update_subscribe_option, {"enable_subscribe": enable_subscribe}
)
self.__update_subscribe_option,
{"enable_subscribe": enable_subscribe})
def update_devices(self, devices: dict[str, dict]) -> bool:
_LOGGER.info("update devices, %s", devices)
@ -700,7 +690,8 @@ class MIoTLan:
self._internal_loop.call_soon_threadsafe(self.__delete_devices, devices)
return True
def sub_lan_state(self, key: str, handler: Callable[[bool], Coroutine]) -> None:
def sub_lan_state(self, key: str, handler: Callable[[bool],
Coroutine]) -> None:
self._lan_state_sub_map[key] = handler
def unsub_lan_state(self, key: str) -> None:
@ -717,7 +708,9 @@ class MIoTLan:
return False
self._internal_loop.call_soon_threadsafe(
self.__sub_device_state,
_MIoTLanSubDeviceData(key=key, handler=handler, handler_ctx=handler_ctx),
_MIoTLanSubDeviceData(key=key,
handler=handler,
handler_ctx=handler_ctx),
)
return True
@ -726,8 +719,7 @@ class MIoTLan:
if not self._init_done:
return False
self._internal_loop.call_soon_threadsafe(
self.__unsub_device_state, _MIoTLanUnsubDeviceData(key=key)
)
self.__unsub_device_state, _MIoTLanUnsubDeviceData(key=key))
return True
@final
@ -746,24 +738,24 @@ class MIoTLan:
key = f"{did}/p/{'#' if siid is None or piid is None else f'{siid}/{piid}'}"
self._internal_loop.call_soon_threadsafe(
self.__sub_broadcast,
_MIoTLanRegisterBroadcastData(
key=key, handler=handler, handler_ctx=handler_ctx
),
_MIoTLanRegisterBroadcastData(key=key,
handler=handler,
handler_ctx=handler_ctx),
)
return True
@final
def unsub_prop(
self, did: str, siid: Optional[int] = None, piid: Optional[int] = None
) -> bool:
def unsub_prop(self,
did: str,
siid: Optional[int] = None,
piid: Optional[int] = None) -> bool:
if not self._init_done:
return False
if not self._enable_subscribe:
return False
key = f"{did}/p/{'#' if siid is None or piid is None else f'{siid}/{piid}'}"
self._internal_loop.call_soon_threadsafe(
self.__unsub_broadcast, _MIoTLanUnregisterBroadcastData(key=key)
)
self.__unsub_broadcast, _MIoTLanUnregisterBroadcastData(key=key))
return True
@final
@ -782,80 +774,90 @@ class MIoTLan:
key = f"{did}/e/{'#' if siid is None or eiid is None else f'{siid}/{eiid}'}"
self._internal_loop.call_soon_threadsafe(
self.__sub_broadcast,
_MIoTLanRegisterBroadcastData(
key=key, handler=handler, handler_ctx=handler_ctx
),
_MIoTLanRegisterBroadcastData(key=key,
handler=handler,
handler_ctx=handler_ctx),
)
return True
@final
def unsub_event(
self, did: str, siid: Optional[int] = None, eiid: Optional[int] = None
) -> bool:
def unsub_event(self,
did: str,
siid: Optional[int] = None,
eiid: Optional[int] = None) -> bool:
if not self._init_done:
return False
if not self._enable_subscribe:
return False
key = f"{did}/e/{'#' if siid is None or eiid is None else f'{siid}/{eiid}'}"
self._internal_loop.call_soon_threadsafe(
self.__unsub_broadcast, _MIoTLanUnregisterBroadcastData(key=key)
)
self.__unsub_broadcast, _MIoTLanUnregisterBroadcastData(key=key))
return True
@final
async def get_prop_async(
self, did: str, siid: int, piid: int, timeout_ms: int = 10000
) -> Any:
async def get_prop_async(self,
did: str,
siid: int,
piid: int,
timeout_ms: int = 10000) -> Any:
self.__assert_service_ready()
result_obj = await self.__call_api_async(
did=did,
msg={
"method": "get_properties",
"params": [{"did": did, "siid": siid, "piid": piid}],
"params": [{
"did": did,
"siid": siid,
"piid": piid
}],
},
timeout_ms=timeout_ms,
)
if (
result_obj
and "result" in result_obj
and len(result_obj["result"]) == 1
and "did" in result_obj["result"][0]
and result_obj["result"][0]["did"] == did
):
if (result_obj and "result" in result_obj and
len(result_obj["result"]) == 1 and
"did" in result_obj["result"][0] and
result_obj["result"][0]["did"] == did):
return result_obj["result"][0].get("value", None)
return None
@final
async def set_prop_async(
self, did: str, siid: int, piid: int, value: Any, timeout_ms: int = 10000
) -> dict:
async def set_prop_async(self,
did: str,
siid: int,
piid: int,
value: Any,
timeout_ms: int = 10000) -> dict:
self.__assert_service_ready()
result_obj = await self.__call_api_async(
did=did,
msg={
"method": "set_properties",
"params": [{"did": did, "siid": siid, "piid": piid, "value": value}],
"method":
"set_properties",
"params": [{
"did": did,
"siid": siid,
"piid": piid,
"value": value
}],
},
timeout_ms=timeout_ms,
)
if result_obj:
if (
"result" in result_obj
and len(result_obj["result"]) == 1
and "did" in result_obj["result"][0]
and result_obj["result"][0]["did"] == did
and "code" in result_obj["result"][0]
):
if ("result" in result_obj and len(result_obj["result"]) == 1 and
"did" in result_obj["result"][0] and
result_obj["result"][0]["did"] == did and
"code" in result_obj["result"][0]):
return result_obj["result"][0]
if "code" in result_obj:
return result_obj
raise MIoTError("Invalid result", MIoTErrorCode.CODE_INTERNAL_ERROR)
@final
async def set_props_async(
self, did: str, props_list: List[Dict[str, Any]], timeout_ms: int = 10000
) -> dict:
async def set_props_async(self,
did: str,
props_list: List[Dict[str, Any]],
timeout_ms: int = 10000) -> dict:
self.__assert_service_ready()
result_obj = await self.__call_api_async(
did=did,
@ -866,12 +868,10 @@ class MIoTLan:
timeout_ms=timeout_ms,
)
if result_obj:
if (
"result" in result_obj
and len(result_obj["result"]) == len(props_list)
and result_obj["result"][0].get("did") == did
and all("code" in item for item in result_obj["result"])
):
if ("result" in result_obj and
len(result_obj["result"]) == len(props_list) and
result_obj["result"][0].get("did") == did and
all("code" in item for item in result_obj["result"])):
return result_obj["result"]
if "error" in result_obj:
return result_obj["error"]
@ -881,15 +881,23 @@ class MIoTLan:
}
@final
async def action_async(
self, did: str, siid: int, aiid: int, in_list: list, timeout_ms: int = 10000
) -> dict:
async def action_async(self,
did: str,
siid: int,
aiid: int,
in_list: list,
timeout_ms: int = 10000) -> dict:
self.__assert_service_ready()
result_obj = await self.__call_api_async(
did=did,
msg={
"method": "action",
"params": {"did": did, "siid": siid, "aiid": aiid, "in": in_list},
"params": {
"did": did,
"siid": siid,
"aiid": aiid,
"in": in_list
},
},
timeout_ms=timeout_ms,
)
@ -901,7 +909,8 @@ class MIoTLan:
raise MIoTError("Invalid result", MIoTErrorCode.CODE_INTERNAL_ERROR)
@final
async def get_dev_list_async(self, timeout_ms: int = 10000) -> dict[str, dict]:
async def get_dev_list_async(self,
timeout_ms: int = 10000) -> dict[str, dict]:
if not self._init_done:
return {}
@ -911,28 +920,30 @@ class MIoTLan:
fut: asyncio.Future = self._main_loop.create_future()
self._internal_loop.call_soon_threadsafe(
self.__get_dev_list,
_MIoTLanGetDevListData(
handler=get_device_list_handler, handler_ctx=fut, timeout_ms=timeout_ms
),
_MIoTLanGetDevListData(handler=get_device_list_handler,
handler_ctx=fut,
timeout_ms=timeout_ms),
)
return await fut
async def __call_api_async(
self, did: str, msg: dict, timeout_ms: int = 10000
) -> dict:
async def __call_api_async(self,
did: str,
msg: dict,
timeout_ms: int = 10000) -> dict:
def call_api_handler(msg: dict, fut: asyncio.Future):
self._main_loop.call_soon_threadsafe(fut.set_result, msg)
fut: asyncio.Future = self._main_loop.create_future()
self._internal_loop.call_soon_threadsafe(
self.__call_api, did, msg, call_api_handler, fut, timeout_ms
)
self._internal_loop.call_soon_threadsafe(self.__call_api, did, msg,
call_api_handler, fut,
timeout_ms)
return await fut
async def __on_network_info_change_external_async(
self, status: InterfaceStatus, info: NetworkInfo
) -> None:
_LOGGER.info("on network info change, status: %s, info: %s", status, info)
self, status: InterfaceStatus, info: NetworkInfo) -> None:
_LOGGER.info("on network info change, status: %s, info: %s", status,
info)
available_net_ifs = set()
for if_name in list(self._network.network_info.keys()):
available_net_ifs.add(if_name)
@ -954,10 +965,11 @@ class MIoTLan:
_MIoTLanNetworkUpdateData(status=status, if_name=info.name),
)
async def __on_mips_service_change(
self, group_id: str, state: MipsServiceState, data: dict
) -> None:
_LOGGER.info("on mips service change, %s, %s, %s", group_id, state, data)
async def __on_mips_service_change(self, group_id: str,
state: MipsServiceState,
data: dict) -> None:
_LOGGER.info("on mips service change, %s, %s, %s", group_id, state,
data)
if len(self._mips_service.get_services()) > 0:
_LOGGER.info("find central service, deinit miot lan")
await self.deinit_async()
@ -970,9 +982,10 @@ class MIoTLan:
def ping(self, if_name: Optional[str], target_ip: str) -> None:
if not target_ip:
return
self.__sendto(
if_name=if_name, data=self._probe_msg, address=target_ip, port=self.OT_PORT
)
self.__sendto(if_name=if_name,
data=self._probe_msg,
address=target_ip,
port=self.OT_PORT)
def send2device(
self,
@ -1021,22 +1034,27 @@ class MIoTLan:
handler_ctx: Any = None,
timeout_ms: Optional[int] = None,
) -> None:
def request_timeout_handler(req_data: _MIoTLanRequestData):
self._pending_requests.pop(req_data.msg_id, None)
if req_data and req_data.handler:
req_data.handler(
{"code": MIoTErrorCode.CODE_TIMEOUT.value, "error": "timeout"},
{
"code": MIoTErrorCode.CODE_TIMEOUT.value,
"error": "timeout"
},
req_data.handler_ctx,
)
timer: Optional[asyncio.TimerHandle] = None
request_data = _MIoTLanRequestData(
msg_id=msg_id, handler=handler, handler_ctx=handler_ctx, timeout=timer
)
request_data = _MIoTLanRequestData(msg_id=msg_id,
handler=handler,
handler_ctx=handler_ctx,
timeout=timer)
if timeout_ms:
timer = self._internal_loop.call_later(
timeout_ms / 1000, request_timeout_handler, request_data
)
timer = self._internal_loop.call_later(timeout_ms / 1000,
request_timeout_handler,
request_data)
request_data.timeout = timer
self._pending_requests[msg_id] = request_data
self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT)
@ -1067,7 +1085,10 @@ class MIoTLan:
try:
self.send2device(
did=did,
msg={"from": "ha.xiaomi_home", **msg},
msg={
"from": "ha.xiaomi_home",
**msg
},
handler=handler,
handler_ctx=handler_ctx,
timeout_ms=timeout_ms,
@ -1075,7 +1096,10 @@ class MIoTLan:
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error("send2device error, %s", err)
handler(
{"code": MIoTErrorCode.CODE_INTERNAL_ERROR.value, "error": str(err)},
{
"code": MIoTErrorCode.CODE_INTERNAL_ERROR.value,
"error": str(err)
},
handler_ctx,
)
@ -1096,9 +1120,10 @@ class MIoTLan:
def __get_dev_list(self, data: _MIoTLanGetDevListData) -> None:
dev_list = {
device.did: {"online": device.online, "push_available": device.subscribed}
for device in self._lan_devices.values()
if device.online
device.did: {
"online": device.online,
"push_available": device.subscribed
} for device in self._lan_devices.values() if device.online
}
data.handler(dev_list, data.handler_ctx)
@ -1111,9 +1136,8 @@ class MIoTLan:
if "model" not in info or info["model"] in self._profile_models:
# Do not support the local control of
# Profile device for the time being
_LOGGER.info(
"model not support local ctrl, %s, %s", did, info.get("model")
)
_LOGGER.info("model not support local ctrl, %s, %s", did,
info.get("model"))
continue
if did not in self._lan_devices:
if "token" not in info:
@ -1122,9 +1146,10 @@ class MIoTLan:
if len(info["token"]) != 32:
_LOGGER.error("invalid device token, %s, %s", did, info)
continue
self._lan_devices[did] = _MIoTLanDevice(
manager=self, did=did, token=info["token"], ip=info.get("ip", None)
)
self._lan_devices[did] = _MIoTLanDevice(manager=self,
did=did,
token=info["token"],
ip=info.get("ip", None))
else:
self._lan_devices[did].update_info(info)
@ -1195,15 +1220,17 @@ class MIoTLan:
return
# Create socket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# Set SO_BINDTODEVICE
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode())
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE,
if_name.encode())
sock.bind(("", self._local_port or 0))
self._internal_loop.add_reader(
sock.fileno(), self.__socket_read_handler, (if_name, sock)
)
self._internal_loop.add_reader(sock.fileno(),
self.__socket_read_handler,
(if_name, sock))
self._broadcast_socks[if_name] = sock
self._local_port = self._local_port or sock.getsockname()[1]
_LOGGER.info("created socket, %s, %s", if_name, self._local_port)
@ -1225,9 +1252,9 @@ class MIoTLan:
def __socket_read_handler(self, ctx: tuple[str, socket.socket]) -> None:
try:
data_len, addr = ctx[1].recvfrom_into(
self._read_buffer, self.OT_MSG_LEN, socket.MSG_DONTWAIT
)
data_len, addr = ctx[1].recvfrom_into(self._read_buffer,
self.OT_MSG_LEN,
socket.MSG_DONTWAIT)
if data_len < 0:
# Socket error
_LOGGER.error("socket read error, %s, %s", ctx[0], data_len)
@ -1235,15 +1262,13 @@ class MIoTLan:
if addr[1] != self.OT_PORT:
# Not ot msg
return
self.__raw_message_handler(
self._read_buffer[:data_len], data_len, addr[0], ctx[0]
)
self.__raw_message_handler(self._read_buffer[:data_len], data_len,
addr[0], ctx[0])
except Exception as err: # pylint: disable=broad-exception-caught
_LOGGER.error("socket read handler error, %s", err)
def __raw_message_handler(
self, data: bytearray, data_len: int, ip: str, if_name: str
) -> None:
def __raw_message_handler(self, data: bytearray, data_len: int, ip: str,
if_name: str) -> None:
if data[:2] != self.OT_HEADER:
return
# Keep alive message
@ -1257,22 +1282,14 @@ class MIoTLan:
if data_len == self.OT_PROBE_LEN or device.subscribed:
device.keep_alive(ip=ip, if_name=if_name)
# Manage device subscribe status
if (
self._enable_subscribe
and data_len == self.OT_PROBE_LEN
and data[16:20] == b"MSUB"
and data[24:27] == b"PUB"
):
device.supported_wildcard_sub = (
int(data[28]) == self.OT_SUPPORT_WILDCARD_SUB
)
if (self._enable_subscribe and data_len == self.OT_PROBE_LEN and
data[16:20] == b"MSUB" and data[24:27] == b"PUB"):
device.supported_wildcard_sub = (int(
data[28]) == self.OT_SUPPORT_WILDCARD_SUB)
sub_ts = struct.unpack(">I", data[20:24])[0]
sub_type = int(data[27])
if (
device.supported_wildcard_sub
and sub_type in [0, 1, 4]
and sub_ts != device.sub_ts
):
if (device.supported_wildcard_sub and sub_type in [0, 1, 4] and
sub_ts != device.sub_ts):
device.subscribed = False
device.subscribe()
if data_len > self.OT_PROBE_LEN:
@ -1289,49 +1306,52 @@ class MIoTLan:
_LOGGER.warning("invalid message, no id, %s, %s", did, msg)
return
# Reply
req: Optional[_MIoTLanRequestData] = self._pending_requests.pop(msg["id"], None)
req: Optional[_MIoTLanRequestData] = self._pending_requests.pop(
msg["id"], None)
if req:
if req.timeout:
req.timeout.cancel()
req.timeout = None
if req.handler is not None:
self._main_loop.call_soon_threadsafe(req.handler, msg, req.handler_ctx)
self._main_loop.call_soon_threadsafe(req.handler, msg,
req.handler_ctx)
return
# Handle up link message
if "method" not in msg or "params" not in msg:
_LOGGER.debug("invalid message, no method or params, %s, %s", did, msg)
_LOGGER.debug("invalid message, no method or params, %s, %s", did,
msg)
return
# Filter dup message
if self.__filter_dup_message(did, msg["id"]):
self.send2device(did=did, msg={"id": msg["id"], "result": {"code": 0}})
self.send2device(did=did,
msg={
"id": msg["id"],
"result": {
"code": 0
}
})
return
_LOGGER.debug("lan message, %s, %s", did, msg)
if msg["method"] == "properties_changed":
for param in msg["params"]:
if "siid" not in param and "piid" not in param:
_LOGGER.debug("invalid message, no siid or piid, %s, %s", did, msg)
_LOGGER.debug("invalid message, no siid or piid, %s, %s",
did, msg)
continue
key = f"{did}/p/{param['siid']}/{param['piid']}"
subs: list[_MIoTLanRegisterBroadcastData] = list(
self._device_msg_matcher.iter_match(key)
)
self._device_msg_matcher.iter_match(key))
for sub in subs:
self._main_loop.call_soon_threadsafe(
sub.handler, param, sub.handler_ctx
)
elif (
msg["method"] == "event_occured"
and "siid" in msg["params"]
and "eiid" in msg["params"]
):
sub.handler, param, sub.handler_ctx)
elif (msg["method"] == "event_occured" and "siid" in msg["params"] and
"eiid" in msg["params"]):
key = f"{did}/e/{msg['params']['siid']}/{msg['params']['eiid']}"
subs: list[_MIoTLanRegisterBroadcastData] = list(
self._device_msg_matcher.iter_match(key)
)
self._device_msg_matcher.iter_match(key))
for sub in subs:
self._main_loop.call_soon_threadsafe(
sub.handler, msg["params"], sub.handler_ctx
)
self._main_loop.call_soon_threadsafe(sub.handler, msg["params"],
sub.handler_ctx)
else:
_LOGGER.debug("invalid message, unknown method, %s, %s", did, msg)
# Reply
@ -1342,13 +1362,12 @@ class MIoTLan:
if filter_id in self._reply_msg_buffer:
return True
self._reply_msg_buffer[filter_id] = self._internal_loop.call_later(
5, lambda filter_id: self._reply_msg_buffer.pop(filter_id, None), filter_id
)
5, lambda filter_id: self._reply_msg_buffer.pop(filter_id, None),
filter_id)
return False
def __sendto(
self, if_name: Optional[str], data: bytes, address: str, port: int
) -> None:
def __sendto(self, if_name: Optional[str], data: bytes, address: str,
port: int) -> None:
if if_name is None:
# Broadcast
for if_n, sock in self._broadcast_socks.items():
@ -1375,14 +1394,12 @@ class MIoTLan:
pass
scan_time = self.__get_next_scan_time()
self._scan_timer = self._internal_loop.call_later(
scan_time, self.__scan_devices
)
scan_time, self.__scan_devices)
_LOGGER.debug("next scan time: %ss", scan_time)
def __get_next_scan_time(self) -> float:
if not self._last_scan_interval:
self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN
self._last_scan_interval = min(
self._last_scan_interval * 2, self.OT_PROBE_INTERVAL_MAX
)
self._last_scan_interval = min(self._last_scan_interval * 2,
self.OT_PROBE_INTERVAL_MAX)
return self._last_scan_interval

File diff suppressed because it is too large Load Diff

View File

@ -70,7 +70,8 @@ async def async_setup_entry(
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up a config entry."""
device_list: list[MIoTDevice] = hass.data[DOMAIN]["devices"][config_entry.entry_id]
device_list: list[MIoTDevice] = hass.data[DOMAIN]["devices"][
config_entry.entry_id]
new_entities = []
for miot_device in device_list:
@ -85,13 +86,13 @@ async def async_setup_entry(
for miot_device in device_list:
if "device:light" in miot_device.spec_instance.urn:
if miot_device.entity_list.get("light", []):
device_id = list(miot_device.device_info.get("identifiers"))[0][1]
device_id = list(
miot_device.device_info.get("identifiers"))[0][1]
light_entity_id = miot_device.gen_device_entity_id(DOMAIN)
new_light_select_entities.append(
LightCommandSendMode(
hass=hass, light_entity_id=light_entity_id, device_id=device_id
)
)
LightCommandSendMode(hass=hass,
light_entity_id=light_entity_id,
device_id=device_id))
if new_light_select_entities:
async_add_entities(new_light_select_entities)
@ -108,7 +109,8 @@ class Select(MIoTPropertyEntity, SelectEntity):
async def async_select_option(self, option: str) -> None:
"""Change the selected option."""
await self.set_property_async(value=self.get_vlist_value(description=option))
await self.set_property_async(value=self.get_vlist_value(
description=option))
@property
def current_option(self) -> Optional[str]:
@ -121,18 +123,20 @@ class LightCommandSendMode(SelectEntity, RestoreEntity):
then send other color temperatures and brightness or send them all at the same time.
The default is to send one by one."""
def __init__(self, hass: HomeAssistant, light_entity_id: str, device_id: str):
def __init__(self, hass: HomeAssistant, light_entity_id: str,
device_id: str):
super().__init__()
self.hass = hass
self._device_id = device_id
self._attr_name = "Command Send Mode"
self._attr_unique_id = f"{light_entity_id}_command_send_mode"
self._attr_options = ["Send One by One", "Send Turn On First", "Send Together"]
self._attr_options = [
"Send One by One", "Send Turn On First", "Send Together"
]
self._attr_device_info = {"identifiers": {(DOMAIN, device_id)}}
self._attr_current_option = self._attr_options[0] # 默认选项
self._attr_entity_category = (
EntityCategory.CONFIG
) # **重点:告诉 HA 这是配置类实体**
self._attr_entity_category = (EntityCategory.CONFIG
) # **重点:告诉 HA 这是配置类实体**
async def async_select_option(self, option: str):
"""处理用户选择的选项。"""
@ -143,9 +147,8 @@ class LightCommandSendMode(SelectEntity, RestoreEntity):
async def async_added_to_hass(self):
"""在实体添加到 Home Assistant 时恢复上次的状态。"""
await super().async_added_to_hass()
if (
last_state := await self.async_get_last_state()
) and last_state.state in self._attr_options:
if (last_state := await self.async_get_last_state()
) and last_state.state in self._attr_options:
self._attr_current_option = last_state.state
@property