♻️ 重构 storage_id_to_path() 逻辑

- 更严谨的 storage_id 格式检查
- 改进错误处理,增加对 QuarkTV 驱动的提示
This commit is contained in:
Cp0204 2024-11-27 18:55:54 +08:00
parent 749d1b7039
commit 0a47d48c60
2 changed files with 107 additions and 83 deletions

View File

@ -62,13 +62,16 @@ class Alist:
return False return False
def storage_id_to_path(self, storage_id): def storage_id_to_path(self, storage_id):
storage_mount_path, quark_root_dir = None, None
# 1. 检查是否符合 /aaa:/bbb 格式 # 1. 检查是否符合 /aaa:/bbb 格式
match = re.match(r"^(\/[^:]*):(\/[^:]*)$", storage_id) if match := re.match(r"^(\/[^:]*):(\/[^:]*)$", storage_id):
if match: # 存储挂载路径, 夸克根文件夹
return True, (match.group(1), match.group(2)) storage_mount_path, quark_root_dir = match.group(1), match.group(2)
# 2. 调用 Alist API 获取存储信息 if not self.get_file_list(storage_mount_path):
storage_info = self.get_storage_info(storage_id) return False, (None, None)
if storage_info: # 2. 检查是否数字,调用 Alist API 获取存储信息
elif re.match(r"^\d+$", storage_id):
if storage_info := self.get_storage_info(storage_id):
if storage_info["driver"] == "Quark": if storage_info["driver"] == "Quark":
addition = json.loads(storage_info["addition"]) addition = json.loads(storage_info["addition"])
# 存储挂载路径 # 存储挂载路径
@ -77,10 +80,19 @@ class Alist:
quark_root_dir = self.get_root_folder_full_path( quark_root_dir = self.get_root_folder_full_path(
addition["cookie"], addition["root_folder_id"] addition["cookie"], addition["root_folder_id"]
) )
elif storage_info["driver"] == "QuarkTV":
print(
f"Alist刷新: [QuarkTV]驱动⚠️ storage_id请手动填入 /Alist挂载路径:/Quark目录路径"
)
else:
print(f"Alist刷新: 不支持[{storage_info['driver']}]驱动 ❌")
else:
print(f"Alist刷新: storage_id[{storage_id}]格式错误❌")
# 返回结果
if storage_mount_path and quark_root_dir: if storage_mount_path and quark_root_dir:
return True, (storage_mount_path, quark_root_dir) return True, (storage_mount_path, quark_root_dir)
else: else:
print(f"Alist刷新: 不支持[{storage_info['driver']}]驱动 ❌") return False, (None, None)
def get_storage_info(self, storage_id): def get_storage_info(self, storage_id):
url = f"{self.url}/api/admin/storage/get" url = f"{self.url}/api/admin/storage/get"
@ -98,20 +110,8 @@ class Alist:
print(f"Alist刷新: 获取Alist存储出错 {e}") print(f"Alist刷新: 获取Alist存储出错 {e}")
return False return False
def refresh(self, path, force_refresh=True): def refresh(self, path):
url = f"{self.url}/api/fs/list" data = self.get_file_list(path, True)
headers = {"Authorization": self.token}
payload = {
"path": path,
"refresh": force_refresh,
"password": "",
"page": 1,
"per_page": 0,
}
try:
response = requests.request("POST", url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
if data.get("code") == 200: if data.get("code") == 200:
print(f"📁 Alist刷新目录[{path}] 成功✅") print(f"📁 Alist刷新目录[{path}] 成功✅")
return data.get("data") return data.get("data")
@ -127,8 +127,23 @@ class Alist:
return self.refresh(parent_path) return self.refresh(parent_path)
else: else:
print(f"📁 Alist刷新失败❌ {data.get('message')}") print(f"📁 Alist刷新失败❌ {data.get('message')}")
except requests.exceptions.RequestException as e:
print(f"Alist刷新目录出错: {e}") def get_file_list(self, path, force_refresh=False):
url = f"{self.url}/api/fs/list"
headers = {"Authorization": self.token}
payload = {
"path": path,
"refresh": force_refresh,
"password": "",
"page": 1,
"per_page": 0,
}
try:
response = requests.request("POST", url, headers=headers, json=payload)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"📁 Alist刷新: 获取文件列表出错❌ {e}")
return False return False
def get_root_folder_full_path(self, cookie, pdir_fid): def get_root_folder_full_path(self, cookie, pdir_fid):

View File

