修复 AList 插件无法刷新目录的问题

This commit is contained in:
x1ao4 2025-05-21 19:08:20 +08:00
parent dbc469fe3f
commit 18f344e19c
2 changed files with 264 additions and 63 deletions

View File

@ -2,6 +2,7 @@ import os
import re
import json
import requests
import time
class Alist:
@ -17,32 +18,82 @@ class Alist:
quark_root_dir = None
def __init__(self, **kwargs):
"""初始化AList插件"""
# 标记插件名称,便于日志识别
self.plugin_name = self.__class__.__name__.lower()
if kwargs:
# 加载配置
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
else:
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
if self.url and self.token:
if self.get_info():
success, result = self.storage_id_to_path(self.storage_id)
if success:
self.storage_mount_path, self.quark_root_dir = result
self.is_active = True
print(f"{self.plugin_name} 模块缺少必要参数: {key}")
# 检查基本配置
if not self.url or not self.token or not self.storage_id:
return
# 确保URL格式正确
if not self.url.startswith(("http://", "https://")):
self.url = f"http://{self.url}"
# 移除URL末尾的斜杠
self.url = self.url.rstrip("/")
# 验证AList连接
if self.get_info():
# 解析存储ID
success, result = self.storage_id_to_path(self.storage_id)
if success:
self.storage_mount_path, self.quark_root_dir = result
# 确保路径格式正确
if self.quark_root_dir != "/":
if not self.quark_root_dir.startswith("/"):
self.quark_root_dir = f"/{self.quark_root_dir}"
self.quark_root_dir = self.quark_root_dir.rstrip("/")
if not self.storage_mount_path.startswith("/"):
self.storage_mount_path = f"/{self.storage_mount_path}"
self.storage_mount_path = self.storage_mount_path.rstrip("/")
self.is_active = True
else:
print(f"AList 刷新: 存储信息解析失败")
else:
print(f"AList 刷新: 服务器连接失败")
def run(self, task, **kwargs):
if task.get("savepath") and task.get("savepath").startswith(
self.quark_root_dir
):
alist_path = os.path.normpath(
os.path.join(
self.storage_mount_path,
task["savepath"].replace(self.quark_root_dir, "", 1).lstrip("/"),
)
).replace("\\", "/")
self.refresh(alist_path)
"""
插件主入口当有新文件保存时触发刷新AList目录
Args:
task: 任务信息包含savepath等关键信息
**kwargs: 其他参数包括tree和rename_logs
Returns:
task: 返回原任务信息
"""
# 检查路径是否在夸克根目录内
if task.get("savepath"):
# 确保路径符合要求
quark_path = task.get("savepath", "")
if not quark_path.startswith("/"):
quark_path = f"/{quark_path}"
# 检查路径是否在夸克根目录下,或夸克根目录是否为根目录
if self.quark_root_dir == "/" or quark_path.startswith(self.quark_root_dir):
# 映射到AList路径
alist_path = self.map_quark_to_alist_path(quark_path)
# 执行刷新
self.refresh(alist_path)
return task
def get_info(self):
"""获取AList服务器信息"""
url = f"{self.url}/api/admin/setting/list"
headers = {"Authorization": self.token}
querystring = {"group": "1"}
@ -51,26 +102,35 @@ class Alist:
response.raise_for_status()
response = response.json()
if response.get("code") == 200:
print(
f"AList 刷新: {response.get('data',[])[1].get('value','')} {response.get('data',[])[0].get('value','')}"
)
print(f"AList 刷新: {response.get('data',[])[1].get('value','')} {response.get('data',[])[0].get('value','')}")
return True
else:
print(f"AList 刷新: 连接失败 ❌ {response.get('message')}")
except requests.exceptions.RequestException as e:
print(f"获取 AList 信息出错: {e}")
except Exception as e:
print(f"AList 刷新: 连接出错 ❌ {str(e)}")
return False
def storage_id_to_path(self, storage_id):
"""
将存储ID转换为挂载路径和夸克根目录
Args:
storage_id: 存储ID
Returns:
tuple: (成功状态, (挂载路径, 夸克根目录))
"""
storage_mount_path, quark_root_dir = None, None
# 1. 检查是否符合 /aaa:/bbb 格式
if match := re.match(r"^(\/[^:]*):(\/[^:]*)$", storage_id):
# 存储挂载路径, 夸克根文件夹
storage_mount_path, quark_root_dir = match.group(1), match.group(2)
file_list = self.get_file_list(storage_mount_path)
if file_list.get("code") != 200:
print(f"AList 刷新: 获取挂载路径失败 ❌ {file_list.get('message')}")
print(f"AList 刷新: 挂载路径无效")
return False, (None, None)
# 2. 检查是否数字,调用 Alist API 获取存储信息
elif re.match(r"^\d+$", storage_id):
if storage_info := self.get_storage_info(storage_id):
@ -82,14 +142,15 @@ class Alist:
quark_root_dir = self.get_root_folder_full_path(
addition["cookie"], addition["root_folder_id"]
)
elif storage_info["driver"] == "QuarkTV":
print(
f"AList 刷新: [QuarkTV] 驱动 ⚠️ storage_id 请手动填入 /Alist挂载路径:/Quark目录路径"
)
else:
print(f"AList 刷新: 不支持 [{storage_info['driver']}] 驱动 ❌")
print(f"AList 刷新: 不支持 [{storage_info['driver']}] 驱动")
else:
print(f"AList 刷新: 获取存储信息失败")
return False, (None, None)
else:
print(f"AList 刷新: storage_id [{storage_id}] 格式错误 ❌")
print(f"AList 刷新: storage_id 格式错误")
return False, (None, None)
# 返回结果
if storage_mount_path and quark_root_dir:
return True, (storage_mount_path, quark_root_dir)
@ -97,6 +158,7 @@ class Alist:
return False, (None, None)
def get_storage_info(self, storage_id):
"""获取AList存储详细信息"""
url = f"{self.url}/api/admin/storage/get"
headers = {"Authorization": self.token}
querystring = {"id": storage_id}
@ -105,34 +167,119 @@ class Alist:
response.raise_for_status()
data = response.json()
if data.get("code") == 200:
return data.get("data", [])
return data.get("data", {})
else:
print(f"AList 刷新: 存储 {storage_id} 连接失败 ❌ {data.get('message')}")
print(f"AList 刷新: 获取存储信息失败 ({data.get('message', '未知错误')})")
except Exception as e:
print(f"AList 刷新: 获取 AList 存储出错 {e}")
return []
print(f"AList 刷新: 获取存储信息出错 ({str(e)})")
return None
def refresh(self, path):
data = self.get_file_list(path, True)
if data.get("code") == 200:
print(f"📁 AList 刷新: 目录 [{path}] 成功 ✅")
return data.get("data")
elif "object not found" in data.get("message", ""):
# 如果是根目录就不再往上查找
if path == "/" or path == self.storage_mount_path:
print(f"📁 AList 刷新: 根目录不存在,请检查 AList 配置")
return False
# 获取父目录
parent_path = os.path.dirname(path)
print(f"📁 AList 刷新: [{path}] 不存在,转父目录 [{parent_path}]")
# 递归刷新父目录
return self.refresh(parent_path)
def refresh(self, path, retry_count=2):
"""
刷新AList目录支持重试和自动回溯到父目录
Args:
path: 需要刷新的路径
retry_count: 重试次数默认重试2次
"""
# 实现重试机制
for attempt in range(retry_count + 1):
if attempt > 0:
# 不输出重试信息
pass
data = self.get_file_list(path, True)
if data.get("code") == 200:
print(f"📁 刷新 AList 目录: [{path}] 成功 ✅")
return data.get("data")
elif "object not found" in data.get("message", ""):
# 如果是根目录就不再往上查找
if path == "/" or path == self.storage_mount_path:
print(f"📁 AList 刷新: 根目录不存在,请检查配置")
return False
# 自动获取父目录并尝试刷新
parent_path = os.path.dirname(path)
# 先刷新父目录
parent_result = self.get_file_list(parent_path, True)
if parent_result.get("code") == 200:
# 再次尝试刷新原目录
retry_data = self.get_file_list(path, True)
if retry_data.get("code") == 200:
print(f"📁 刷新 AList 目录: [{path}] 成功 ✅")
return retry_data.get("data")
# 如果刷新父目录后仍不成功,则递归处理父目录
return self.refresh(parent_path, retry_count)
elif attempt < retry_count:
# 如果还有重试次数,等待后继续
time.sleep(1) # 等待1秒后重试
else:
# 已达到最大重试次数
error_msg = data.get("message", "未知错误")
print(f"📁 AList 刷新: 失败 ❌ {error_msg}")
return None
def map_quark_to_alist_path(self, quark_path):
"""
将夸克路径映射到AList路径
Args:
quark_path: 夸克路径例如 /Movies/2024
Returns:
str: 对应的AList路径例如 /movies/2024
"""
# 确保路径格式正确
if not quark_path.startswith("/"):
quark_path = f"/{quark_path}"
# 特殊处理根目录的情况
if self.quark_root_dir == "/":
# 夸克根目录是/直接映射到AList挂载路径
if quark_path == "/":
return self.storage_mount_path
else:
# 组合路径
alist_path = os.path.normpath(
os.path.join(self.storage_mount_path, quark_path.lstrip("/"))
).replace("\\", "/")
return alist_path
# 常规情况:检查路径是否在夸克根目录下
if not quark_path.startswith(self.quark_root_dir):
# 尝试强制映射,去掉前导路径
relative_path = quark_path.lstrip("/")
else:
print(f"📁 AList 刷新: 失败 ❌ {data.get('message')}")
# 去除夸克根目录前缀,并确保路径格式正确
relative_path = quark_path.replace(self.quark_root_dir, "", 1).lstrip("/")
# 构建AList路径
alist_path = os.path.normpath(
os.path.join(self.storage_mount_path, relative_path)
).replace("\\", "/")
return alist_path
def get_file_list(self, path, force_refresh=False):
"""
获取AList指定路径下的文件列表
Args:
path: AList文件路径
force_refresh: 是否强制刷新默认False
Returns:
dict: AList API返回的数据
"""
url = f"{self.url}/api/fs/list"
headers = {"Authorization": self.token}
headers = {
"Authorization": self.token,
"Content-Type": "application/json",
"Accept": "application/json",
}
payload = {
"path": path,
"refresh": force_refresh,
@ -140,15 +287,28 @@ class Alist:
"page": 1,
"per_page": 0,
}
try:
response = requests.request("POST", url, headers=headers, json=payload)
response = requests.request(
"POST",
url,
headers=headers,
json=payload,
timeout=10
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"📁 AList 刷新: 网络请求出错 ❌ {str(e)}")
except json.JSONDecodeError as e:
print(f"📁 AList 刷新: 解析数据出错 ❌ {str(e)}")
except Exception as e:
print(f"📁 AList 刷新: 获取文件列表出错 ❌ {e}")
return {}
print(f"📁 AList 刷新: 未知错误 ❌ {str(e)}")
return {"code": 500, "message": "获取文件列表出错"}
def get_root_folder_full_path(self, cookie, pdir_fid):
"""获取夸克根文件夹的完整路径"""
if pdir_fid == "0":
return "/"
url = "https://drive-h.quark.cn/1/clouddrive/file/sort"
@ -178,5 +338,5 @@ class Alist:
path = f"{path}/{item['file_name']}"
return path
except Exception as e:
print(f"AList 刷新: 获取 Quark 路径出错 {e}")
print(f"AList 刷新: 获取路径出错 {str(e)}")
return ""

