在 Cp0204/quark-auto-save v0.5.3.1 的基础上增加过滤规则功能

This commit is contained in:
x1ao4 2025-04-21 02:18:39 +08:00
parent 6f969f9416
commit 997bba7047
3 changed files with 828 additions and 774 deletions

View File

@ -1,6 +1,7 @@
# !/usr/bin/env python3
# -*- coding: utf-8 -*-
from flask import (
json,
Flask,
url_for,
session,
@ -14,15 +15,16 @@ from flask import (
)
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from sdk.cloudsaver import CloudSaver
from datetime import timedelta
import subprocess
import requests
import hashlib
import logging
import base64
import json
import sys
import os
import re
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, parent_dir)
@ -46,9 +48,10 @@ PYTHON_PATH = "python3" if os.path.exists("/usr/bin/python3") else "python"
SCRIPT_PATH = os.environ.get("SCRIPT_PATH", "./quark_auto_save.py")
CONFIG_PATH = os.environ.get("CONFIG_PATH", "./config/quark_config.json")
PLUGIN_FLAGS = os.environ.get("PLUGIN_FLAGS", "")
DEBUG = os.environ.get("DEBUG", False)
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
task_plugins_config = {}
config_data = {}
task_plugins_config_default = {}
app = Flask(__name__)
app.config["APP_VERSION"] = get_app_ver()
@ -77,24 +80,15 @@ def gen_md5(string):
return md5.hexdigest()
# 读取 JSON 文件内容
def read_json():
    """Load the JSON document stored at CONFIG_PATH and return the parsed value."""
    with open(CONFIG_PATH, "r", encoding="utf-8") as config_file:
        return json.load(config_file)
# 将数据写入 JSON 文件
def write_json(data):
    """Overwrite CONFIG_PATH with `data` serialized as JSON.

    The output is UTF-8, keeps non-ASCII characters unescaped, preserves key
    order and is indented by two spaces.
    """
    with open(CONFIG_PATH, "w", encoding="utf-8") as config_file:
        config_file.write(
            json.dumps(data, ensure_ascii=False, sort_keys=False, indent=2)
        )
def get_login_token():
    """Derive the WebUI login token from the configured credentials.

    Reads the username and password under ``config_data["webui"]``, feeds the
    salted string to ``gen_md5`` and returns the slice ``[8:24]`` of the result.
    """
    webui = config_data["webui"]
    name = webui["username"]
    secret = webui["password"]
    digest = gen_md5(f"token{name}{secret}+-*/")
    return digest[8:24]
def is_login():
data = read_json()
username = data["webui"]["username"]
password = data["webui"]["password"]
if session.get("login") == gen_md5(username + password):
login_token = get_login_token()
if session.get("token") == login_token or request.args.get("token") == login_token:
return True
else:
return False
@ -114,16 +108,15 @@ def favicon():
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
data = read_json()
username = data["webui"]["username"]
password = data["webui"]["password"]
username = config_data["webui"]["username"]
password = config_data["webui"]["password"]
# 验证用户名和密码
if (username == request.form.get("username")) and (
password == request.form.get("password")
):
logging.info(f">>> 用户 {username} 登录成功")
session["login"] = gen_md5(username + password)
session.permanent = True
session["token"] = get_login_token()
return redirect(url_for("index"))
else:
logging.info(f">>> 用户 {username} 登录失败")
@ -137,7 +130,7 @@ def login():
# 退出登录
@app.route("/logout")
def logout():
session.pop("login", None)
session.pop("token", None)
return redirect(url_for("login"))
@ -155,47 +148,51 @@ def index():
@app.route("/data")
def get_data():
if not is_login():
return redirect(url_for("login"))
data = read_json()
return jsonify({"success": False, "message": "未登录"})
data = Config.read_json(CONFIG_PATH)
del data["webui"]
data["task_plugins_config"] = task_plugins_config
return jsonify(data)
data["api_token"] = get_login_token()
data["task_plugins_config_default"] = task_plugins_config_default
return jsonify({"success": True, "data": data})
# 更新数据
@app.route("/update", methods=["POST"])
def update():
global config_data
if not is_login():
return "未登录"
data = request.json
data["webui"] = read_json()["webui"]
if "task_plugins_config" in data:
del data["task_plugins_config"]
write_json(data)
return jsonify({"success": False, "message": "未登录"})
dont_save_keys = ["task_plugins_config_default", "api_token"]
for key, value in request.json.items():
if key not in dont_save_keys:
config_data.update({key: value})
Config.write_json(CONFIG_PATH, config_data)
# 重新加载任务
if reload_tasks():
logging.info(f">>> 配置更新成功")
return "配置更新成功"
return jsonify({"success": True, "message": "配置更新成功"})
else:
logging.info(f">>> 配置更新失败")
return "配置更新失败"
return jsonify({"success": False, "message": "配置更新失败"})
# 处理运行脚本请求
@app.route("/run_script_now", methods=["GET"])
@app.route("/run_script_now", methods=["POST"])
def run_script_now():
if not is_login():
return "未登录"
task_index = request.args.get("task_index", "")
command = [PYTHON_PATH, "-u", SCRIPT_PATH, CONFIG_PATH, task_index]
return jsonify({"success": False, "message": "未登录"})
tasklist = request.json.get("tasklist", [])
command = [PYTHON_PATH, "-u", SCRIPT_PATH, CONFIG_PATH]
logging.info(
f">>> 手动运行任务{int(task_index)+1 if task_index.isdigit() else 'all'}"
f">>> 手动运行任务 [{tasklist[0].get('taskname') if len(tasklist)>0 else 'ALL'}] 开始执行..."
)
def generate_output():
# 设置环境变量
process_env = os.environ.copy()
process_env["PYTHONIOENCODING"] = "utf-8"
if tasklist:
process_env["TASKLIST"] = json.dumps(tasklist, ensure_ascii=False)
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
@ -224,64 +221,170 @@ def run_script_now():
@app.route("/task_suggestions")
def get_task_suggestions():
if not is_login():
return jsonify({"error": "未登录"})
base_url = base64.b64decode("aHR0cHM6Ly9zLjkxNzc4OC54eXo=").decode()
return jsonify({"success": False, "message": "未登录"})
query = request.args.get("q", "").lower()
deep = request.args.get("d", "").lower()
url = f"{base_url}/task_suggestions?q={query}&d={deep}"
try:
response = requests.get(url)
return jsonify(response.json())
cs_data = config_data.get("source", {}).get("cloudsaver", {})
if (
cs_data.get("server")
and cs_data.get("username")
and cs_data.get("password")
):
cs = CloudSaver(cs_data.get("server"))
cs.set_auth(
cs_data.get("username", ""),
cs_data.get("password", ""),
cs_data.get("token", ""),
)
search = cs.auto_login_search(query)
if search.get("success"):
if search.get("new_token"):
cs_data["token"] = search.get("new_token")
Config.write_json(CONFIG_PATH, config_data)
search_results = cs.clean_search_results(search.get("data"))
return jsonify(
{"success": True, "source": "CloudSaver", "data": search_results}
)
else:
return jsonify({"success": True, "message": search.get("message")})
else:
base_url = base64.b64decode("aHR0cHM6Ly9zLjkxNzc4OC54eXo=").decode()
url = f"{base_url}/task_suggestions?q={query}&d={deep}"
response = requests.get(url)
return jsonify(
{"success": True, "source": "网络公开", "data": response.json()}
)
except Exception as e:
return jsonify({"error": str(e)})
return jsonify({"success": True, "message": f"error: {str(e)}"})
@app.route("/get_share_detail")
def get_share_files():
@app.route("/get_share_detail", methods=["POST"])
def get_share_detail():
if not is_login():
return jsonify({"error": "未登录"})
shareurl = request.args.get("shareurl", "")
return jsonify({"success": False, "message": "未登录"})
shareurl = request.json.get("shareurl", "")
stoken = request.json.get("stoken", "")
account = Quark("", 0)
pwd_id, passcode, pdir_fid = account.get_id_from_url(shareurl)
is_sharing, stoken = account.get_stoken(pwd_id, passcode)
if not is_sharing:
return jsonify({"error": stoken})
share_detail = account.get_detail(pwd_id, stoken, pdir_fid, 1)
return jsonify(share_detail)
pwd_id, passcode, pdir_fid, paths = account.extract_url(shareurl)
if not stoken:
is_sharing, stoken = account.get_stoken(pwd_id, passcode)
if not is_sharing:
return jsonify({"success": False, "data": {"error": stoken}})
share_detail = account.get_detail(pwd_id, stoken, pdir_fid, _fetch_share=1)
share_detail["paths"] = paths
share_detail["stoken"] = stoken
# 正则命名预览
def preview_regex(share_detail):
    """Annotate a share listing with a filter/rename preview.

    Reads the ``regex`` object of the current JSON request, resolves its
    pattern/replace pair through ``account.magic_regex_func`` and then, for
    every entry of ``share_detail["list"]``:

    * sets ``item["filtered"] = True`` when the file name contains any of the
      comma-separated filter words;
    * otherwise, when the pattern matches, sets ``item["file_name_re"]`` to
      the renamed file name (or the unchanged name if ``replace`` is empty).

    Args:
        share_detail: dict with a ``"list"`` of items carrying ``"file_name"``.

    Returns:
        The same ``share_detail`` object, modified in place.
    """
    # A request without a "regex" object must not crash the preview.
    regex = request.json.get("regex") or {}
    pattern, replace = account.magic_regex_func(
        regex.get("pattern", ""),
        regex.get("replace", ""),
        regex.get("taskname", ""),
        regex.get("magic_regex", {}),
    )
    # Filter words may be separated by the ASCII comma or the fullwidth
    # (Chinese) comma; normalise to the ASCII one before splitting.
    filterwords = regex.get("filterwords") or ""
    # Drop empty words: "" is a substring of every name, so a trailing comma
    # would otherwise mark every file as filtered.
    filterwords_list = [
        word.strip()
        for word in filterwords.replace(",", ",").split(",")
        if word.strip()
    ]
    for item in share_detail["list"]:
        file_name = item["file_name"]
        # Filtered files get no file_name_re, so they are displayed the same
        # way as files that do not match the pattern.
        if any(word in file_name for word in filterwords_list):
            item["filtered"] = True
        # Only files that were not filtered get a rename preview.
        if not item.get("filtered") and re.search(pattern, file_name):
            item["file_name_re"] = (
                re.sub(pattern, replace, file_name) if replace != "" else file_name
            )
    return share_detail
share_detail = preview_regex(share_detail)
return jsonify({"success": True, "data": share_detail})
@app.route("/get_savepath")
def get_savepath():
@app.route("/get_savepath_detail")
def get_savepath_detail():
if not is_login():
return jsonify({"error": "未登录"})
data = read_json()
account = Quark(data["cookie"][0], 0)
return jsonify({"success": False, "message": "未登录"})
account = Quark(config_data["cookie"][0], 0)
paths = []
if path := request.args.get("path"):
if path == "/":
fid = 0
elif get_fids := account.get_fids([path]):
fid = get_fids[0]["fid"]
else:
return jsonify([])
dir_names = path.split("/")
if dir_names[0] == "":
dir_names.pop(0)
path_fids = []
current_path = ""
for dir_name in dir_names:
current_path += "/" + dir_name
path_fids.append(current_path)
if get_fids := account.get_fids(path_fids):
fid = get_fids[-1]["fid"]
paths = [
{"fid": get_fid["fid"], "name": dir_name}
for get_fid, dir_name in zip(get_fids, dir_names)
]
else:
return jsonify({"success": False, "data": {"error": "获取fid失败"}})
else:
fid = request.args.get("fid", 0)
file_list = account.ls_dir(fid)
return jsonify(file_list)
fid = request.args.get("fid", "0")
file_list = {
"list": account.ls_dir(fid),
"paths": paths,
}
return jsonify({"success": True, "data": file_list})
@app.route("/delete_file", methods=["POST"])
def delete_file():
if not is_login():
return jsonify({"error": "未登录"})
data = read_json()
account = Quark(data["cookie"][0], 0)
return jsonify({"success": False, "message": "未登录"})
account = Quark(config_data["cookie"][0], 0)
if fid := request.json.get("fid"):
response = account.delete([fid])
else:
response = {"error": "fid not found"}
response = {"success": False, "message": "缺失必要字段: fid"}
return jsonify(response)
# 添加任务接口
@app.route("/api/add_task", methods=["POST"])
def add_task():
    """API endpoint that appends one task to the configured task list.

    Responses (all JSON):
        401, code 1 -- the caller is not logged in / has no valid token.
        400, code 2 -- a required field (taskname, shareurl, savepath) is
                       missing or empty, or the body is not a JSON object.
        200, code 0 -- the task was appended and the configuration saved.
    """
    global config_data
    # Token / session check
    if not is_login():
        return jsonify({"success": False, "code": 1, "message": "未登录"}), 401
    # A missing, malformed or non-object JSON body is treated as an empty
    # request so the caller gets the regular "missing field" answer instead
    # of an unhandled exception.
    request_data = request.get_json(silent=True)
    if not isinstance(request_data, dict):
        request_data = {}
    # Required fields
    for field in ("taskname", "shareurl", "savepath"):
        if not request_data.get(field):
            return (
                jsonify(
                    {"success": False, "code": 2, "message": f"缺少必要字段: {field}"}
                ),
                400,
            )
    # Append the task; create the list if the configuration has none yet.
    config_data.setdefault("tasklist", []).append(request_data)
    Config.write_json(CONFIG_PATH, config_data)
    logging.info(f">>> 通过API添加任务: {request_data['taskname']}")
    return jsonify(
        {"success": True, "code": 0, "message": "任务添加成功", "data": request_data}
    )
# 定时任务执行的函数
def run_python(args):
logging.info(f">>> 定时运行任务")
@ -290,11 +393,8 @@ def run_python(args):
# 重新加载任务
def reload_tasks():
# 读取数据
data = read_json()
# 添加新任务
crontab = data.get("crontab")
if crontab:
# 读取定时规则
if crontab := config_data.get("crontab"):
if scheduler.state == 1:
scheduler.pause() # 暂停调度器
trigger = CronTrigger.from_crontab(crontab)
@ -321,7 +421,7 @@ def reload_tasks():
def init():
global task_plugins_config
global config_data, task_plugins_config_default
logging.info(f">>> 初始化配置")
# 检查配置文件是否存在
if not os.path.exists(CONFIG_PATH):
@ -329,43 +429,30 @@ def init():
os.makedirs(os.path.dirname(CONFIG_PATH))
with open("quark_config.json", "rb") as src, open(CONFIG_PATH, "wb") as dest:
dest.write(src.read())
data = read_json()
Config.breaking_change_update(data)
# 读取配置
config_data = Config.read_json(CONFIG_PATH)
Config.breaking_change_update(config_data)
# 默认管理账号
data["webui"] = {
config_data["webui"] = {
"username": os.environ.get("WEBUI_USERNAME")
or data.get("webui", {}).get("username", "admin"),
or config_data.get("webui", {}).get("username", "admin"),
"password": os.environ.get("WEBUI_PASSWORD")
or data.get("webui", {}).get("password", "admin123"),
or config_data.get("webui", {}).get("password", "admin123"),
}
# 默认定时规则
if not data.get("crontab"):
data["crontab"] = "0 8,18,20 * * *"
if not config_data.get("crontab"):
config_data["crontab"] = "0 8,18,20 * * *"
# 初始化插件配置
_, plugins_config_default, task_plugins_config = Config.load_plugins()
plugins_config_default.update(data.get("plugins", {}))
data["plugins"] = plugins_config_default
write_json(data)
_, plugins_config_default, task_plugins_config_default = Config.load_plugins()
plugins_config_default.update(config_data.get("plugins", {}))
config_data["plugins"] = plugins_config_default
def filter_files(files, filterwords):
    """Drop every file whose name contains one of the filter words.

    Args:
        files: list of dicts, each with a ``"file_name"`` key.
        filterwords: filter words in one string, separated by the ASCII comma
            or the fullwidth (Chinese) comma; surrounding whitespace is
            ignored. Falsy means "no filtering".

    Returns:
        ``files`` itself when there is nothing to filter by, otherwise a new
        list holding the entries whose name contains none of the words.
    """
    if not filterwords:
        return files
    # Empty words are skipped: "" is a substring of every name, so a trailing
    # or doubled comma would otherwise filter out every file.
    words = [
        word.strip()
        for word in filterwords.replace(",", ",").split(",")
        if word.strip()
    ]
    if not words:
        return files
    return [
        file
        for file in files
        if not any(word in file["file_name"] for word in words)
    ]
@app.route("/get_filtered_files")
def get_filtered_files():
    """Return the listing of a directory with the filter words applied.

    Query parameters: ``filterwords`` (defaults to "") and ``fid``
    (defaults to 0). Responds with an error object when not logged in.
    """
    if not is_login():
        return jsonify({"error": "未登录"})
    config = read_json()
    words = request.args.get("filterwords", "")
    # The account is built from the first configured cookie.
    quark = Quark(config["cookie"][0], 0)
    listing = quark.ls_dir(request.args.get("fid", 0))
    return jsonify(filter_files(listing, words))
# 更新配置
Config.write_json(CONFIG_PATH, config_data)
if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,7 @@ import time
import random
import requests
import importlib
import urllib.parse
from datetime import datetime
# 兼容青龙
@ -33,8 +34,12 @@ GH_PROXY = os.environ.get("GH_PROXY", "https://ghproxy.net/")
MAGIC_REGEX = {
"$TV": {
"pattern": r".*?(?<!\d)([Ss]\d{1,2})?([Ee]?[Pp]?[Xx]?\d{1,3})(?!\d).*?\.(mp4|mkv)",
"replace": r"\1\2.\3",
"pattern": r".*?([Ss]\d{1,2})?(?:[第EePpXx\.\-\_\( ]{1,2}|^)(\d{1,3})(?!\d).*?\.(mp4|mkv)",
"replace": r"\1E\2.\3",
},
"$BLACK_WORD": {
"pattern": r"^(?!.*纯享)(?!.*加更)(?!.*超前企划)(?!.*训练室)(?!.*蒸蒸日上).*",
"replace": "",
},
}
@ -74,6 +79,17 @@ class Config:
else:
return False
# 读取 JSON 文件内容
def read_json(config_path):
    """Parse the UTF-8 JSON file at `config_path` and return its content."""
    with open(config_path, "r", encoding="utf-8") as config_file:
        return json.load(config_file)
# 将数据写入 JSON 文件
def write_json(config_path, data):
    """Overwrite the file at `config_path` with `data` serialized as JSON.

    The output is UTF-8, keeps non-ASCII characters unescaped, preserves key
    order and is indented by two spaces.
    """
    with open(config_path, "w", encoding="utf-8") as config_file:
        config_file.write(
            json.dumps(data, ensure_ascii=False, sort_keys=False, indent=2)
        )
# 读取CK
def get_cookies(cookie_val):
if isinstance(cookie_val, list):
@ -215,40 +231,17 @@ class Quark:
}
)
del headers["cookie"]
# 添加重试机制
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
response = requests.request(method, url, headers=headers, timeout=30, **kwargs)
# 请求成功,返回结果
return response
except requests.exceptions.SSLError as e:
retry_count += 1
if retry_count >= max_retries:
print(f"SSL错误已重试{retry_count}次,放弃请求: {str(e)}")
fake_response = requests.Response()
fake_response.status_code = 500
fake_response._content = b'{"status": 500, "message": "SSL error", "code": 500}'
return fake_response
# 等待一段时间后重试
wait_time = retry_count * 2
print(f"SSL错误{wait_time}秒后进行第{retry_count+1}次重试: {str(e)}")
time.sleep(wait_time)
except requests.exceptions.RequestException as e:
retry_count += 1
if retry_count >= max_retries:
print(f"请求错误,已重试{retry_count}次,放弃请求: {str(e)}")
fake_response = requests.Response()
fake_response.status_code = 500
fake_response._content = b'{"status": 500, "message": "request error", "code": 500}'
return fake_response
# 等待一段时间后重试
wait_time = retry_count * 2
print(f"请求错误,{wait_time}秒后进行第{retry_count+1}次重试: {str(e)}")
time.sleep(wait_time)
try:
response = requests.request(method, url, headers=headers, **kwargs)
# print(f"{response.text}")
# response.raise_for_status() # 检查请求是否成功但返回非200也会抛出异常
return response
except Exception as e:
print(f"_send_request error:\n{e}")
fake_response = requests.Response()
fake_response.status_code = 500
fake_response._content = b'{"status": 500, "message": "request error"}'
return fake_response
def init(self):
account_info = self.get_account_info()
@ -344,6 +337,8 @@ class Quark:
"_sort": "file_type:asc,updated_at:desc",
}
response = self._send_request("GET", url, params=querystring).json()
if response["code"] != 0:
return {"error": response["message"]}
if response["data"]["list"]:
list_merge += response["data"]["list"]
page += 1
@ -391,6 +386,8 @@ class Quark:
"_fetch_full_path": kwargs.get("fetch_full_path", 0),
}
response = self._send_request("GET", url, params=querystring).json()
if response["code"] != 0:
return {"error": response["message"]}
if response["data"]["list"]:
file_list += response["data"]["list"]
page += 1
@ -524,8 +521,8 @@ class Quark:
# ↓ 操作函数
# 魔法正则匹配
def magic_regex_func(self, pattern, replace, taskname=None):
magic_regex = CONFIG_DATA.get("magic_regex") or MAGIC_REGEX or {}
def magic_regex_func(self, pattern, replace, taskname=None, magic_regex={}):
magic_regex = magic_regex or CONFIG_DATA.get("magic_regex") or MAGIC_REGEX
keyword = pattern
if keyword in magic_regex:
pattern = magic_regex[keyword]["pattern"]
@ -535,17 +532,34 @@ class Quark:
replace = replace.replace("$TASKNAME", taskname)
return pattern, replace
def get_id_from_url(self, url):
    """Split a Quark share URL into its identifiers.

    Returns:
        ``(pwd_id, passcode, pdir_fid)`` where ``passcode`` falls back to ""
        and ``pdir_fid`` to 0 when the URL does not carry them, or ``None``
        when nothing in the URL matches.
    """
    share_part = url.replace("https://pan.quark.cn/s/", "")
    found = re.search(
        r"(\w+)(\?pwd=(\w+))?(#/list/share.*/(\w+))?", share_part
    )
    if not found:
        return None
    pwd_id, _, passcode, _, pdir_fid = found.groups()
    return pwd_id, passcode or "", pdir_fid or 0
# def get_id_from_url(self, url):
# url = url.replace("https://pan.quark.cn/s/", "")
# pattern = r"(\w+)(\?pwd=(\w+))?(#/list/share.*/(\w+))?"
# match = re.search(pattern, url)
# if match:
# pwd_id = match.group(1)
# passcode = match.group(3) if match.group(3) else ""
# pdir_fid = match.group(5) if match.group(5) else 0
# return pwd_id, passcode, pdir_fid
# else:
# return None
def extract_url(self, url):
    """Extract the share identifiers and the folder trail from a share URL.

    Returns:
        ``(pwd_id, passcode, pdir_fid, paths)``:
        * ``pwd_id``   -- the word after ``/s/`` or ``None`` when absent;
        * ``passcode`` -- the word after ``pwd=`` or "" when absent;
        * ``paths``    -- one ``{"fid", "name"}`` dict per ``/<32 word
          chars>[-<name>]`` segment, with the name URL-decoded;
        * ``pdir_fid`` -- the fid of the last path entry, or 0 without any.
    """
    id_hit = re.search(r"/s/(\w+)", url)
    pwd_hit = re.search(r"pwd=(\w+)", url)
    # Path segments look like "<fid>-<url-encoded name>".
    paths = [
        {"fid": fid, "name": urllib.parse.unquote(raw_name)}
        for fid, raw_name in re.findall(r"/(\w{32})-?([^/]+)?", url)
    ]
    pwd_id = id_hit.group(1) if id_hit else None
    passcode = pwd_hit.group(1) if pwd_hit else ""
    pdir_fid = paths[-1]["fid"] if paths else 0
    return pwd_id, passcode, pdir_fid, paths