@ -72,16 +72,19 @@ class Alist_strm_gen:
task["savepath"].replace(self.quark_root_dir, "", 1).lstrip("/"), task["savepath"].replace(self.quark_root_dir, "", 1).lstrip("/"),
) )
).replace("\\", "/") ).replace("\\", "/")
self.refresh(alist_path) self.check_dir(alist_path)
def storage_id_to_path(self, storage_id): def storage_id_to_path(self, storage_id):
storage_mount_path, quark_root_dir = None, None
# 1. 检查是否符合 /aaa:/bbb 格式 # 1. 检查是否符合 /aaa:/bbb 格式
match = re.match(r"^(\/[^:]*):(\/[^:]*)$", storage_id) if match := re.match(r"^(\/[^:]*):(\/[^:]*)$", storage_id):
if match: # 存储挂载路径, 夸克根文件夹
return True, (match.group(1), match.group(2)) storage_mount_path, quark_root_dir = match.group(1), match.group(2)
# 2. 调用 Alist API 获取存储信息 if not self.get_file_list(storage_mount_path):
storage_info = self.get_storage_info(storage_id) return False, (None, None)
if storage_info: # 2. 检查是否数字,调用 Alist API 获取存储信息
elif re.match(r"^\d+$", storage_id):
if storage_info := self.get_storage_info(storage_id):
if storage_info["driver"] == "Quark": if storage_info["driver"] == "Quark":
addition = json.loads(storage_info["addition"]) addition = json.loads(storage_info["addition"])
# 存储挂载路径 # 存储挂载路径
@ -90,10 +93,19 @@ class Alist_strm_gen:
quark_root_dir = self.get_root_folder_full_path( quark_root_dir = self.get_root_folder_full_path(
addition["cookie"], addition["root_folder_id"] addition["cookie"], addition["root_folder_id"]
) )
elif storage_info["driver"] == "QuarkTV":
print(
f"Alist-Strm生成: [QuarkTV]驱动⚠️ storage_id请手动填入 /Alist挂载路径:/Quark目录路径"
)
else:
print(f"Alist-Strm生成: 不支持[{storage_info['driver']}]驱动 ❌")
else:
print(f"Alist-Strm生成: storage_id[{storage_id}]格式错误❌")
# 返回结果
if storage_mount_path and quark_root_dir: if storage_mount_path and quark_root_dir:
print(f"Alist-Strm生成: [{storage_mount_path}:{quark_root_dir}]")
return True, (storage_mount_path, quark_root_dir) return True, (storage_mount_path, quark_root_dir)
else: else:
print(f"Alist刷新: 不支持[{storage_info['driver']}]驱动 ❌")
return False, (None, None) return False, (None, None)
def get_storage_info(self, storage_id): def get_storage_info(self, storage_id):
@ -105,46 +117,43 @@ class Alist_strm_gen:
response.raise_for_status() response.raise_for_status()
data = response.json() data = response.json()
if data.get("code") == 200: if data.get("code") == 200:
print(
f"Alist-Strm生成: {data['data']['driver']}[{data['data']['mount_path']}]"
)
return data.get("data", []) return data.get("data", [])
else: else:
print(f"Alist-Strm生成: 连接失败❌ {response.get('message')}") print(f"Alist-Strm生成: 获取存储失败❌ {data.get('message')}")
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"Alist-Strm生成: 获取Alist存储出错 {e}") print(f"Alist-Strm生成: 获取存储出错 {e}")
return False return False
def refresh(self, path): def check_dir(self, path):
try: data = self.get_file_list(path)
response = self.get_file_list(path) if data.get("code") != 200:
if response.get("code") != 200: print(f"📺 Alist-Strm生成: 获取文件列表失败❌{data.get('message')}")
print(f"📺 生成 STRM 文件失败❌ {response.get('message')}")
return return
else: elif files := data.get("data", {}).get("content"):
files = response.get("data").get("content")
for item in files: for item in files:
item_path = f"{path}/{item.get('name')}".replace("//", "/") item_path = f"{path}/{item.get('name')}".replace("//", "/")
if item.get("is_dir"): if item.get("is_dir"):
self.refresh(item_path) self.check_dir(item_path)
else: else:
self.generate_strm(item_path) self.generate_strm(item_path)
except Exception as e:
print(f"📺 获取 Alist 文件列表失败❌ {e}")
def get_file_list(self, path): def get_file_list(self, path, force_refresh=False):
url = f"{self.url}/api/fs/list" url = f"{self.url}/api/fs/list"
headers = {"Authorization": self.token} headers = {"Authorization": self.token}
payload = { payload = {
"path": path, "path": path,
"refresh": False, "refresh": force_refresh,
"password": "", "password": "",
"page": 1, "page": 1,
"per_page": 0, "per_page": 0,
} }
try:
response = requests.request("POST", url, headers=headers, json=payload) response = requests.request("POST", url, headers=headers, json=payload)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
except Exception as e:
print(f"📺 Alist-Strm生成: 获取文件列表出错❌ {e}")
return False
def generate_strm(self, file_path): def generate_strm(self, file_path):
ext = file_path.split(".")[-1] ext = file_path.split(".")[-1]