mirror of
https://github.com/Cp0204/quark-auto-save.git
synced 2026-01-12 15:20:44 +08:00
968 lines
36 KiB
Python
968 lines
36 KiB
Python
# !/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
from flask import (
|
||
json,
|
||
Flask,
|
||
url_for,
|
||
session,
|
||
jsonify,
|
||
request,
|
||
redirect,
|
||
Response,
|
||
render_template,
|
||
send_from_directory,
|
||
stream_with_context,
|
||
)
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
from sdk.cloudsaver import CloudSaver
|
||
from datetime import timedelta, datetime
|
||
import subprocess
|
||
import requests
|
||
import hashlib
|
||
import logging
|
||
import base64
|
||
import sys
|
||
import os
|
||
import re
|
||
import random
|
||
import time
|
||
|
||
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||
sys.path.insert(0, parent_dir)
|
||
from quark_auto_save import Quark
|
||
from quark_auto_save import Config, format_bytes
|
||
|
||
# 添加导入全局extract_episode_number和sort_file_by_name函数
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
from quark_auto_save import extract_episode_number, sort_file_by_name
|
||
|
||
# 导入数据库模块
|
||
try:
|
||
from app.sdk.db import RecordDB
|
||
except ImportError:
|
||
# 如果没有数据库模块,定义一个空类
|
||
class RecordDB:
|
||
def __init__(self, *args, **kwargs):
|
||
pass
|
||
|
||
def get_records(self, *args, **kwargs):
|
||
return {"records": [], "pagination": {"total_records": 0, "total_pages": 0, "current_page": 1, "page_size": 20}}
|
||
|
||
|
||
def get_app_ver():
|
||
BUILD_SHA = os.environ.get("BUILD_SHA", "")
|
||
BUILD_TAG = os.environ.get("BUILD_TAG", "")
|
||
if BUILD_TAG[:1] == "v":
|
||
return BUILD_TAG
|
||
elif BUILD_SHA:
|
||
return f"{BUILD_TAG}({BUILD_SHA[:7]})"
|
||
else:
|
||
return "dev"
|
||
|
||
|
||
# 文件路径
|
||
PYTHON_PATH = "python3" if os.path.exists("/usr/bin/python3") else "python"
|
||
SCRIPT_PATH = os.environ.get("SCRIPT_PATH", "./quark_auto_save.py")
|
||
CONFIG_PATH = os.environ.get("CONFIG_PATH", "./config/quark_config.json")
|
||
PLUGIN_FLAGS = os.environ.get("PLUGIN_FLAGS", "")
|
||
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
|
||
# 从环境变量获取端口,默认为5005
|
||
PORT = int(os.environ.get("PORT", "5005"))
|
||
|
||
config_data = {}
|
||
task_plugins_config_default = {}
|
||
|
||
app = Flask(__name__)
|
||
app.config["APP_VERSION"] = get_app_ver()
|
||
app.secret_key = "ca943f6db6dd34823d36ab08d8d6f65d"
|
||
app.config["SESSION_COOKIE_NAME"] = "QUARK_AUTO_SAVE_SESSION"
|
||
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(days=31)
|
||
app.json.ensure_ascii = False
|
||
app.json.sort_keys = False
|
||
app.jinja_env.variable_start_string = "[["
|
||
app.jinja_env.variable_end_string = "]]"
|
||
|
||
scheduler = BackgroundScheduler()
|
||
logging.basicConfig(
|
||
level=logging.DEBUG if DEBUG else logging.INFO,
|
||
format="[%(asctime)s][%(levelname)s] %(message)s",
|
||
datefmt="%m-%d %H:%M:%S",
|
||
)
|
||
# 过滤werkzeug日志输出
|
||
if not DEBUG:
|
||
logging.getLogger("werkzeug").setLevel(logging.ERROR)
|
||
|
||
|
||
def gen_md5(string):
|
||
md5 = hashlib.md5()
|
||
md5.update(string.encode("utf-8"))
|
||
return md5.hexdigest()
|
||
|
||
|
||
def get_login_token():
|
||
username = config_data["webui"]["username"]
|
||
password = config_data["webui"]["password"]
|
||
return gen_md5(f"token{username}{password}+-*/")[8:24]
|
||
|
||
|
||
def is_login():
|
||
login_token = get_login_token()
|
||
if session.get("token") == login_token or request.args.get("token") == login_token:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
|
||
# 设置icon
|
||
@app.route("/favicon.ico")
|
||
def favicon():
|
||
return send_from_directory(
|
||
os.path.join(app.root_path, "static"),
|
||
"favicon.ico",
|
||
mimetype="image/vnd.microsoft.icon",
|
||
)
|
||
|
||
|
||
# 登录页面
|
||
@app.route("/login", methods=["GET", "POST"])
|
||
def login():
|
||
if request.method == "POST":
|
||
username = config_data["webui"]["username"]
|
||
password = config_data["webui"]["password"]
|
||
input_username = request.form.get("username")
|
||
input_password = request.form.get("password")
|
||
|
||
# 验证用户名和密码
|
||
if not input_username or not input_password:
|
||
logging.info(">>> 登录失败:用户名或密码为空")
|
||
return render_template("login.html", message="用户名和密码不能为空")
|
||
elif username != input_username:
|
||
logging.info(f">>> 登录失败:用户名错误 {input_username}")
|
||
return render_template("login.html", message="用户名或密码错误")
|
||
elif password != input_password:
|
||
logging.info(f">>> 用户 {input_username} 登录失败:密码错误")
|
||
return render_template("login.html", message="用户名或密码错误")
|
||
else:
|
||
logging.info(f">>> 用户 {username} 登录成功")
|
||
session.permanent = True
|
||
session["token"] = get_login_token()
|
||
return redirect(url_for("index"))
|
||
|
||
if is_login():
|
||
return redirect(url_for("index"))
|
||
return render_template("login.html", error=None)
|
||
|
||
|
||
# 退出登录
|
||
@app.route("/logout")
|
||
def logout():
|
||
session.pop("token", None)
|
||
return redirect(url_for("login"))
|
||
|
||
|
||
# 管理页面
|
||
@app.route("/")
|
||
def index():
|
||
if not is_login():
|
||
return redirect(url_for("login"))
|
||
return render_template(
|
||
"index.html", version=app.config["APP_VERSION"], plugin_flags=PLUGIN_FLAGS
|
||
)
|
||
|
||
|
||
# 获取配置数据
|
||
@app.route("/data")
|
||
def get_data():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
data = Config.read_json(CONFIG_PATH)
|
||
# 发送webui信息,但不发送密码原文
|
||
data["webui"] = {
|
||
"username": config_data["webui"]["username"],
|
||
"password": config_data["webui"]["password"]
|
||
}
|
||
data["api_token"] = get_login_token()
|
||
data["task_plugins_config_default"] = task_plugins_config_default
|
||
return jsonify({"success": True, "data": data})
|
||
|
||
|
||
# 更新数据
|
||
@app.route("/update", methods=["POST"])
|
||
def update():
|
||
global config_data
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
dont_save_keys = ["task_plugins_config_default", "api_token"]
|
||
for key, value in request.json.items():
|
||
if key not in dont_save_keys:
|
||
if key == "webui":
|
||
# 更新webui凭据
|
||
config_data["webui"]["username"] = value.get("username", config_data["webui"]["username"])
|
||
config_data["webui"]["password"] = value.get("password", config_data["webui"]["password"])
|
||
else:
|
||
config_data.update({key: value})
|
||
Config.write_json(CONFIG_PATH, config_data)
|
||
# 更新session token,确保当前会话在用户名密码更改后仍然有效
|
||
session["token"] = get_login_token()
|
||
# 重新加载任务
|
||
if reload_tasks():
|
||
logging.info(f">>> 配置更新成功")
|
||
return jsonify({"success": True, "message": "配置更新成功"})
|
||
else:
|
||
logging.info(f">>> 配置更新失败")
|
||
return jsonify({"success": False, "message": "配置更新失败"})
|
||
|
||
|
||
# 处理运行脚本请求
|
||
@app.route("/run_script_now", methods=["POST"])
|
||
def run_script_now():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
tasklist = request.json.get("tasklist", [])
|
||
command = [PYTHON_PATH, "-u", SCRIPT_PATH, CONFIG_PATH]
|
||
logging.info(
|
||
f">>> 手动运行任务 [{tasklist[0].get('taskname') if len(tasklist)>0 else 'ALL'}] 开始执行..."
|
||
)
|
||
|
||
def generate_output():
|
||
# 设置环境变量
|
||
process_env = os.environ.copy()
|
||
process_env["PYTHONIOENCODING"] = "utf-8"
|
||
if tasklist:
|
||
process_env["TASKLIST"] = json.dumps(tasklist, ensure_ascii=False)
|
||
# 添加原始任务索引的环境变量
|
||
if len(tasklist) == 1 and 'original_index' in request.json:
|
||
process_env["ORIGINAL_TASK_INDEX"] = str(request.json['original_index'])
|
||
process = subprocess.Popen(
|
||
command,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
universal_newlines=True,
|
||
encoding="utf-8",
|
||
errors="replace",
|
||
bufsize=1,
|
||
env=process_env,
|
||
)
|
||
try:
|
||
for line in iter(process.stdout.readline, ""):
|
||
logging.info(line.strip())
|
||
yield f"data: {line}\n\n"
|
||
yield "data: [DONE]\n\n"
|
||
finally:
|
||
process.stdout.close()
|
||
process.wait()
|
||
|
||
return Response(
|
||
stream_with_context(generate_output()),
|
||
content_type="text/event-stream;charset=utf-8",
|
||
)
|
||
|
||
|
||
@app.route("/task_suggestions")
|
||
def get_task_suggestions():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
query = request.args.get("q", "").lower()
|
||
deep = request.args.get("d", "").lower()
|
||
|
||
# 提取剧名,去除季数信息
|
||
def extract_show_name(task_name):
|
||
# 清理任务名称中的连续空格和特殊符号
|
||
clean_name = task_name.replace('\u3000', ' ').replace('\t', ' ')
|
||
clean_name = re.sub(r'\s+', ' ', clean_name).strip()
|
||
|
||
# 匹配常见的季数格式
|
||
# 例如:黑镜 - S07、人生若如初见 - S01、折腰.S01、音你而来-S02、快乐的大人 S02
|
||
season_patterns = [
|
||
r'^(.*?)[\s\.\-_]+S\d+$', # 黑镜 - S07、折腰.S01、音你而来-S02
|
||
r'^(.*?)[\s\.\-_]+Season\s*\d+$', # 黑镜 - Season 1
|
||
r'^(.*?)\s+S\d+$', # 快乐的大人 S02
|
||
r'^(.*?)[\s\.\-_]+S\d+E\d+$', # 处理 S01E01 格式
|
||
r'^(.*?)\s+第\s*\d+\s*季$', # 处理 第N季 格式
|
||
r'^(.*?)[\s\.\-_]+第\s*\d+\s*季$', # 处理 - 第N季 格式
|
||
r'^(.*?)\s+第[一二三四五六七八九十零]+季$', # 处理 第一季、第二季 格式
|
||
r'^(.*?)[\s\.\-_]+第[一二三四五六七八九十零]+季$', # 处理 - 第一季、- 第二季 格式
|
||
]
|
||
|
||
for pattern in season_patterns:
|
||
match = re.match(pattern, clean_name, re.IGNORECASE)
|
||
if match:
|
||
show_name = match.group(1).strip()
|
||
# 去除末尾可能残留的分隔符
|
||
show_name = re.sub(r'[\s\.\-_]+$', '', show_name)
|
||
return show_name
|
||
|
||
# 如果没有匹配到季数格式,返回原名称
|
||
return clean_name
|
||
|
||
# 处理搜索关键词,提取剧名
|
||
search_query = extract_show_name(query)
|
||
|
||
try:
|
||
cs_data = config_data.get("source", {}).get("cloudsaver", {})
|
||
if (
|
||
cs_data.get("server")
|
||
and cs_data.get("username")
|
||
and cs_data.get("password")
|
||
):
|
||
cs = CloudSaver(cs_data.get("server"))
|
||
cs.set_auth(
|
||
cs_data.get("username", ""),
|
||
cs_data.get("password", ""),
|
||
cs_data.get("token", ""),
|
||
)
|
||
# 使用处理后的搜索关键词
|
||
search = cs.auto_login_search(search_query)
|
||
if search.get("success"):
|
||
if search.get("new_token"):
|
||
cs_data["token"] = search.get("new_token")
|
||
Config.write_json(CONFIG_PATH, config_data)
|
||
search_results = cs.clean_search_results(search.get("data"))
|
||
# 在返回结果中添加实际使用的搜索关键词
|
||
return jsonify(
|
||
{
|
||
"success": True,
|
||
"source": "CloudSaver",
|
||
"data": search_results
|
||
}
|
||
)
|
||
else:
|
||
return jsonify({"success": True, "message": search.get("message")})
|
||
else:
|
||
base_url = base64.b64decode("aHR0cHM6Ly9zLjkxNzc4OC54eXo=").decode()
|
||
# 使用处理后的搜索关键词
|
||
url = f"{base_url}/task_suggestions?q={search_query}&d={deep}"
|
||
response = requests.get(url)
|
||
return jsonify(
|
||
{
|
||
"success": True,
|
||
"source": "网络公开",
|
||
"data": response.json()
|
||
}
|
||
)
|
||
except Exception as e:
|
||
return jsonify({"success": True, "message": f"error: {str(e)}"})
|
||
|
||
|
||
# 添加函数,与主程序保持一致
|
||
def is_date_format(number_str):
|
||
"""
|
||
判断一个纯数字字符串是否可能是日期格式
|
||
支持的格式:YYYYMMDD, MMDD
|
||
"""
|
||
# 判断YYYYMMDD格式 (8位数字)
|
||
if len(number_str) == 8 and number_str.startswith('20'):
|
||
year = int(number_str[:4])
|
||
month = int(number_str[4:6])
|
||
day = int(number_str[6:8])
|
||
|
||
# 简单检查月份和日期是否有效
|
||
if 1 <= month <= 12 and 1 <= day <= 31:
|
||
# 可能是日期格式
|
||
return True
|
||
|
||
# 判断MMDD格式 (4位数字)
|
||
elif len(number_str) == 4:
|
||
month = int(number_str[:2])
|
||
day = int(number_str[2:4])
|
||
|
||
# 简单检查月份和日期是否有效
|
||
if 1 <= month <= 12 and 1 <= day <= 31:
|
||
# 可能是日期格式
|
||
return True
|
||
|
||
# 其他长度的纯数字不视为日期格式
|
||
return False
|
||
|
||
|
||
# 获取分享详情接口
|
||
@app.route("/get_share_detail", methods=["GET", "POST"])
|
||
def get_share_detail():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
|
||
# 支持GET和POST请求
|
||
if request.method == "GET":
|
||
shareurl = request.args.get("shareurl", "")
|
||
stoken = request.args.get("stoken", "")
|
||
else:
|
||
shareurl = request.json.get("shareurl", "")
|
||
stoken = request.json.get("stoken", "")
|
||
|
||
account = Quark("", 0)
|
||
# 设置account的必要属性
|
||
account.episode_patterns = request.json.get("regex", {}).get("episode_patterns", []) if request.method == "POST" else []
|
||
|
||
pwd_id, passcode, pdir_fid, paths = account.extract_url(shareurl)
|
||
if not stoken:
|
||
is_sharing, stoken = account.get_stoken(pwd_id, passcode)
|
||
if not is_sharing:
|
||
return jsonify({"success": False, "data": {"error": stoken}})
|
||
share_detail = account.get_detail(pwd_id, stoken, pdir_fid, _fetch_share=1)
|
||
share_detail["paths"] = paths
|
||
share_detail["stoken"] = stoken
|
||
|
||
# 如果是GET请求或者不需要预览正则,直接返回分享详情
|
||
if request.method == "GET" or not request.json.get("regex"):
|
||
return jsonify({"success": True, "data": share_detail})
|
||
|
||
# 正则命名预览
|
||
def preview_regex(share_detail):
|
||
regex = request.json.get("regex")
|
||
# 检查是否为顺序命名模式
|
||
if regex.get("use_sequence_naming") and regex.get("sequence_naming"):
|
||
# 顺序命名模式预览
|
||
sequence_pattern = regex.get("sequence_naming")
|
||
current_sequence = 1
|
||
|
||
# 构建顺序命名的正则表达式
|
||
if sequence_pattern == "{}":
|
||
# 对于单独的{},使用特殊匹配
|
||
regex_pattern = "(\\d+)"
|
||
else:
|
||
regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)')
|
||
|
||
# 实现与实际重命名相同的排序算法
|
||
def extract_sort_value(file):
|
||
return sort_file_by_name(file)
|
||
|
||
# 过滤出非目录文件,并且排除已经符合命名规则的文件
|
||
files_to_process = []
|
||
for f in share_detail["list"]:
|
||
if f["dir"]:
|
||
continue # 跳过目录
|
||
|
||
# 检查文件是否已符合命名规则
|
||
if sequence_pattern == "{}":
|
||
# 对于单独的{},检查文件名是否为纯数字
|
||
file_name_without_ext = os.path.splitext(f["file_name"])[0]
|
||
if file_name_without_ext.isdigit():
|
||
# 增加判断:如果是日期格式的纯数字,不视为已命名
|
||
if not is_date_format(file_name_without_ext):
|
||
continue # 跳过已符合命名规则的文件
|
||
elif re.match(regex_pattern, f["file_name"]):
|
||
continue # 跳过已符合命名规则的文件
|
||
|
||
# 添加到待处理文件列表
|
||
files_to_process.append(f)
|
||
|
||
# 根据提取的排序值进行排序
|
||
sorted_files = sorted(files_to_process, key=extract_sort_value)
|
||
|
||
# 应用过滤词过滤
|
||
filterwords = regex.get("filterwords", "")
|
||
if filterwords:
|
||
# 同时支持中英文逗号分隔
|
||
filterwords = filterwords.replace(",", ",")
|
||
filterwords_list = [word.strip() for word in filterwords.split(',')]
|
||
for item in sorted_files:
|
||
# 被过滤的文件不会有file_name_re,与不匹配正则的文件显示一致
|
||
if any(word in item['file_name'] for word in filterwords_list):
|
||
item["filtered"] = True
|
||
|
||
# 为每个文件分配序号
|
||
for file in sorted_files:
|
||
if not file.get("filtered"):
|
||
# 获取文件扩展名
|
||
file_ext = os.path.splitext(file["file_name"])[1]
|
||
# 生成预览文件名
|
||
if sequence_pattern == "{}":
|
||
# 对于单独的{},直接使用数字序号作为文件名
|
||
file["file_name_re"] = f"{current_sequence:02d}{file_ext}"
|
||
else:
|
||
# 替换所有的{}为当前序号
|
||
file["file_name_re"] = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext
|
||
current_sequence += 1
|
||
|
||
return share_detail
|
||
elif regex.get("use_episode_naming") and regex.get("episode_naming"):
|
||
# 剧集命名模式预览
|
||
episode_pattern = regex.get("episode_naming")
|
||
episode_patterns = regex.get("episode_patterns", [])
|
||
|
||
# 调用全局的集编号提取函数
|
||
def extract_episode_number_local(filename):
|
||
return extract_episode_number(filename, episode_patterns=episode_patterns)
|
||
|
||
# 构建剧集命名的正则表达式 (主要用于检测已命名文件)
|
||
if episode_pattern == "[]":
|
||
# 对于单独的[],使用特殊匹配
|
||
regex_pattern = "^(\\d+)$" # 匹配纯数字文件名
|
||
elif "[]" in episode_pattern:
|
||
# 特殊处理E[]、EP[]等常见格式,使用更宽松的匹配方式
|
||
if episode_pattern == "E[]":
|
||
# 对于E[]格式,只检查文件名中是否包含形如E01的部分
|
||
regex_pattern = "^E(\\d+)$" # 只匹配纯E+数字的文件名格式
|
||
elif episode_pattern == "EP[]":
|
||
# 对于EP[]格式,只检查文件名中是否包含形如EP01的部分
|
||
regex_pattern = "^EP(\\d+)$" # 只匹配纯EP+数字的文件名格式
|
||
else:
|
||
# 对于其他带[]的格式,使用常规转义和替换
|
||
regex_pattern = re.escape(episode_pattern).replace('\\[\\]', '(\\d+)')
|
||
else:
|
||
# 如果输入模式不包含[],则使用简单匹配模式,避免正则表达式错误
|
||
regex_pattern = "^" + re.escape(episode_pattern) + "(\\d+)$"
|
||
|
||
# 实现高级排序算法
|
||
def extract_sorting_value(file):
|
||
if file["dir"]: # 跳过文件夹
|
||
return (float('inf'), 0, 0, 0) # 返回元组以确保类型一致性
|
||
|
||
filename = file["file_name"]
|
||
|
||
# 尝试获取剧集序号
|
||
episode_num = extract_episode_number_local(filename)
|
||
if episode_num is not None:
|
||
# 返回元组以确保类型一致性
|
||
return (0, episode_num, 0, 0)
|
||
|
||
# 如果无法提取剧集号,则使用通用的排序函数
|
||
return sort_file_by_name(file)
|
||
|
||
# 过滤出非目录文件,并且排除已经符合命名规则的文件
|
||
files_to_process = []
|
||
for f in share_detail["list"]:
|
||
if f["dir"]:
|
||
continue # 跳过目录
|
||
|
||
# 检查文件是否已符合命名规则
|
||
if episode_pattern == "[]":
|
||
# 对于单独的[],检查文件名是否为纯数字
|
||
file_name_without_ext = os.path.splitext(f["file_name"])[0]
|
||
if file_name_without_ext.isdigit():
|
||
# 增加判断:如果是日期格式的纯数字,不视为已命名
|
||
if not is_date_format(file_name_without_ext):
|
||
continue # 跳过已符合命名规则的文件
|
||
elif re.match(regex_pattern, f["file_name"]):
|
||
continue # 跳过已符合命名规则的文件
|
||
|
||
# 添加到待处理文件列表
|
||
files_to_process.append(f)
|
||
|
||
# 根据提取的排序值进行排序
|
||
sorted_files = sorted(files_to_process, key=extract_sorting_value)
|
||
|
||
# 应用过滤词过滤
|
||
filterwords = regex.get("filterwords", "")
|
||
if filterwords:
|
||
# 同时支持中英文逗号分隔
|
||
filterwords = filterwords.replace(",", ",")
|
||
filterwords_list = [word.strip() for word in filterwords.split(',')]
|
||
for item in sorted_files:
|
||
# 被过滤的文件不会有file_name_re,与不匹配正则的文件显示一致
|
||
if any(word in item['file_name'] for word in filterwords_list):
|
||
item["filtered"] = True
|
||
|
||
# 为每个文件生成新文件名
|
||
for file in sorted_files:
|
||
if not file.get("filtered"):
|
||
# 获取文件扩展名
|
||
file_ext = os.path.splitext(file["file_name"])[1]
|
||
# 尝试提取剧集号
|
||
episode_num = extract_episode_number_local(file["file_name"])
|
||
if episode_num is not None:
|
||
# 生成预览文件名
|
||
if episode_pattern == "[]":
|
||
# 对于单独的[],直接使用数字序号作为文件名
|
||
file["file_name_re"] = f"{episode_num:02d}{file_ext}"
|
||
else:
|
||
file["file_name_re"] = episode_pattern.replace("[]", f"{episode_num:02d}") + file_ext
|
||
else:
|
||
# 无法提取剧集号,标记为无法处理
|
||
file["file_name_re"] = "❌ 无法识别剧集号"
|
||
|
||
return share_detail
|
||
else:
|
||
# 普通正则命名预览
|
||
pattern, replace = account.magic_regex_func(
|
||
regex.get("pattern", ""),
|
||
regex.get("replace", ""),
|
||
regex.get("taskname", ""),
|
||
regex.get("magic_regex", {}),
|
||
)
|
||
|
||
# 应用过滤词过滤
|
||
filterwords = regex.get("filterwords", "")
|
||
if filterwords:
|
||
# 同时支持中英文逗号分隔
|
||
filterwords = filterwords.replace(",", ",")
|
||
filterwords_list = [word.strip() for word in filterwords.split(',')]
|
||
for item in share_detail["list"]:
|
||
# 被过滤的文件不会有file_name_re,与不匹配正则的文件显示一致
|
||
if any(word in item['file_name'] for word in filterwords_list):
|
||
item["filtered"] = True
|
||
|
||
# 应用正则命名
|
||
for item in share_detail["list"]:
|
||
# 只对未被过滤的文件应用正则命名
|
||
if not item.get("filtered") and re.search(pattern, item["file_name"]):
|
||
file_name = item["file_name"]
|
||
item["file_name_re"] = (
|
||
re.sub(pattern, replace, file_name) if replace != "" else file_name
|
||
)
|
||
return share_detail
|
||
|
||
share_detail = preview_regex(share_detail)
|
||
|
||
return jsonify({"success": True, "data": share_detail})
|
||
|
||
|
||
@app.route("/get_savepath_detail")
|
||
def get_savepath_detail():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
account = Quark(config_data["cookie"][0], 0)
|
||
paths = []
|
||
if path := request.args.get("path"):
|
||
if path == "/":
|
||
fid = 0
|
||
else:
|
||
dir_names = path.split("/")
|
||
if dir_names[0] == "":
|
||
dir_names.pop(0)
|
||
path_fids = []
|
||
current_path = ""
|
||
for dir_name in dir_names:
|
||
current_path += "/" + dir_name
|
||
path_fids.append(current_path)
|
||
if get_fids := account.get_fids(path_fids):
|
||
fid = get_fids[-1]["fid"]
|
||
paths = [
|
||
{"fid": get_fid["fid"], "name": dir_name}
|
||
for get_fid, dir_name in zip(get_fids, dir_names)
|
||
]
|
||
else:
|
||
return jsonify({"success": False, "data": {"error": "获取fid失败"}})
|
||
else:
|
||
fid = request.args.get("fid", "0")
|
||
file_list = {
|
||
"list": account.ls_dir(fid),
|
||
"paths": paths,
|
||
}
|
||
return jsonify({"success": True, "data": file_list})
|
||
|
||
|
||
@app.route("/delete_file", methods=["POST"])
|
||
def delete_file():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
account = Quark(config_data["cookie"][0], 0)
|
||
if fid := request.json.get("fid"):
|
||
response = account.delete([fid])
|
||
else:
|
||
response = {"success": False, "message": "缺失必要字段: fid"}
|
||
return jsonify(response)
|
||
|
||
|
||
# 添加任务接口
|
||
@app.route("/api/add_task", methods=["POST"])
|
||
def add_task():
|
||
global config_data
|
||
# 验证token
|
||
if not is_login():
|
||
return jsonify({"success": False, "code": 1, "message": "未登录"}), 401
|
||
# 必选字段
|
||
request_data = request.json
|
||
required_fields = ["taskname", "shareurl", "savepath"]
|
||
for field in required_fields:
|
||
if field not in request_data or not request_data[field]:
|
||
return (
|
||
jsonify(
|
||
{"success": False, "code": 2, "message": f"缺少必要字段: {field}"}
|
||
),
|
||
400,
|
||
)
|
||
# 添加任务
|
||
config_data["tasklist"].append(request_data)
|
||
Config.write_json(CONFIG_PATH, config_data)
|
||
logging.info(f">>> 通过API添加任务: {request_data['taskname']}")
|
||
return jsonify(
|
||
{"success": True, "code": 0, "message": "任务添加成功", "data": request_data}
|
||
)
|
||
|
||
|
||
# 定时任务执行的函数
|
||
def run_python(args):
|
||
logging.info(f">>> 定时运行任务")
|
||
# 检查是否需要随机延迟执行
|
||
if delay := config_data.get("crontab_delay"):
|
||
try:
|
||
delay_seconds = int(delay)
|
||
if delay_seconds > 0:
|
||
# 在0到设定值之间随机选择一个延迟时间
|
||
random_delay = random.randint(0, delay_seconds)
|
||
logging.info(f">>> 随机延迟执行 {random_delay}秒")
|
||
time.sleep(random_delay)
|
||
except (ValueError, TypeError):
|
||
logging.warning(f">>> 延迟执行设置无效: {delay}")
|
||
|
||
os.system(f"{PYTHON_PATH} {args}")
|
||
|
||
|
||
# 重新加载任务
|
||
def reload_tasks():
|
||
# 读取定时规则
|
||
if crontab := config_data.get("crontab"):
|
||
if scheduler.state == 1:
|
||
scheduler.pause() # 暂停调度器
|
||
trigger = CronTrigger.from_crontab(crontab)
|
||
scheduler.remove_all_jobs()
|
||
scheduler.add_job(
|
||
run_python,
|
||
trigger=trigger,
|
||
args=[f"{SCRIPT_PATH} {CONFIG_PATH}"],
|
||
id=SCRIPT_PATH,
|
||
)
|
||
if scheduler.state == 0:
|
||
scheduler.start()
|
||
elif scheduler.state == 2:
|
||
scheduler.resume()
|
||
scheduler_state_map = {0: "停止", 1: "运行", 2: "暂停"}
|
||
logging.info(">>> 重载调度器")
|
||
logging.info(f"调度状态: {scheduler_state_map[scheduler.state]}")
|
||
logging.info(f"定时规则: {crontab}")
|
||
# 记录延迟执行设置
|
||
if delay := config_data.get("crontab_delay"):
|
||
logging.info(f"延迟执行: 0-{delay}秒")
|
||
logging.info(f"现有任务: {scheduler.get_jobs()}")
|
||
return True
|
||
else:
|
||
logging.info(">>> no crontab")
|
||
return False
|
||
|
||
|
||
def init():
|
||
global config_data, task_plugins_config_default
|
||
logging.info(f">>> 初始化配置")
|
||
# 检查配置文件是否存在
|
||
if not os.path.exists(CONFIG_PATH):
|
||
if not os.path.exists(os.path.dirname(CONFIG_PATH)):
|
||
os.makedirs(os.path.dirname(CONFIG_PATH))
|
||
with open("quark_config.json", "rb") as src, open(CONFIG_PATH, "wb") as dest:
|
||
dest.write(src.read())
|
||
|
||
# 读取配置
|
||
config_data = Config.read_json(CONFIG_PATH)
|
||
Config.breaking_change_update(config_data)
|
||
|
||
# 默认管理账号
|
||
config_data["webui"] = {
|
||
"username": os.environ.get("WEBUI_USERNAME")
|
||
or config_data.get("webui", {}).get("username", "admin"),
|
||
"password": os.environ.get("WEBUI_PASSWORD")
|
||
or config_data.get("webui", {}).get("password", "admin"),
|
||
}
|
||
|
||
# 默认定时规则
|
||
if not config_data.get("crontab"):
|
||
config_data["crontab"] = "0 8,18,20 * * *"
|
||
|
||
# 默认延迟执行设置
|
||
if "crontab_delay" not in config_data:
|
||
config_data["crontab_delay"] = 0
|
||
|
||
# 初始化插件配置
|
||
_, plugins_config_default, task_plugins_config_default = Config.load_plugins()
|
||
plugins_config_default.update(config_data.get("plugins", {}))
|
||
config_data["plugins"] = plugins_config_default
|
||
|
||
# 更新配置
|
||
Config.write_json(CONFIG_PATH, config_data)
|
||
|
||
|
||
# 获取历史转存记录
|
||
@app.route("/history_records")
|
||
def get_history_records():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
|
||
# 获取请求参数
|
||
page = int(request.args.get("page", 1))
|
||
page_size = int(request.args.get("page_size", 20))
|
||
sort_by = request.args.get("sort_by", "transfer_time")
|
||
order = request.args.get("order", "desc")
|
||
|
||
# 获取筛选参数
|
||
task_name_filter = request.args.get("task_name", "")
|
||
keyword_filter = request.args.get("keyword", "")
|
||
|
||
# 是否只请求所有任务名称
|
||
get_all_task_names = request.args.get("get_all_task_names", "").lower() in ["true", "1", "yes"]
|
||
|
||
# 初始化数据库
|
||
db = RecordDB()
|
||
|
||
# 如果请求所有任务名称,单独查询并返回
|
||
if get_all_task_names:
|
||
cursor = db.conn.cursor()
|
||
cursor.execute("SELECT DISTINCT task_name FROM transfer_records ORDER BY task_name")
|
||
all_task_names = [row[0] for row in cursor.fetchall()]
|
||
|
||
# 如果同时请求分页数据,继续常规查询
|
||
if page > 0 and page_size > 0:
|
||
result = db.get_records(
|
||
page=page,
|
||
page_size=page_size,
|
||
sort_by=sort_by,
|
||
order=order,
|
||
task_name_filter=task_name_filter,
|
||
keyword_filter=keyword_filter
|
||
)
|
||
# 添加所有任务名称到结果中
|
||
result["all_task_names"] = all_task_names
|
||
|
||
# 处理记录格式化
|
||
format_records(result["records"])
|
||
|
||
return jsonify({"success": True, "data": result})
|
||
else:
|
||
# 只返回任务名称
|
||
return jsonify({"success": True, "data": {"all_task_names": all_task_names}})
|
||
|
||
# 常规查询
|
||
result = db.get_records(
|
||
page=page,
|
||
page_size=page_size,
|
||
sort_by=sort_by,
|
||
order=order,
|
||
task_name_filter=task_name_filter,
|
||
keyword_filter=keyword_filter
|
||
)
|
||
|
||
# 处理记录格式化
|
||
format_records(result["records"])
|
||
|
||
return jsonify({"success": True, "data": result})
|
||
|
||
|
||
# 删除转存记录
|
||
@app.route("/delete_history_records", methods=["POST"])
|
||
def delete_history_records():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
|
||
# 获取要删除的记录ID列表
|
||
record_ids = request.json.get("record_ids", [])
|
||
|
||
if not record_ids:
|
||
return jsonify({"success": False, "message": "未提供要删除的记录ID"})
|
||
|
||
# 初始化数据库
|
||
db = RecordDB()
|
||
|
||
# 删除记录
|
||
deleted_count = 0
|
||
for record_id in record_ids:
|
||
deleted_count += db.delete_record(record_id)
|
||
|
||
return jsonify({
|
||
"success": True,
|
||
"message": f"成功删除 {deleted_count} 条记录",
|
||
"deleted_count": deleted_count
|
||
})
|
||
|
||
|
||
# 删除单条转存记录
|
||
@app.route("/delete_history_record", methods=["POST"])
|
||
def delete_history_record():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
|
||
# 获取要删除的记录ID
|
||
record_id = request.json.get("id")
|
||
|
||
if not record_id:
|
||
return jsonify({"success": False, "message": "未提供要删除的记录ID"})
|
||
|
||
# 初始化数据库
|
||
db = RecordDB()
|
||
|
||
# 删除记录
|
||
deleted = db.delete_record(record_id)
|
||
|
||
if deleted:
|
||
return jsonify({
|
||
"success": True,
|
||
"message": "成功删除 1 条记录",
|
||
})
|
||
else:
|
||
return jsonify({
|
||
"success": False,
|
||
"message": "记录删除失败,可能记录不存在",
|
||
})
|
||
|
||
|
||
# 辅助函数:格式化记录
|
||
def format_records(records):
|
||
for record in records:
|
||
# 格式化时间戳为可读形式
|
||
if "transfer_time" in record:
|
||
try:
|
||
# 确保时间戳在合理范围内
|
||
timestamp = int(record["transfer_time"])
|
||
if timestamp > 9999999999: # 检测是否为毫秒级时间戳(13位)
|
||
timestamp = timestamp / 1000 # 转换为秒级时间戳
|
||
|
||
if 0 < timestamp < 4102444800: # 从1970年到2100年的合理时间戳范围
|
||
record["transfer_time_readable"] = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
||
else:
|
||
record["transfer_time_readable"] = "无效日期"
|
||
except (ValueError, TypeError, OverflowError):
|
||
record["transfer_time_readable"] = "无效日期"
|
||
|
||
if "modify_date" in record:
|
||
try:
|
||
# 确保时间戳在合理范围内
|
||
timestamp = int(record["modify_date"])
|
||
if timestamp > 9999999999: # 检测是否为毫秒级时间戳(13位)
|
||
timestamp = timestamp / 1000 # 转换为秒级时间戳
|
||
|
||
if 0 < timestamp < 4102444800: # 从1970年到2100年的合理时间戳范围
|
||
record["modify_date_readable"] = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
||
else:
|
||
record["modify_date_readable"] = "无效日期"
|
||
except (ValueError, TypeError, OverflowError):
|
||
record["modify_date_readable"] = "无效日期"
|
||
|
||
# 格式化文件大小
|
||
if "file_size" in record:
|
||
try:
|
||
record["file_size_readable"] = format_bytes(int(record["file_size"]))
|
||
except (ValueError, TypeError):
|
||
record["file_size_readable"] = "未知大小"
|
||
|
||
|
||
@app.route("/get_user_info")
|
||
def get_user_info():
|
||
if not is_login():
|
||
return jsonify({"success": False, "message": "未登录"})
|
||
|
||
user_info_list = []
|
||
for idx, cookie in enumerate(config_data["cookie"]):
|
||
account = Quark(cookie, idx)
|
||
account_info = account.init()
|
||
if account_info:
|
||
user_info_list.append({
|
||
"index": idx,
|
||
"nickname": account_info["nickname"],
|
||
"is_active": account.is_active
|
||
})
|
||
else:
|
||
# 检查是否有移动端参数
|
||
has_mparam = bool(account.mparam)
|
||
user_info_list.append({
|
||
"index": idx,
|
||
"nickname": "",
|
||
"is_active": False,
|
||
"has_mparam": has_mparam
|
||
})
|
||
|
||
return jsonify({"success": True, "data": user_info_list})
|
||
|
||
|
||
if __name__ == "__main__":
|
||
init()
|
||
reload_tasks()
|
||
app.run(debug=DEBUG, host="0.0.0.0", port=PORT)
|