View File

@ -3004,7 +3004,6 @@ def verify_account(account):
# 验证账号
print(f"▶️ 验证第 {account.index} 个账号")
if "__uid" not in account.cookie:
print(f"💡 不存在 cookie 必要参数,判断为仅签到")
return False
else:
account_info = account.init()
@ -3052,7 +3051,8 @@ def do_sign(account):
):
print(message)
else:
message = message.replace("今日", f"[{account.nickname}]今日")
if account.nickname:
message = message.replace("今日", f"{account.nickname} 今日")
add_notify(message)
else:
print(f"📅 签到异常: {sign_return}")
@ -4061,24 +4061,65 @@ def do_save(account, tasklist=[]):
if not display_files and file_nodes:
# 查找目录中修改时间最新的文件(可能是刚刚转存的)
today = datetime.now().strftime('%Y-%m-%d')
recent_files = []
recent_files = [] # 定义并初始化recent_files变量
# 首先尝试通过修改日期过滤当天的文件
for file in file_nodes:
# 如果有时间戳,转换为日期字符串
if 'updated_at' in file and file['updated_at']:
update_time = datetime.fromtimestamp(file['updated_at']).strftime('%Y-%m-%d')
if update_time == today:
recent_files.append(file)
try:
# 检查时间戳是否在合理范围内 (1970-2100年)
timestamp = file['updated_at']
if timestamp > 4102444800: # 2100年的时间戳
# 可能是毫秒级时间戳,尝试转换为秒级
timestamp = timestamp / 1000
# 再次检查时间戳是否在合理范围内
if 0 < timestamp < 4102444800:
update_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d')
if update_time == today:
recent_files.append(file)
else:
print(f"警告: 文件 {file.get('file_name', '未知')} 的时间戳 {file['updated_at']} 超出范围")
except (ValueError, OSError, OverflowError) as e:
print(f"警告: 处理文件 {file.get('file_name', '未知')} 的时间戳时出错: {e}")
# 如果没有找到当天的文件,至少显示一个最新的文件
if not recent_files and file_nodes:
# 按修改时间排序
recent_files = sorted(file_nodes, key=lambda x: x.get('updated_at', 0), reverse=True)
# 定义安全的排序键函数
def safe_timestamp_key(x):
try:
timestamp = x.get('updated_at', 0)
# 如果时间戳太大,可能是毫秒级时间戳
if timestamp > 4102444800: # 2100年的时间戳
timestamp = timestamp / 1000
# 再次检查范围
if timestamp < 0 or timestamp > 4102444800:
return 0 # 无效时间戳返回0
return timestamp
except (ValueError, TypeError):
return 0 # 无效返回0
try:
# 按修改时间排序,使用安全的排序函数
recent_files = sorted(file_nodes, key=safe_timestamp_key, reverse=True)
except Exception as e:
print(f"警告: 文件排序时出错: {e}")
# 如果排序出错,直接使用原始列表
recent_files = file_nodes
# 只取第一个作为显示
if recent_files:
display_files.append(recent_files[0]['file_name'])
try:
display_files.append(recent_files[0]['file_name'])
except (IndexError, KeyError) as e:
print(f"警告: 获取文件名时出错: {e}")
# 如果出错,尝试添加第一个文件(如果有)
if file_nodes:
try:
display_files.append(file_nodes[0]['file_name'])
except (KeyError, IndexError):
print("警告: 无法获取有效的文件名")
# 添加成功通知 - 修复问题:确保在有文件时添加通知
if display_files: