quark-auto-save/plugins/alist_sync.py
jenfonro 518037cee8
Some checks failed
Docker Publish / build-and-push (push) Has been cancelled
♻️ 插件 alist_sync 修改为不同步子目录 (#99)
* ♻️ 修改为不同步子目录:先暂时修改为不同步子目录,原因是目前不清楚通过调用API创建任务时,是否会将原有的文件进行覆盖,后续测试修改完毕后再将此项迁移至TV模式下启用

* ♻️ 增加获取文件列表失败提示:有2个原因会导致代码报错:1.api刷新的为最底层目录,如果保存的目录被删除且上层目录未刷新时,获取的是假的文件列表,可能会为空,则报错2.网络不好获取目录失败。增加提示告诉用户原因
2025-07-18 18:25:59 +08:00

314 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# plugins: 调用 alist 实现跨网盘转存
# author: https://github.com/jenfonro
import re
import json
import requests
class Alist_sync:
default_config = {
"url": "", # Alist服务器URL
"token": "", # Alist服务器Token
"quark_storage_id": "", # Alist 服务器夸克存储 ID
"save_storage_id": "", # Alist 服务器同步的存储 ID
"tv_mode": "", # TV库模式填入非0值开启
# TV库模式说明
# 1.开启后会验证文件名是否包含S01E01的正则格式目前仅包含mp4及mkv
# 2.会对比保存目录下是否存在该名称的mp4、mkv文件如果不存在才会进行同步
# 3.夸克目录及同步目录均会提取为S01E01的正则进行匹配不受其它字符影响
}
is_active = False
# 缓存参数
default_task_config = {
"enable": False, # 当前任务开关,
"save_path": "", # 需要同步目录,默认空时路径则会与夸克的保存路径一致,不开启完整路径模式时,默认根目录为保存驱动的根目录
"verify_path": "", # 验证目录主要用于影视库避免重复文件一般配合alist的别名功能及full_path_mode使用用于多个网盘的源合并成一个目录
"full_path_mode": False, # 完整路径模式
# 完整路径模式开启后不再限制保存目录的存储驱动将根据填入的路径进行保存需要填写完整的alist目录
}
def __init__(self, **kwargs):
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
else:
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
if self.url and self.token:
if self.verify_server():
self.is_active = True
def _send_request(self, method, url, **kwargs):
headers = {"Authorization": self.token, "Content-Type": "application/json"}
if "headers" in kwargs:
headers = kwargs["headers"]
del kwargs["headers"]
try:
response = requests.request(method, url, headers=headers, **kwargs)
# print(f"{response.text}")
# response.raise_for_status() # 检查请求是否成功但返回非200也会抛出异常
return response
except Exception as e:
print(f"_send_request error:\n{e}")
fake_response = requests.Response()
fake_response.status_code = 500
fake_response._content = b'{"status": 500, "message": "request error"}'
return fake_response
def verify_server(self):
url = f"{self.url}/api/me"
querystring = ""
headers = {"Authorization": self.token, "Content-Type": "application/json"}
try:
response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status()
response = response.json()
if response.get("code") == 200:
if response.get("data").get("username") == "guest":
print(f"Alist登陆失败请检查token")
else:
print(
f"Alist登陆成功当前用户: {response.get('data').get('username')}"
)
return True
else:
print(f"Alist同步: 连接服务器失败❌ {response.get('message')}")
except requests.exceptions.RequestException as e:
print(f"获取Alist信息出错: {e}")
return False
def run(self, task, **kwargs):
if not task["addition"]["alist_sync"]["enable"]:
return 0
print(f"开始进行alist同步")
# 这一块注释的是获取任务的参数在web界面可以看
# print("所有任务参数:")
# print(task)
# print(task['addition'])
# print(task['addition']['alist_sync'])
# print(task['addition']['alist_sync']['target_path'])
# 获取夸克挂载根目录
data = self.get_storage_path(self.quark_storage_id)
if data["driver"] != "Quark":
print(
f"Alist同步: 存储{self.quark_storage_id}非夸克存储❌ {data['driver']}"
)
return 0
quark_mount_root_path = re.sub(r".*root_folder_id\":\"", "", data["addition"])
quark_mount_root_path = re.sub(r"\",.*", "", quark_mount_root_path)
if quark_mount_root_path != "0" and quark_mount_root_path != "":
print(f"Alist同步: 存储{self.quark_storage_id}挂载的目录非夸克根目录❌")
return 0
self.quark_mount_path = data["mount_path"]
# 获取保存路径的挂载根目录
if self.save_storage_id != 0:
data = self.get_storage_path(self.save_storage_id)
self.save_mount_path = data["mount_path"]
# 保存的目录初始化
if task["addition"]["alist_sync"]["save_path"] == "":
self.save_path = f"{self.save_mount_path}/{task['savepath']}"
else:
self.save_path = task["addition"]["alist_sync"]["save_path"]
if not task["addition"]["alist_sync"]["full_path_mode"]:
if self.save_path.startswith("/"):
self.save_path = self.save_path[1:]
if self.save_path.endswith("/"):
self.save_path = self.save_path[:-1]
self.save_path = f"{self.save_mount_path}/{self.save_path}"
else:
# print('完整路径模式')
if not self.save_path.startswith("/"):
self.save_path = "/" + self.save_path
if self.save_path.endswith("/"):
self.save_path = self.save_path[:-1]
# 获取保存目录是否存在
if not self.get_path(self.save_path):
dir_exists = False
# 如果目录不存在判断两边路径是否一致,一致时直接创建复制目录任务即可
else:
dir_exists = True
copy_dir = False
# 初始化验证目录
# 如果没有填验证目录,则验证目录与保存目录一致
if task["addition"]["alist_sync"]["verify_path"]:
self.verify_path = task["addition"]["alist_sync"]["verify_path"]
if not task["addition"]["alist_sync"]["full_path_mode"]:
if self.verify_path.startswith("/"):
self.verify_path = self.save_path[1:]
if self.verify_path.endswith("/"):
self.verify_path = self.save_path[:-1]
self.verify_path = f"{self.save_mount_path}/{self.verify_path}"
else:
# print('完整路径模式')
if not self.verify_path.startswith("/"):
self.verify_path = "/" + self.save_path
if self.verify_path.endswith("/"):
self.verify_path = self.save_path[:-1]
else:
self.verify_path = self.save_path
# 初始化夸克目录
self.source_path = f"{self.quark_mount_path}/{task['savepath']}"
# 初始化任务名
self.taskname = f"{task['taskname']}"
# 获取网盘已有文件
source_dir_list = self.get_path_list(self.source_path)
if not source_dir_list:
print("获取夸克文件列表失败请检查网络或手动刷新alist中的夸克目录")
return 0
if self.tv_mode == 0 or self.tv_mode == "":
self.tv_mode = False
else:
self.tv_mode = True
# 如果是新建的目录则将所有文件直接复制
if not dir_exists:
self.get_save_file([], source_dir_list)
else:
verify_dir_list = self.get_path_list(self.verify_path)
if verify_dir_list:
self.get_save_file(verify_dir_list, source_dir_list)
else:
self.get_save_file([], source_dir_list)
if self.save_file_data:
self.save_start(self.save_file_data)
print("同步的文件列表:")
for save_file in self.save_file_data:
print(f"└── 🎞️{save_file}")
else:
print("没有需要同步的文件")
def save_start(self, save_file_data):
url = f"{self.url}/api/fs/copy"
payload = json.dumps(
{
"src_dir": self.source_path,
"dst_dir": self.save_path,
"names": save_file_data,
}
)
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print("未能进行Alist同步请手动同步")
else:
print("Alist创建任务成功")
self.copy_task = response.json()
def get_save_file(self, target_dir_list, source_dir_list):
self.save_file_data = []
if target_dir_list == []:
for source_list in source_dir_list:
if self.tv_mode:
if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)",
source_list["name"],
re.IGNORECASE,
):
self.save_file_data.append(source_list["name"])
else:
self.save_file_data.append(source_list["name"])
else:
for source_list in source_dir_list:
skip = False
source_list_filename = (
source_list["name"]
.replace(".mp4", "")
.replace(".mkv", "")
.replace(self.taskname + ".", "")
.lower()
)
for target_list in target_dir_list:
if source_list["is_dir"]:
# print(f"跳过目录同步")
skip = True
break
if self.tv_mode:
target_list_filename = (
target_list["name"]
.replace(".mp4", "")
.replace(".mkv", "")
.replace(self.taskname + ".", "")
.lower()
)
if source_list_filename == target_list_filename:
# print(f"文件存在,名称为:{target_list['name']}")
skip = True
break
else:
if source_list["name"] == target_list["name"]:
# print(f"文件存在,名称为:{target_dir['name']}")
skip = True
break
if self.tv_mode:
if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)",
source_list["name"],
re.IGNORECASE,
):
# 添加一句验证如果有MKVMP4存在时则只保存某一个格式
if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.mp4",
source_list["name"],
re.IGNORECASE,
):
for all_file in source_dir_list:
if (
source_list["name"].replace(".mp4", ".mkv")
== all_file["name"]
):
print(
f"{source_list['name']}拥有相同版本的MKV文件跳过复制"
)
skip = True
if not skip:
self.save_file_data.append(source_list["name"])
def get_path_list(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps(
{"path": path, "password": "", "page": 1, "per_page": 0, "refresh": True}
)
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print(f"获取Alist目录出错: {response}")
return False
else:
return response.json()["data"]["content"]
def get_path(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps({"path": path, "password": "", "force_root": False})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200 or response.json()["message"] != "success":
return False
else:
return True
def get_storage_path(self, storage_id):
url = f"{self.url}/api/admin/storage/get"
headers = {"Authorization": self.token}
querystring = {"id": storage_id}
try:
response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status()
data = response.json()
if data.get("code") == 200:
return data.get("data", [])
else:
print(f"Alist同步: 存储{storage_id}连接失败❌ {data.get('message')}")
except Exception as e:
print(f"Alist同步: 获取Alist存储出错 {e}")
return []