def update_savepath_fid(self, tasklist):
dir_paths = [
@ -580,8 +594,8 @@ class Quark:
def do_save_check(self, shareurl, savepath):
try:
pwd_id, passcode, pdir_fid = self.get_id_from_url(shareurl)
is_sharing, stoken = self.get_stoken(pwd_id, passcode)
pwd_id, passcode, pdir_fid, _ = self.extract_url(shareurl)
_, stoken = self.get_stoken(pwd_id, passcode)
share_file_list = self.get_detail(pwd_id, stoken, pdir_fid)["list"]
fid_list = [item["fid"] for item in share_file_list]
fid_token_list = [item["share_fid_token"] for item in share_file_list]
@ -618,8 +632,7 @@ class Quark:
else:
return False
except Exception as e:
if os.environ.get("DEBUG") == True:
print(f"转存测试失败: {str(e)}")
print(f"转存测试失败: {str(e)}")
def do_save_task(self, task):
# 判断资源失效记录
@ -628,8 +641,7 @@ class Quark:
return
# 链接转换所需参数
pwd_id, passcode, pdir_fid = self.get_id_from_url(task["shareurl"])
# print("match: ", pwd_id, pdir_fid)
pwd_id, passcode, pdir_fid, _ = self.extract_url(task["shareurl"])
# 获取stoken同时可验证资源是否失效
is_sharing, stoken = self.get_stoken(pwd_id, passcode)
@ -667,10 +679,12 @@ class Quark:
share_file_list = self.get_detail(
pwd_id, stoken, share_file_list[0]["fid"]
)["list"]
# 应用过滤词过滤文件
# 应用过滤词过滤
if task.get("filterwords"):
filterwords_list = [word.strip() for word in task["filterwords"].split(',')]
# 同时支持中英文逗号分隔
filterwords = task["filterwords"].replace("", ",")
filterwords_list = [word.strip() for word in filterwords.split(',')]
share_file_list = [file for file in share_file_list if not any(word in file['file_name'] for word in filterwords_list)]
print(f"📑 应用过滤词:{task['filterwords']},剩余{len(share_file_list)}个文件")
@ -696,44 +710,13 @@ class Quark:
# 需保存的文件清单
need_save_list = []
# 顺序命名模式下获取当前序号和正则表达式
regex_pattern = None
if task.get("use_sequence_naming") and task.get("sequence_naming"):
# 获取目录中符合顺序命名格式的最大序号
sequence_pattern = task["sequence_naming"]
# 替换占位符为正则表达式捕获组
regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)')
max_sequence = 0
for dir_file in dir_file_list:
matches = re.match(regex_pattern, dir_file["file_name"])
if matches:
try:
current_seq = int(matches.group(1))
max_sequence = max(max_sequence, current_seq)
except (IndexError, ValueError):
pass
# 从最大序号开始计数
current_sequence = max_sequence
# 添加符合的
for share_file in share_file_list:
if share_file["dir"] and task.get("update_subdir", False):
pattern, replace = task["update_subdir"], ""
elif task.get("use_sequence_naming") and task.get("sequence_naming"):
# 使用顺序命名
pattern = ".*" # 匹配任何文件
# 序号暂时留空,等收集完所有文件后再按优先级排序赋值
replace = "TO_BE_REPLACED_LATER"
# 保留文件扩展名
if not share_file["dir"]:
file_ext = os.path.splitext(share_file["file_name"])[1]
replace = replace + file_ext
else:
pattern, replace = self.magic_regex_func(
task["pattern"], task["replace"], task["taskname"]
task.get("pattern", ""), task.get("replace", ""), task["taskname"]
)
# 正则文件名匹配
if re.search(pattern, share_file["file_name"]):
@ -751,51 +734,15 @@ class Quark:
)
else:
compare_func = lambda a, b1, b2: (a == b1 or a == b2)
# 判断目标目录文件是否存在
file_exists = False
# 顺序命名模式下增强去重功能
if task.get("use_sequence_naming") and task.get("sequence_naming") and not share_file["dir"]:
# 根据文件大小和修改时间判断文件是否已经存在
file_ext = os.path.splitext(share_file["file_name"])[1].lower()
for dir_file in dir_file_list:
# 检查是否为相同的文件(根据大小和扩展名判断)
dir_file_ext = os.path.splitext(dir_file["file_name"])[1].lower()
if (not dir_file["dir"] and
dir_file["size"] == share_file["size"] and
dir_file_ext == file_ext):
# 文件大小相同,扩展名相同,很可能是同一个文件
# 额外检查是否已经有符合顺序命名格式的文件(防止重复转存后重命名)
if re.match(regex_pattern, dir_file["file_name"]):
print(f"📌 顺序命名去重: {share_file['file_name']} 已存在于目录 {dir_file['file_name']},大小: {format_bytes(share_file['size'])},跳过")
file_exists = True
break
# 如果文件大小相同和扩展名相同,需要进一步检查修改时间是否接近
share_time = share_file.get("last_update_at", 0)
dir_time = dir_file.get("updated_at", 0)
# 如果修改时间在30天内或者差距不大认为是同一个文件
if abs(share_time - dir_time) < 2592000 or abs(1 - (share_time / dir_time if dir_time else 1)) < 0.1:
print(f"📌 顺序命名去重: {share_file['file_name']}{dir_file['file_name']} 匹配,大小: {format_bytes(share_file['size'])},跳过")
file_exists = True
break
else:
# 原有的文件名匹配判断
file_exists = any(
compare_func(
dir_file["file_name"], share_file["file_name"], save_name
)
for dir_file in dir_file_list
file_exists = any(
compare_func(
dir_file["file_name"], share_file["file_name"], save_name
)
for dir_file in dir_file_list
)
if not file_exists:
share_file["save_name"] = save_name
share_file["original_name"] = share_file["file_name"] # 保存原文件名,用于排序
need_save_list.append(share_file)
elif share_file["dir"]:
# 存在并是一个文件夹
@ -824,71 +771,6 @@ class Quark:
if share_file["fid"] == task.get("startfid", ""):
break
# 如果是顺序命名模式,需要重新排序并生成文件名
if task.get("use_sequence_naming") and task.get("sequence_naming") and need_save_list:
def custom_sort(file):
    """Build the sort key used to order files before sequence naming.

    Args:
        file: dict with ``"original_name"`` and optionally ``"last_update_at"``.

    Returns:
        ``(episode_num, position_order, last_update_at)``; files sort by
        episode number first, then by the 上/中/下 part marker, then by update
        time (0 when the key is absent).
    """
    file_name = file["original_name"]

    # 1. Episode/issue number; the first matching format wins.
    episode_num = 0
    if m := re.search(r'第(\d+)[期集话]', file_name):
        # "第X期/集/话"
        episode_num = int(m.group(1))
    elif m := re.search(r'[Ss](\d+)[Ee](\d+)', file_name):
        # S01E01: weight the season so seasons sort before episodes.
        episode_num = int(m.group(1)) * 1000 + int(m.group(2))
    elif m := re.search(r'[Ee](\d+)', file_name):
        # E01
        episode_num = int(m.group(1))
    elif m := re.search(r'(\d+)[xX](\d+)', file_name):
        # 1x01
        episode_num = int(m.group(1)) * 1000 + int(m.group(2))
    elif m := re.search(r'(\d{4})(\d{2})(\d{2})', file_name):
        # Date in YYYYMMDD form
        episode_num = (
            int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))
        )
    elif m := re.search(r'^(\d+)', file_name):
        # Name starting with a plain number
        episode_num = int(m.group(1))

    # 2. Part marker: 上 (first) < 中 (middle) < 下 (last); 10 when absent.
    position_order = 10
    if '上' in file_name:
        position_order = 1
    elif '中' in file_name:
        position_order = 2
    elif '下' in file_name:
        position_order = 3

    # 3. Episode first, then part marker, then update time.
    return (episode_num, position_order, file.get("last_update_at", 0))
