chore: 代码格式化

This commit is contained in:
Cp0204 2025-06-18 17:20:40 +08:00
parent dfa891d11d
commit 6630e3a159

View File

@ -1,10 +1,11 @@
# plugins: 调用 alist 实现跨网盘转存
# author: https://github.com/jenfonro
import os
import re import re
import json import json
from importlib.util import source_hash
import requests import requests
import sys
class Alist_sync: class Alist_sync:
default_config = { default_config = {
@ -12,8 +13,9 @@ class Alist_sync:
"token": "", # Alist服务器Token "token": "", # Alist服务器Token
"quark_storage_id": "", # Alist 服务器夸克存储 ID "quark_storage_id": "", # Alist 服务器夸克存储 ID
"save_storage_id": "", # Alist 服务器同步的存储 ID "save_storage_id": "", # Alist 服务器同步的存储 ID
"TV_mode": "" # TV库模式填入非0值开启 "tv_mode": "", # TV库模式填入非0值开启
#TV库模式说明1.开启后会验证文件名是否包含S01E01的正则格式目前仅包含mp4及mkv # TV库模式说明
# 1.开启后会验证文件名是否包含S01E01的正则格式目前仅包含mp4及mkv
# 2.会对比保存目录下是否存在该名称的mp4、mkv文件如果不存在才会进行同步 # 2.会对比保存目录下是否存在该名称的mp4、mkv文件如果不存在才会进行同步
# 3.夸克目录及同步目录均会提取为S01E01的正则进行匹配不受其它字符影响 # 3.夸克目录及同步目录均会提取为S01E01的正则进行匹配不受其它字符影响
} }
@ -24,9 +26,10 @@ class Alist_sync:
"enable": False, # 当前任务开关, "enable": False, # 当前任务开关,
"save_path": "", # 需要同步目录,默认空时路径则会与夸克的保存路径一致,不开启完整路径模式时,默认根目录为保存驱动的根目录 "save_path": "", # 需要同步目录,默认空时路径则会与夸克的保存路径一致,不开启完整路径模式时,默认根目录为保存驱动的根目录
"verify_path": "", # 验证目录主要用于影视库避免重复文件一般配合alist的别名功能及full_path使用用于多个网盘的源合并成一个目录 "verify_path": "", # 验证目录主要用于影视库避免重复文件一般配合alist的别名功能及full_path使用用于多个网盘的源合并成一个目录
"full_path": False #完整路径模式 "full_path": False, # 完整路径模式
# 完整路径模式开启后不再限制保存目录的存储驱动将根据填入的路径进行保存需要填写完整的alist目录 # 完整路径模式开启后不再限制保存目录的存储驱动将根据填入的路径进行保存需要填写完整的alist目录
} }
def __init__(self, **kwargs): def __init__(self, **kwargs):
if kwargs: if kwargs:
for key, _ in self.default_config.items(): for key, _ in self.default_config.items():
@ -38,12 +41,8 @@ class Alist_sync:
if self.verify_server(): if self.verify_server():
self.is_active = True self.is_active = True
def _send_request(self, method, url, **kwargs): def _send_request(self, method, url, **kwargs):
headers = { headers = {"Authorization": self.token, "Content-Type": "application/json"}
"Authorization": self.token,
'Content-Type': 'application/json'
}
if "headers" in kwargs: if "headers" in kwargs:
headers = kwargs["headers"] headers = kwargs["headers"]
del kwargs["headers"] del kwargs["headers"]
@ -59,23 +58,17 @@ class Alist_sync:
fake_response._content = b'{"status": 500, "message": "request error"}' fake_response._content = b'{"status": 500, "message": "request error"}'
return fake_response return fake_response
def verify_server(self): def verify_server(self):
url = f"{self.url}/api/me" url = f"{self.url}/api/me"
querystring = '' querystring = ""
headers = { headers = {"Authorization": self.token, "Content-Type": "application/json"}
"Authorization": self.token,
'Content-Type': 'application/json'
}
try: try:
response = requests.request("GET", url, headers=headers, params=querystring) response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status() response.raise_for_status()
response = response.json() response = response.json()
if response.get("code") == 200: if response.get("code") == 200:
if response.get("data").get("username") == "guest": if response.get("data").get("username") == "guest":
print( print(f"Alist登陆失败请检查token")
f"Alist登陆失败请检查token"
)
else: else:
print( print(
f"Alist登陆成功当前用户: {response.get('data').get('username')}" f"Alist登陆成功当前用户: {response.get('data').get('username')}"
@ -86,8 +79,9 @@ class Alist_sync:
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print(f"获取Alist信息出错: {e}") print(f"获取Alist信息出错: {e}")
return False return False
def run(self, task, **kwargs): def run(self, task, **kwargs):
if not task['addition']['alist_sync']['enable']: if not task["addition"]["alist_sync"]["enable"]:
return 0 return 0
print(f"开始进行alist同步") print(f"开始进行alist同步")
# 这一块注释的是获取任务的参数在web界面可以看 # 这一块注释的是获取任务的参数在web界面可以看
@ -100,10 +94,12 @@ class Alist_sync:
# 获取夸克挂载根目录 # 获取夸克挂载根目录
data = self.get_storage_path(self.quark_storage_id) data = self.get_storage_path(self.quark_storage_id)
if data["driver"] != "Quark": if data["driver"] != "Quark":
print(f"Alist同步: 存储{self.quark_storage_id}非夸克存储❌ {data["driver"]}") print(
f"Alist同步: 存储{self.quark_storage_id}非夸克存储❌ {data["driver"]}"
)
return 0 return 0
quark_mount_root_path = re.sub(r".*root_folder_id\":\"",'',data["addition"]) quark_mount_root_path = re.sub(r".*root_folder_id\":\"", "", data["addition"])
quark_mount_root_path = re.sub(r"\",.*", '', quark_mount_root_path) quark_mount_root_path = re.sub(r"\",.*", "", quark_mount_root_path)
if quark_mount_root_path != "0" and quark_mount_root_path != "": if quark_mount_root_path != "0" and quark_mount_root_path != "":
print(f"Alist同步: 存储{self.quark_storage_id}挂载的目录非夸克根目录❌") print(f"Alist同步: 存储{self.quark_storage_id}挂载的目录非夸克根目录❌")
return 0 return 0
@ -115,11 +111,11 @@ class Alist_sync:
self.save_mount_path = data["mount_path"] self.save_mount_path = data["mount_path"]
# 保存的目录初始化 # 保存的目录初始化
if task['addition']['alist_sync']['save_path'] == "": if task["addition"]["alist_sync"]["save_path"] == "":
self.save_path = f"{self.save_mount_path}/{task['savepath']}" self.save_path = f"{self.save_mount_path}/{task['savepath']}"
else: else:
self.save_path = task['addition']['alist_sync']['save_path'] self.save_path = task["addition"]["alist_sync"]["save_path"]
if not task['addition']['alist_sync']['full_path'] : if not task["addition"]["alist_sync"]["full_path"]:
if self.save_path.startswith("/"): if self.save_path.startswith("/"):
self.save_path = self.save_path[1:] self.save_path = self.save_path[1:]
if self.save_path.endswith("/"): if self.save_path.endswith("/"):
@ -144,9 +140,9 @@ class Alist_sync:
# 初始化验证目录 # 初始化验证目录
# 如果没有填验证目录,则验证目录与保存目录一致 # 如果没有填验证目录,则验证目录与保存目录一致
if task['addition']['alist_sync']['verify_path']: if task["addition"]["alist_sync"]["verify_path"]:
self.verify_path = task['addition']['alist_sync']['verify_path'] self.verify_path = task["addition"]["alist_sync"]["verify_path"]
if not task['addition']['alist_sync']['full_path'] : if not task["addition"]["alist_sync"]["full_path"]:
if self.verify_path.startswith("/"): if self.verify_path.startswith("/"):
self.verify_path = self.save_path[1:] self.verify_path = self.save_path[1:]
if self.verify_path.endswith("/"): if self.verify_path.endswith("/"):
@ -161,7 +157,6 @@ class Alist_sync:
else: else:
self.verify_path = self.save_path self.verify_path = self.save_path
# 初始化夸克目录 # 初始化夸克目录
self.source_path = f"{self.quark_mount_path}/{task['savepath']}" self.source_path = f"{self.quark_mount_path}/{task['savepath']}"
# 初始化任务名 # 初始化任务名
@ -185,7 +180,6 @@ class Alist_sync:
else: else:
self.get_save_file([], source_dir_list) self.get_save_file([], source_dir_list)
if self.save_file_data: if self.save_file_data:
self.save_start(self.save_file_data) self.save_start(self.save_file_data)
print("同步的文件列表:") print("同步的文件列表:")
@ -193,18 +187,21 @@ class Alist_sync:
print(f"└── 🎞️{save_file}") print(f"└── 🎞️{save_file}")
else: else:
print("没有需要同步的文件") print("没有需要同步的文件")
def save_start(self, save_file_data): def save_start(self, save_file_data):
url = f"{self.url}/api/fs/copy" url = f"{self.url}/api/fs/copy"
payload = json.dumps({ payload = json.dumps(
{
"src_dir": self.source_path, "src_dir": self.source_path,
"dst_dir": self.save_path, "dst_dir": self.save_path,
"names": save_file_data "names": save_file_data,
}) }
)
response = self._send_request("POST", url, data=payload) response = self._send_request("POST", url, data=payload)
if response.status_code != 200: if response.status_code != 200:
print('未能进行Alist同步请手动同步') print("未能进行Alist同步请手动同步")
else: else:
print('Alist创建任务成功') print("Alist创建任务成功")
self.copy_task = response.json() self.copy_task = response.json()
def get_save_file(self, target_dir_list, source_dir_list): def get_save_file(self, target_dir_list, source_dir_list):
@ -212,49 +209,71 @@ class Alist_sync:
if target_dir_list == []: if target_dir_list == []:
for source_list in source_dir_list: for source_list in source_dir_list:
if self.tv_mode: if self.tv_mode:
if re.search(self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)", source_list['name'], re.IGNORECASE): if re.search(
self.save_file_data.append(source_list['name']) self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)",
source_list["name"],
re.IGNORECASE,
):
self.save_file_data.append(source_list["name"])
else: else:
self.save_file_data.append(source_list['name']) self.save_file_data.append(source_list["name"])
else: else:
for source_list in source_dir_list: for source_list in source_dir_list:
skip = False skip = False
source_list_filename = source_list['name'].replace('.mp4', '').replace('.mkv', '').replace(self.taskname + '.' , '').lower() source_list_filename = (
source_list["name"]
.replace(".mp4", "")
.replace(".mkv", "")
.replace(self.taskname + ".", "")
.lower()
)
for target_list in target_dir_list: for target_list in target_dir_list:
if self.tv_mode: if self.tv_mode:
target_list_filename = target_list['name'].replace('.mp4', '').replace('.mkv', '').replace(self.taskname + '.' , '').lower() target_list_filename = (
target_list["name"]
.replace(".mp4", "")
.replace(".mkv", "")
.replace(self.taskname + ".", "")
.lower()
)
if source_list_filename == target_list_filename: if source_list_filename == target_list_filename:
# print(f"文件存在,名称为:{target_list['name']}") # print(f"文件存在,名称为:{target_list['name']}")
skip = True skip = True
break break
else: else:
if source_list['name'] == target_list['name']: if source_list["name"] == target_list["name"]:
# print(f"文件存在,名称为:{target_dir['name']}") # print(f"文件存在,名称为:{target_dir['name']}")
skip = True skip = True
break break
if self.tv_mode: if self.tv_mode:
if re.search(self.taskname+r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)", source_list['name'],re.IGNORECASE): if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)",
source_list["name"],
re.IGNORECASE,
):
# 添加一句验证如果有MKVMP4存在时则只保存某一个格式 # 添加一句验证如果有MKVMP4存在时则只保存某一个格式
if re.search(self.taskname + r"\.s\d{1,3}e\d{1,3}\.mp4", source_list['name'],re.IGNORECASE): if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.mp4",
source_list["name"],
re.IGNORECASE,
):
for all_file in source_dir_list: for all_file in source_dir_list:
if source_list['name'].replace('.mp4', '.mkv') == all_file['name']: if (
print(f"{source_list['name']}拥有相同版本的MKV文件跳过复制") source_list["name"].replace(".mp4", ".mkv")
== all_file["name"]
):
print(
f"{source_list['name']}拥有相同版本的MKV文件跳过复制"
)
skip = True skip = True
if not skip: if not skip:
self.save_file_data.append(source_list['name']) self.save_file_data.append(source_list["name"])
def get_path_list(self, path): def get_path_list(self, path):
url = f"{self.url}/api/fs/list" url = f"{self.url}/api/fs/list"
payload = json.dumps({ payload = json.dumps(
"path": path, {"path": path, "password": "", "page": 1, "per_page": 0, "refresh": True}
"password": "", )
"page": 1,
"per_page": 0,
"refresh": True
})
response = self._send_request("POST", url, data=payload) response = self._send_request("POST", url, data=payload)
if response.status_code != 200: if response.status_code != 200:
print(f"获取Alist目录出错: {response}") print(f"获取Alist目录出错: {response}")
@ -264,13 +283,9 @@ class Alist_sync:
def get_path(self, path): def get_path(self, path):
url = f"{self.url}/api/fs/list" url = f"{self.url}/api/fs/list"
payload = json.dumps({ payload = json.dumps({"path": path, "password": "", "force_root": False})
"path": path,
"password": "",
"force_root": False
})
response = self._send_request("POST", url, data=payload) response = self._send_request("POST", url, data=payload)
if response.status_code != 200 or response.json()['message'] != "success" : if response.status_code != 200 or response.json()["message"] != "success":
return False return False
else: else:
return True return True
@ -290,4 +305,3 @@ class Alist_sync:
except Exception as e: except Exception as e:
print(f"Alist同步: 获取Alist存储出错 {e}") print(f"Alist同步: 获取Alist存储出错 {e}")
return [] return []