Compare commits

..

4 Commits

Author SHA1 Message Date
jenfonro
e90f191a61
新增alist同步插件 2025-06-15 20:03:55 +08:00
jenfonro
5e1bb7b807
Delete plugins/alist_sync.py
移出重新上传,刷新信息
2025-06-15 20:03:36 +08:00
jenfonro
dc159210b2
Delete plugins/alist_save_other_path.py
移出旧文件
2025-06-15 20:02:58 +08:00
jenfonro
bfa0e2ae6e
新增alist_sync.py 2025-06-15 20:01:43 +08:00
2 changed files with 288 additions and 228 deletions

View File

@ -1,228 +0,0 @@
### 当verify_path变量不存在时将在target_path路径下进行比对文件列表
import os
import re
import json
from importlib.util import source_hash
import requests
import sys
class Alist_save_other_path:
default_config = {
"url": "", # Alist服务器URL
"token": "", # Alist服务器Token
"quark_path": "" # Alist 服务器夸克路径
}
is_active = False
# 缓存参数
default_task_config = {
"target_path": "", # 需要转存到的目录
"verify_path": ""
}
def __init__(self, **kwargs):
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
else:
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
if self.url and self.token:
if self.verify_server():
self.is_active = True
def _send_request(self, method, url, **kwargs):
headers = {
"Authorization": self.token,
'Content-Type': 'application/json'
}
if "headers" in kwargs:
headers = kwargs["headers"]
del kwargs["headers"]
try:
response = requests.request(method, url, headers=headers, **kwargs)
# print(f"{response.text}")
# response.raise_for_status() # 检查请求是否成功但返回非200也会抛出异常
return response
except Exception as e:
print(f"_send_request error:\n{e}")
fake_response = requests.Response()
fake_response.status_code = 500
fake_response._content = b'{"status": 500, "message": "request error"}'
return fake_response
def verify_server(self):
url = f"{self.url}/api/me"
querystring = ''
headers = {
"Authorization": self.token,
'Content-Type': 'application/json'
}
#response = self._send_request("GET", url, params=querystring).json()
#return response["data"]["username"]
try:
response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status()
response = response.json()
if response.get("code") == 200:
if response.get("data").get("username") == "guest":
print(
f"Alist登陆失败请检查token"
)
else:
print(
f"Alist登陆成功当前用户: {response.get('data').get('username')}"
)
return True
else:
print(f"Alist刷新: 连接失败❌ {response.get('message')}")
except requests.exceptions.RequestException as e:
print(f"获取Alist信息出错: {e}")
return False
def run(self, task, **kwargs):
print(f"开始进行alist转存")
#这一块注释的是获取任务的参数在web界面可以看
#print(task)
# print(task['addition'])
# print(task['addition']['alist_save_other_path'])
# print(task['addition']['alist_save_other_path']['target_path'])
#如果没有填验证目录,则验证目录与保存目录一致
if task['addition']['alist_save_other_path']['verify_path']:
self.verify_path = task['addition']['alist_save_other_path']['verify_path']
else:
self.verify_path = task['addition']['alist_save_other_path']['target_path']
#这里获取的设置中的保存目录、验证目录
self.target_path = task['addition']['alist_save_other_path']['target_path']
#先验证目录是否存在,如果不存在则直接复制整个目录过去
Error_quit = False
if not self.get_path(self.target_path):
dir_exists = False
print('新建保存目标目录')
if not self.mkdir(self.target_path):
print('新建保存目标目录失败')
Error_quit = True
else:
dir_exists = True
if not Error_quit:
if dir_exists:
verify_dir_list = self.get_path_list(self.verify_path)
#初始化alist中的夸克目录
self.source_path = f"{self.quark_path}{task['savepath']}"
#初始化任务名
self.taskname = f"{task['taskname']}"
source_dir_list = self.get_path_list(self.source_path)
print("网盘中的文件列表")
print(f"{source_dir_list}")
#比对需要复制的文件
if dir_exists:
self.get_save_file(verify_dir_list, source_dir_list)
else:
self.save_file_data = []
for source_list in source_dir_list:
self.save_file_data.append(source_list['name'])
if self.save_file_data :
self.save_start(self.save_file_data)
print("转存的文件列表:")
for save_file in self.save_file_data:
print(f"└── 🎞️{save_file}")
else:
print("转存文件均已存在")
def save_start(self, save_file_data):
url = f"{self.url}/api/fs/copy"
payload = json.dumps({
"src_dir": self.source_path,
"dst_dir": self.target_path,
"names": save_file_data
})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print('未能进行Alist转存请手动转存')
else:
print('Alist创建任务成功')
self.copy_task = response.json()
def get_save_file(self, target_dir_list, source_dir_list):
self.save_file_data = []
if target_dir_list:
for source_list in source_dir_list:
file_exists = False
for target_list in target_dir_list:
if source_list['name'].replace('.mp4', '').replace('.mkv', '') == target_list['name'].replace('.mp4', '').replace('.mkv', ''):
#print(f"文件存在,名称为:{target_dir['name']}")
file_exists = True
break
if not file_exists:
skip=False
if re.search(self.taskname+r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)", source_list['name'],re.IGNORECASE):
#添加一句验证如果有MKVMP4存在时则只保存某一个格式
if re.search(self.taskname + r"\.s\d{1,3}e\d{1,3}\.mp4", source_list['name'],
re.IGNORECASE):
for all_file in source_dir_list:
if source_list['name'].replace('.mp4', '.mkv') == all_file['name']:
print(f"{source_list['name']}拥有相同版本的MKV文件跳过复制")
skip=True
if not skip:
self.save_file_data.append(source_list['name'])
else:
for source_list in source_dir_list:
if re.search(self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)", source_list['name'], re.IGNORECASE):
self.save_file_data.append(source_list['name'])
def mkdir(self, path):
url = f"{self.url}/api/fs/mkdir"
payload = json.dumps({
"path": path,
})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200 or response.json()['message'] != "success" :
return False
else:
return True
def get_path(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps({
"path": path,
"password": "",
"force_root": False
})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200 or response.json()['message'] != "success" :
return False
else:
return True
def get_path_list(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps({
"path": path,
"password": "",
"page": 1,
"per_page": 0,
"refresh": True
})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print(f"获取Alist目录出错: {response}")
return False
else:
return response.json()["data"]["content"]

288
plugins/alist_sync.py Normal file
View File

@ -0,0 +1,288 @@
import os
import re
import json
from importlib.util import source_hash
import requests
import sys
class Alist_sync:
default_config = {
"url": "", # Alist服务器URL
"token": "", # Alist服务器Token
"quark_storage_id": "", # Alist 服务器夸克存储 ID
"save_storage_id": "", # Alist 服务器同步的存储 ID
"TV_mode": "" # TV库模式填入非0值开启
#TV库模式说明1.开启后会验证文件名是否包含S01E01的正则格式目前仅包含mp4及mkv
#2.会对比保存目录下是否存在该名称的mp4、mkv文件如果不存在才会进行同步
#3.夸克目录及同步目录均会提取为S01E01的正则进行匹配不受其它字符影响
}
is_active = False
# 缓存参数
default_task_config = {
"enable": False, # 当前任务开关,
"save_path": "", # 需要同步目录,默认空时路径则会与夸克的保存路径一致,不开启完整路径模式时,默认根目录为保存驱动的根目录
"verify_path": "", # 验证目录主要用于影视库避免重复文件一般配合alist的别名功能及full_path使用用于多个网盘的源合并成一个目录
"full_path": False #完整路径模式
#完整路径模式开启后不再限制保存目录的存储驱动将根据填入的路径进行保存需要填写完整的alist目录
}
def __init__(self, **kwargs):
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
else:
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
if self.url and self.token:
if self.verify_server():
self.is_active = True
def _send_request(self, method, url, **kwargs):
headers = {
"Authorization": self.token,
'Content-Type': 'application/json'
}
if "headers" in kwargs:
headers = kwargs["headers"]
del kwargs["headers"]
try:
response = requests.request(method, url, headers=headers, **kwargs)
# print(f"{response.text}")
# response.raise_for_status() # 检查请求是否成功但返回非200也会抛出异常
return response
except Exception as e:
print(f"_send_request error:\n{e}")
fake_response = requests.Response()
fake_response.status_code = 500
fake_response._content = b'{"status": 500, "message": "request error"}'
return fake_response
def verify_server(self):
url = f"{self.url}/api/me"
querystring = ''
headers = {
"Authorization": self.token,
'Content-Type': 'application/json'
}
try:
response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status()
response = response.json()
if response.get("code") == 200:
if response.get("data").get("username") == "guest":
print(
f"Alist登陆失败请检查token"
)
else:
print(
f"Alist登陆成功当前用户: {response.get('data').get('username')}"
)
return True
else:
print(f"Alist同步: 连接服务器失败❌ {response.get('message')}")
except requests.exceptions.RequestException as e:
print(f"获取Alist信息出错: {e}")
return False
def run(self, task, **kwargs):
if not task['addition']['alist_sync']['enable']:
return 0
print(f"开始进行alist同步")
#这一块注释的是获取任务的参数在web界面可以看
#print("所有任务参数:")
#print(task)
#print(task['addition'])
# print(task['addition']['alist_sync'])
# print(task['addition']['alist_sync']['target_path'])
#获取夸克挂载根目录
data = self.get_storage_path(self.quark_storage_id)
if data["driver"] != "Quark":
print(f"Alist同步: 存储{slef.quark_storage_id}非夸克存储❌ {data["driver"]}")
else:
self.quark_mount_path = data["mount_path"]
#获取保存路径的挂载根目录
if self.save_storage_id != 0:
data = self.get_storage_path(self.save_storage_id)
self.save_mount_path = data["mount_path"]
#保存的目录初始化
if task['addition']['alist_sync']['save_path'] == "":
self.save_path = f"{self.save_mount_path}/{task['savepath']}"
else:
self.save_path = task['addition']['alist_sync']['save_path']
if not task['addition']['alist_sync']['full_path'] :
if self.save_path.startswith("/"):
self.save_path = self.save_path[1:]
if self.save_path.endswith("/"):
self.save_path = self.save_path[:-1]
self.save_path = f"{self.save_mount_path}/{self.save_path}"
else:
#print('完整路径模式')
if not self.save_path.startswith("/"):
self.save_path = "/" + self.save_path
if self.save_path.endswith("/"):
self.save_path = self.save_path[:-1]
#获取保存目录是否存在
if not self.get_path(self.save_path):
dir_exists = False
#如果目录不存在判断两边路径是否一致,一致时直接创建复制目录任务即可
else:
dir_exists = True
copy_dir = False
#初始化验证目录
#如果没有填验证目录,则验证目录与保存目录一致
if task['addition']['alist_sync']['verify_path']:
self.verify_path = task['addition']['alist_sync']['verify_path']
if not task['addition']['alist_sync']['full_path'] :
if self.verify_path.startswith("/"):
self.verify_path = self.save_path[1:]
if self.verify_path.endswith("/"):
self.verify_path = self.save_path[:-1]
self.verify_path = f"{self.save_mount_path}/{self.verify_path}"
else:
#print('完整路径模式')
if not self.verify_path.startswith("/"):
self.verify_path = "/" + self.save_path
if self.verify_path.endswith("/"):
self.verify_path = self.save_path[:-1]
else:
self.verify_path = self.save_path
#初始化夸克目录
self.source_path = f"{self.quark_mount_path}/{task['savepath']}"
#初始化任务名
self.taskname = f"{task['taskname']}"
#获取网盘已有文件
source_dir_list = self.get_path_list(self.source_path)
if self.TV_mode == 0 or self.TV_mode == "" :
self.tv_mode = False
else:
self.tv_mode = True
#如果是新建的目录则将所有文件直接复制
if not dir_exists:
self.get_save_file([], source_dir_list)
else:
verify_dir_list = self.get_path_list(self.verify_path)
if verify_dir_list :
self.get_save_file(verify_dir_list, source_dir_list)
else:
self.get_save_file([], source_dir_list)
if self.save_file_data :
self.save_start(self.save_file_data)
print("同步的文件列表:")
for save_file in self.save_file_data:
print(f"└── 🎞️{save_file}")
else:
print("没有需要同步的文件")
def save_start(self, save_file_data):
url = f"{self.url}/api/fs/copy"
payload = json.dumps({
"src_dir": self.source_path,
"dst_dir": self.save_path,
"names": save_file_data
})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print('未能进行Alist同步请手动同步')
else:
print('Alist创建任务成功')
self.copy_task = response.json()
def get_save_file(self, target_dir_list, source_dir_list):
self.save_file_data = []
if target_dir_list == []:
for source_list in source_dir_list:
if self.tv_mode :
if re.search(self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)", source_list['name'], re.IGNORECASE):
self.save_file_data.append(source_list['name'])
else:
self.save_file_data.append(source_list['name'])
else:
for source_list in source_dir_list:
skip = False
source_list_filename = source_list['name'].replace('.mp4', '').replace('.mkv', '').replace(self.taskname + '.' , '').lower()
for target_list in target_dir_list:
if self.tv_mode :
target_list_filename = target_list['name'].replace('.mp4', '').replace('.mkv', '').replace(self.taskname + '.' , '').lower()
if source_list_filename == target_list_filename :
#print(f"文件存在,名称为:{target_list['name']}")
skip = True
break
else:
if source_list['name'] == target_list['name']:
#print(f"文件存在,名称为:{target_dir['name']}")
skip = True
break
if self.tv_mode :
if re.search(self.taskname+r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)", source_list['name'],re.IGNORECASE):
#添加一句验证如果有MKVMP4存在时则只保存某一个格式
if re.search(self.taskname + r"\.s\d{1,3}e\d{1,3}\.mp4", source_list['name'],re.IGNORECASE):
for all_file in source_dir_list:
if source_list['name'].replace('.mp4', '.mkv') == all_file['name']:
print(f"{source_list['name']}拥有相同版本的MKV文件跳过复制")
skip=True
if not skip:
self.save_file_data.append(source_list['name'])
def get_path_list(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps({
"path": path,
"password": "",
"page": 1,
"per_page": 0,
"refresh": True
})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print(f"获取Alist目录出错: {response}")
return False
else:
return response.json()["data"]["content"]
def get_path(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps({
"path": path,
"password": "",
"force_root": False
})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200 or response.json()['message'] != "success" :
return False
else:
return True
def get_storage_path(self, storage_id):
url = f"{self.url}/api/admin/storage/get"
headers = {"Authorization": self.token}
querystring = {"id": storage_id}
try:
response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status()
data = response.json()
if data.get("code") == 200:
return data.get("data", [])
else:
print(f"Alist同步: 存储{storage_id}连接失败❌ {data.get('message')}")
except Exception as e:
print(f"Alist同步: 获取Alist存储出错 {e}")
return []