# 按自定义逻辑排序
need_save_list = sorted(need_save_list, key=custom_sort)
# 重新生成命名
for index, file in enumerate(need_save_list):
current_sequence += 1
if file["dir"]:
file["save_name"] = sequence_pattern.replace("{}", f"{current_sequence:02d}")
else:
file_ext = os.path.splitext(file["file_name"])[1]
file["save_name"] = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext
fid_list = [item["fid"] for item in need_save_list]
fid_token_list = [item["share_fid_token"] for item in need_save_list]
if fid_list:
@ -926,144 +808,40 @@ class Quark:
return tree
def do_rename_task(self, task, subdir_path=""):
if task.get("use_sequence_naming") and task.get("sequence_naming"):
# 使用顺序命名模式
sequence_pattern = task["sequence_naming"]
# 替换占位符为正则表达式捕获组
regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)')
savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")
if not self.savepath_fid.get(savepath):
self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"]
dir_file_list = self.ls_dir(self.savepath_fid[savepath])
dir_file_name_list = [item["file_name"] for item in dir_file_list]
# 找出当前最大序号
max_sequence = 0
for dir_file in dir_file_list:
matches = re.match(regex_pattern, dir_file["file_name"])
if matches:
try:
current_seq = int(matches.group(1))
max_sequence = max(max_sequence, current_seq)
except (IndexError, ValueError):
pass
# 重命名文件
current_sequence = max_sequence
is_rename_count = 0
# 定义一个排序函数,支持多种格式的排序
def custom_sort(file):
    """Build the sort key used to order directory files before renaming.

    Args:
        file: dict with ``"file_name"`` and ``"created_at"``.

    Returns:
        ``(episode_num, position_order, created_at)``; files sort by episode
        number first, then by the 上/中/下 part marker, then by creation time.
    """
    file_name = file["file_name"]

    # 1. Episode/issue number; the first matching format wins.
    episode_num = 0
    if m := re.search(r'第(\d+)[期集话]', file_name):
        # "第X期/集/话"
        episode_num = int(m.group(1))
    elif m := re.search(r'[Ss](\d+)[Ee](\d+)', file_name):
        # S01E01: weight the season so seasons sort before episodes.
        episode_num = int(m.group(1)) * 1000 + int(m.group(2))
    elif m := re.search(r'[Ee](\d+)', file_name):
        # E01
        episode_num = int(m.group(1))
    elif m := re.search(r'(\d+)[xX](\d+)', file_name):
        # 1x01
        episode_num = int(m.group(1)) * 1000 + int(m.group(2))
    elif m := re.search(r'(\d{4})(\d{2})(\d{2})', file_name):
        # Date in YYYYMMDD form
        episode_num = (
            int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))
        )
    elif m := re.search(r'^(\d+)', file_name):
        # Name starting with a plain number
        episode_num = int(m.group(1))

    # 2. Part marker: 上 (first) < 中 (middle) < 下 (last); 10 when absent.
    position_order = 10
    if '上' in file_name:
        position_order = 1
    elif '中' in file_name:
        position_order = 2
    elif '下' in file_name:
        position_order = 3

    # 3. Episode first, then part marker, then creation time.
    return (episode_num, position_order, file["created_at"])
# 按自定义逻辑排序
sorted_files = sorted([f for f in dir_file_list if not f["dir"] and not re.match(regex_pattern, f["file_name"])], key=custom_sort)
for dir_file in sorted_files:
current_sequence += 1
file_ext = os.path.splitext(dir_file["file_name"])[1]
save_name = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext
if save_name != dir_file["file_name"] and save_name not in dir_file_name_list:
try:
rename_return = self.rename(dir_file["fid"], save_name)
# 防止网络问题导致的错误
if isinstance(rename_return, dict) and rename_return.get("code") == 0:
print(f"重命名:{dir_file['file_name']}{save_name}")
is_rename_count += 1
dir_file_name_list.append(save_name)
else:
error_msg = rename_return.get("message", "未知错误")
print(f"重命名:{dir_file['file_name']}{save_name} 失败,{error_msg}")
except Exception as e:
print(f"重命名出错:{dir_file['file_name']}{save_name},错误:{str(e)}")
return is_rename_count > 0
else:
# 原有的正则匹配模式
pattern, replace = self.magic_regex_func(
task["pattern"], task["replace"], task["taskname"]
)
if not pattern or not replace:
return 0
savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")
if not self.savepath_fid.get(savepath):
self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"]
dir_file_list = self.ls_dir(self.savepath_fid[savepath])
dir_file_name_list = [item["file_name"] for item in dir_file_list]
is_rename_count = 0
for dir_file in dir_file_list:
if dir_file["dir"]:
is_rename_count += self.do_rename_task(
task, f"{subdir_path}/{dir_file['file_name']}"
)
if re.search(pattern, dir_file["file_name"]):
save_name = (
re.sub(pattern, replace, dir_file["file_name"])
if replace != ""
else dir_file["file_name"]
)
if save_name != dir_file["file_name"] and (
save_name not in dir_file_name_list
):
try:
rename_return = self.rename(dir_file["fid"], save_name)
if isinstance(rename_return, dict) and rename_return.get("code") == 0:
print(f"重命名:{dir_file['file_name']}{save_name}")
is_rename_count += 1
else:
error_msg = rename_return.get("message", "未知错误")
print(f"重命名:{dir_file['file_name']}{save_name} 失败,{error_msg}")
except Exception as e:
print(f"重命名出错:{dir_file['file_name']}{save_name},错误:{str(e)}")
return is_rename_count > 0
pattern, replace = self.magic_regex_func(
task.get("pattern", ""), task.get("replace", ""), task["taskname"]
)
if not pattern or not replace:
return 0
savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")
if not self.savepath_fid.get(savepath):
self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"]
dir_file_list = self.ls_dir(self.savepath_fid[savepath])
dir_file_name_list = [item["file_name"] for item in dir_file_list]
is_rename_count = 0
for dir_file in dir_file_list:
if dir_file["dir"]:
is_rename_count += self.do_rename_task(
task, f"{subdir_path}/{dir_file['file_name']}"
)
if re.search(pattern, dir_file["file_name"]):
save_name = (
re.sub(pattern, replace, dir_file["file_name"])
if replace != ""
else dir_file["file_name"]
)
if save_name != dir_file["file_name"] and (
save_name not in dir_file_name_list
):
rename_return = self.rename(dir_file["fid"], save_name)
if rename_return["code"] == 0:
print(f"重命名:{dir_file['file_name']}{save_name}")
is_rename_count += 1
else:
print(
f"重命名:{dir_file['file_name']}{save_name} 失败,{rename_return['message']}"
)
return is_rename_count > 0
def verify_account(account):
@ -1134,7 +912,7 @@ def do_save(account, tasklist=[]):
# 获取全部保存目录fid
account.update_savepath_fid(tasklist)
def check_date(task):
def is_time(task):
return (
not task.get("enddate")
or (
@ -1142,35 +920,33 @@ def do_save(account, tasklist=[]):
<= datetime.strptime(task["enddate"], "%Y-%m-%d").date()
)
) and (
not task.get("runweek")
"runweek" not in task
# 星期一为0星期日为6
or (datetime.today().weekday() + 1 in task.get("runweek"))
)
# 执行任务
for index, task in enumerate(tasklist):
# 判断任务期限
if check_date(task):
print()
print(f"#{index+1}------------------")
print(f"任务名称: {task['taskname']}")
print(f"分享链接: {task['shareurl']}")
print(f"保存路径: {task['savepath']}")
# 打印重命名规则信息
if task.get("use_sequence_naming") and task.get("sequence_naming"):
print(f"顺序命名: {task['sequence_naming']}")
else:
print(f"正则匹配: {task['pattern']}")
print(f"正则替换: {task['replace']}")
if task.get("enddate"):
print(f"任务截止: {task['enddate']}")
if task.get("ignore_extension"):
print(f"忽略后缀: {task['ignore_extension']}")
if task.get("update_subdir"):
print(f"更子目录: {task['update_subdir']}")
print()
print()
print(f"#{index+1}------------------")
print(f"任务名称: {task['taskname']}")
print(f"分享链接: {task['shareurl']}")
print(f"保存路径: {task['savepath']}")
if task.get("pattern"):
print(f"正则匹配: {task['pattern']}")
if task.get("replace"):
print(f"正则替换: {task['replace']}")
if task.get("update_subdir"):
print(f"更子目录: {task['update_subdir']}")
if task.get("runweek") or task.get("enddate"):
print(
f"运行周期: WK{task.get("runweek",[])} ~ {task.get('enddate','forever')}"
)
print()
# 判断任务周期
if not is_time(task):
print(f"任务不在运行周期内,跳过")
else:
is_new_tree = account.do_save_task(task)
is_rename = account.do_rename_task(task)
@ -1210,7 +986,13 @@ def main():
print()
# 读取启动参数
config_path = sys.argv[1] if len(sys.argv) > 1 else "quark_config.json"
task_index = int(sys.argv[2]) if len(sys.argv) > 2 and sys.argv[2].isdigit() else ""
# 从环境变量中获取 TASKLIST
tasklist_from_env = []
if tasklist_json := os.environ.get("TASKLIST"):
try:
tasklist_from_env = json.loads(tasklist_json)
except Exception as e:
print(f"从环境变量解析任务列表失败 {e}")
# 检查本地文件是否存在,如果不存在就下载
if not os.path.exists(config_path):
if os.environ.get("QUARK_COOKIE"):
@ -1227,9 +1009,8 @@ def main():
return
else:
print(f"⚙️ 正从 {config_path} 文件中读取配置")
with open(config_path, "r", encoding="utf-8") as file:
CONFIG_DATA = json.load(file)
Config.breaking_change_update(CONFIG_DATA)
CONFIG_DATA = Config.read_json(config_path)
Config.breaking_change_update(CONFIG_DATA)
cookie_val = CONFIG_DATA.get("cookie")
if not CONFIG_DATA.get("magic_regex"):
CONFIG_DATA["magic_regex"] = MAGIC_REGEX
@ -1242,7 +1023,7 @@ def main():
accounts = [Quark(cookie, index) for index, cookie in enumerate(cookies)]
# 签到
print(f"===============签到任务===============")
if type(task_index) is int:
if tasklist_from_env:
verify_account(accounts[0])
else:
for account in accounts:
@ -1253,11 +1034,10 @@ def main():
if accounts[0].is_active and cookie_form_file:
print(f"===============转存任务===============")
# 任务列表
tasklist = CONFIG_DATA.get("tasklist", [])
if type(task_index) is int:
do_save(accounts[0], [tasklist[task_index]])
if tasklist_from_env:
do_save(accounts[0], tasklist_from_env)
else:
do_save(accounts[0], tasklist)
do_save(accounts[0], CONFIG_DATA.get("tasklist", []))
print()
# 通知
if NOTIFYS:
@ -1267,8 +1047,7 @@ def main():
print()
if cookie_form_file:
# 更新配置
with open(config_path, "w", encoding="utf-8") as file:
json.dump(CONFIG_DATA, file, ensure_ascii=False, sort_keys=False, indent=2)
Config.write_json(config_path, CONFIG_DATA)
print(f"===============程序结束===============")
duration = datetime.now() - start_time