quark-auto-save/app/run.py
2025-05-07 00:52:46 +08:00

486 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from flask import (
json,
Flask,
url_for,
session,
jsonify,
request,
redirect,
Response,
render_template,
send_from_directory,
stream_with_context,
)
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from sdk.cloudsaver import CloudSaver
from datetime import timedelta
import subprocess
import requests
import hashlib
import logging
import base64
import sys
import os
import re
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, parent_dir)
from quark_auto_save import Quark, Config, MagicRename
def get_app_ver():
    """Return the application version string derived from build metadata.

    Reads the ``BUILD_TAG`` and ``BUILD_SHA`` environment variables:

    * a tag starting with ``v`` is returned verbatim;
    * otherwise, when a commit SHA is set, ``"<tag>(<first 7 SHA chars>)"``;
    * otherwise the literal ``"dev"``.
    """
    env = os.environ
    tag = env.get("BUILD_TAG", "")
    sha = env.get("BUILD_SHA", "")
    if tag.startswith("v"):
        return tag
    if not sha:
        return "dev"
    return f"{tag}({sha[:7]})"
# File paths and runtime switches (overridable through environment variables)
PYTHON_PATH = "python3" if os.path.exists("/usr/bin/python3") else "python"
SCRIPT_PATH = os.environ.get("SCRIPT_PATH", "./quark_auto_save.py")
CONFIG_PATH = os.environ.get("CONFIG_PATH", "./config/quark_config.json")
PLUGIN_FLAGS = os.environ.get("PLUGIN_FLAGS", "")
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"

# In-memory configuration state; filled in by init()
config_data = {}
task_plugins_config_default = {}

app = Flask(__name__)
app.config["APP_VERSION"] = get_app_ver()
# Session-signing key. A key that is hard-coded in the source is known to
# anyone who can read the code, so allow deployments to supply their own via
# WEBUI_SECRET_KEY; the historical value is kept as a fallback so existing
# sessions keep working when the variable is not set.
app.secret_key = (
    os.environ.get("WEBUI_SECRET_KEY") or "ca943f6db6dd34823d36ab08d8d6f65d"
)
app.config["SESSION_COOKIE_NAME"] = "QUARK_AUTO_SAVE_SESSION"
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(days=31)
# Emit non-ASCII characters as-is and keep key order in JSON responses
app.json.ensure_ascii = False
app.json.sort_keys = False
# Use [[ ... ]] instead of Jinja's default {{ ... }} for template variables
app.jinja_env.variable_start_string = "[["
app.jinja_env.variable_end_string = "]]"

scheduler = BackgroundScheduler()

logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format="[%(asctime)s][%(levelname)s] %(message)s",
    datefmt="%m-%d %H:%M:%S",
)
# Filter werkzeug log output (errors only) unless running in debug mode
if not DEBUG:
    logging.getLogger("werkzeug").setLevel(logging.ERROR)
def gen_md5(string):
    """Return the hexadecimal MD5 digest of *string* encoded as UTF-8."""
    return hashlib.md5(string.encode("utf-8")).hexdigest()
def get_login_token():
    """Derive the login/API token from the configured web UI credentials.

    The token is the slice ``[8:24]`` (16 hex characters) of the MD5 digest of
    ``"token" + username + password + "+-*/"``, so it changes whenever either
    credential changes.
    """
    webui = config_data["webui"]
    digest = gen_md5(f"token{webui['username']}{webui['password']}+-*/")
    return digest[8:24]
def is_login():
    """Tell whether the current request is authenticated.

    A request counts as logged in when the expected login token is found
    either in the session (key ``token``) or in the ``token`` query-string
    argument.
    """
    expected = get_login_token()
    if session.get("token") == expected:
        return True
    return request.args.get("token") == expected
# Site icon
@app.route("/favicon.ico")
def favicon():
    """Serve ``favicon.ico`` from the application's ``static`` directory."""
    static_dir = os.path.join(app.root_path, "static")
    return send_from_directory(
        static_dir, "favicon.ico", mimetype="image/vnd.microsoft.icon"
    )
# Login page
@app.route("/login", methods=["GET", "POST"])
def login():
    """Render the login form (GET) or check submitted credentials (POST).

    GET: redirect to the index page when already logged in, otherwise render
    ``login.html``.
    POST: compare the submitted ``username``/``password`` form fields with the
    configured web UI credentials. On success the login token is stored in a
    permanent session and the client is redirected to the index page; on
    failure ``login.html`` is rendered again with a failure message.
    """
    if request.method != "POST":
        if is_login():
            return redirect(url_for("index"))
        return render_template("login.html", error=None)

    webui = config_data["webui"]
    username = webui["username"]
    password = webui["password"]
    # Verify user name and password
    credentials_ok = username == request.form.get(
        "username"
    ) and password == request.form.get("password")
    if not credentials_ok:
        logging.info(f">>> 用户 {username} 登录失败")
        return render_template("login.html", message="登录失败")

    logging.info(f">>> 用户 {username} 登录成功")
    session.permanent = True
    session["token"] = get_login_token()
    return redirect(url_for("index"))
# Log out
@app.route("/logout")
def logout():
    """Drop the login token from the session and redirect to the login page."""
    session.pop("token", None)
    login_url = url_for("login")
    return redirect(login_url)
# Management page
@app.route("/")
def index():
    """Render the management page, or redirect to the login page if not logged in."""
    if is_login():
        return render_template(
            "index.html",
            version=app.config["APP_VERSION"],
            plugin_flags=PLUGIN_FLAGS,
        )
    return redirect(url_for("login"))
# Fetch configuration data
@app.route("/data")
def get_data():
    """Return the configuration read from ``CONFIG_PATH`` as JSON.

    The ``webui`` section (login credentials) is stripped from the payload,
    and two derived entries are added: ``api_token`` (the current login token)
    and ``task_plugins_config_default``.

    Returns a ``{"success": False, ...}`` JSON body when the caller is not
    logged in.
    """
    if not is_login():
        return jsonify({"success": False, "message": "未登录"})
    data = Config.read_json(CONFIG_PATH)
    # Never expose the credentials; tolerate a config file that has no
    # "webui" section instead of failing with KeyError.
    data.pop("webui", None)
    data["api_token"] = get_login_token()
    data["task_plugins_config_default"] = task_plugins_config_default
    return jsonify({"success": True, "data": data})
# Update configuration data
@app.route("/update", methods=["POST"])
def update():
    """Merge the posted JSON into the configuration, save it and reload tasks.

    Every top-level key of the request body overwrites the same key in
    ``config_data``, except the derived keys ``task_plugins_config_default``
    and ``api_token``, which are never stored. The merged configuration is
    written to ``CONFIG_PATH`` and the scheduler is reloaded; the JSON
    response reports whether the reload succeeded.
    """
    global config_data
    if not is_login():
        return jsonify({"success": False, "message": "未登录"})

    dont_save_keys = ["task_plugins_config_default", "api_token"]
    config_data.update(
        {
            key: value
            for key, value in request.json.items()
            if key not in dont_save_keys
        }
    )
    Config.write_json(CONFIG_PATH, config_data)

    # Reload the scheduled job
    if not reload_tasks():
        logging.info(">>> 配置更新失败")
        return jsonify({"success": False, "message": "配置更新失败"})
    logging.info(">>> 配置更新成功")
    return jsonify({"success": True, "message": "配置更新成功"})
# Handle "run script now" requests
@app.route("/run_script_now", methods=["POST"])
def run_script_now():
    """Run the save script immediately and stream its output to the client.

    The script is started as a child process; each line it prints is logged
    and forwarded as a server-sent event (``data: <line>``), followed by a
    final ``data: [DONE]`` event. When the request body carries a non-empty
    ``tasklist``, it is handed to the script as JSON through the ``TASKLIST``
    environment variable.
    """
    if not is_login():
        return jsonify({"success": False, "message": "未登录"})

    tasklist = request.json.get("tasklist", [])
    command = [PYTHON_PATH, "-u", SCRIPT_PATH, CONFIG_PATH]
    task_label = tasklist[0].get("taskname") if len(tasklist) > 0 else "ALL"
    logging.info(f">>> 手动运行任务 [{task_label}] 开始执行...")

    def generate_output():
        # Environment for the child process
        child_env = os.environ.copy()
        child_env["PYTHONIOENCODING"] = "utf-8"
        if tasklist:
            child_env["TASKLIST"] = json.dumps(tasklist, ensure_ascii=False)

        popen_options = dict(
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the streamed output
            universal_newlines=True,
            encoding="utf-8",
            errors="replace",
            bufsize=1,  # line-buffered
            env=child_env,
        )
        process = subprocess.Popen(command, **popen_options)
        try:
            # readline() returns "" at EOF, which ends the iteration
            for line in iter(process.stdout.readline, ""):
                logging.info(line.strip())
                yield f"data: {line}\n\n"
            yield "data: [DONE]\n\n"
        finally:
            process.stdout.close()
            process.wait()

    stream = stream_with_context(generate_output())
    return Response(stream, content_type="text/event-stream;charset=utf-8")
@app.route("/task_suggestions")
def get_task_suggestions():
    """Return task suggestions for the search term given in ``?q=``.

    When a CloudSaver source (server, username and password) is configured,
    the search goes through it; a refreshed token returned by CloudSaver is
    written back to the configuration file. Otherwise the query (``q``) and
    the ``d`` argument are forwarded to the public suggestion endpoint.

    Any exception is reported in the JSON ``message`` field rather than
    raised; the response keeps ``"success": True`` in that case, exactly as
    before.
    """
    if not is_login():
        return jsonify({"success": False, "message": "未登录"})
    query = request.args.get("q", "").lower()
    deep = request.args.get("d", "").lower()
    try:
        cs_data = config_data.get("source", {}).get("cloudsaver", {})
        if (
            cs_data.get("server")
            and cs_data.get("username")
            and cs_data.get("password")
        ):
            cs = CloudSaver(cs_data.get("server"))
            cs.set_auth(
                cs_data.get("username", ""),
                cs_data.get("password", ""),
                cs_data.get("token", ""),
            )
            search = cs.auto_login_search(query)
            if search.get("success"):
                if search.get("new_token"):
                    # Persist the refreshed token for later requests
                    cs_data["token"] = search.get("new_token")
                    Config.write_json(CONFIG_PATH, config_data)
                search_results = cs.clean_search_results(search.get("data"))
                return jsonify(
                    {"success": True, "source": "CloudSaver", "data": search_results}
                )
            else:
                return jsonify({"success": True, "message": search.get("message")})
        else:
            # The public endpoint URL is stored base64-encoded
            base_url = base64.b64decode("aHR0cHM6Ly9zLjkxNzc4OC54eXo=").decode()
            # Let requests build the query string so characters such as "&",
            # "#" or "+" in the search term are percent-encoded instead of
            # corrupting the URL, and bound the wait so a stalled upstream
            # cannot block this request forever.
            response = requests.get(
                f"{base_url}/task_suggestions",
                params={"q": query, "d": deep},
                timeout=30,
            )
            return jsonify(
                {"success": True, "source": "网络公开", "data": response.json()}
            )
    except Exception as e:
        return jsonify({"success": True, "message": f"error: {str(e)}"})
@app.route("/get_share_detail", methods=["POST"])
def get_share_detail():
    """Return the file list of a share link, optionally with a rename preview.

    Request JSON fields read here: ``shareurl``, ``stoken`` (optional; fetched
    when empty), and optionally ``task`` and ``magic_regex``. When ``task`` is
    present, every matching entry of the share list is annotated with either
    ``file_name_re`` (the name it would be saved as) or ``file_name_saved``
    (the name under which it already exists in the save path).

    Failures to obtain the stoken or the share detail are reported as
    ``{"success": False, "data": {"error": ...}}``.
    """
    if not is_login():
        return jsonify({"success": False, "message": "未登录"})
    shareurl = request.json.get("shareurl", "")
    stoken = request.json.get("stoken", "")
    account = Quark("", 0)
    pwd_id, passcode, pdir_fid, paths = account.extract_url(shareurl)
    if not stoken:
        get_stoken = account.get_stoken(pwd_id, passcode)
        if get_stoken.get("status") == 200:
            stoken = get_stoken["data"]["stoken"]
        else:
            return jsonify(
                {"success": False, "data": {"error": get_stoken.get("message")}}
            )
    share_detail = account.get_detail(pwd_id, stoken, pdir_fid, _fetch_share=1)
    if share_detail.get("code") != 0:
        return jsonify(
            {"success": False, "data": {"error": share_detail.get("message")}}
        )
    data = share_detail["data"]
    data["paths"] = paths
    data["stoken"] = stoken

    # Regex rename preview
    def preview_regex(data):
        """Annotate ``data["list"]`` in place with the rename preview."""
        task = request.json.get("task", {})
        magic_regex = request.json.get("magic_regex", {})
        mr = MagicRename(magic_regex)
        mr.set_taskname(task.get("taskname", ""))
        account = Quark(config_data["cookie"][0], 0)
        get_fids = account.get_fids([task.get("savepath", "")])
        if get_fids:
            dir_file_list = account.ls_dir(get_fids[0]["fid"])["data"]["list"]
            dir_filename_list = [dir_file["file_name"] for dir_file in dir_file_list]
        else:
            dir_file_list = []
            dir_filename_list = []
        # The task's pattern/replace pair does not depend on the individual
        # file, so resolve it once. This also guarantees `replace` is bound
        # for the sort check below when the share list is empty, and that a
        # trailing directory entry cannot reset it to "".
        pattern, replace = mr.magic_regex_conv(
            task.get("pattern", ""), task.get("replace", "")
        )
        for share_file in data["list"]:
            is_dir = share_file["dir"]
            # Sub-directories are matched against `update_subdir` when set
            search_pattern = (
                task["update_subdir"]
                if is_dir and task.get("update_subdir", False)
                else pattern
            )
            if re.search(search_pattern, share_file["file_name"]):
                # Files are renamed, directories keep their name
                file_name_re = (
                    share_file["file_name"]
                    if is_dir
                    else mr.sub(pattern, replace, share_file["file_name"])
                )
                if file_name_saved := mr.is_exists(
                    file_name_re,
                    dir_filename_list,
                    (task.get("ignore_extension") and not is_dir),
                ):
                    share_file["file_name_saved"] = file_name_saved
                else:
                    share_file["file_name_re"] = file_name_re
        # Sort the file list when the replacement uses an {I...} placeholder
        if re.search(r"\{I+\}", replace):
            mr.set_dir_file_list(dir_file_list, replace)
            mr.sort_file_list(data["list"])

    if request.json.get("task"):
        preview_regex(data)
    return jsonify({"success": True, "data": data})
@app.route("/get_savepath_detail")
def get_savepath_detail():
    """List a directory of the configured account, addressed by path or fid.

    With ``?path=`` the path is normalised (repeated slashes collapsed) and
    resolved to a fid; the response then also carries ``paths``, one
    ``{"fid", "name"}`` entry per path component. Without ``path`` the
    directory is taken from ``?fid=`` (default ``"0"``).
    """
    if not is_login():
        return jsonify({"success": False, "message": "未登录"})

    account = Quark(config_data["cookie"][0], 0)
    paths = []
    path = request.args.get("path")
    if not path:
        fid = request.args.get("fid", "0")
    else:
        path = re.sub(r"/+", "/", path)
        if path == "/":
            fid = 0
        else:
            dir_names = path.split("/")
            if dir_names[0] == "":
                # Drop the empty component produced by a leading slash
                del dir_names[0]
            # Cumulative paths, e.g. ["/a", "/a/b", "/a/b/c"]
            path_fids = [
                "/" + "/".join(dir_names[:depth])
                for depth in range(1, len(dir_names) + 1)
            ]
            get_fids = account.get_fids(path_fids)
            if not get_fids:
                return jsonify({"success": False, "data": {"error": "获取fid失败"}})
            fid = get_fids[-1]["fid"]
            paths = [
                {"fid": entry["fid"], "name": name}
                for entry, name in zip(get_fids, dir_names)
            ]

    listing = account.ls_dir(fid)["data"]["list"]
    file_list = {"list": listing, "paths": paths}
    return jsonify({"success": True, "data": file_list})
@app.route("/delete_file", methods=["POST"])
def delete_file():
    """Delete the file whose ``fid`` is given in the request JSON.

    Responds with whatever ``account.delete`` returns, or with a failure
    message when ``fid`` is missing or empty.
    """
    if not is_login():
        return jsonify({"success": False, "message": "未登录"})

    account = Quark(config_data["cookie"][0], 0)
    fid = request.json.get("fid")
    if not fid:
        return jsonify({"success": False, "message": "缺失必要字段: fid"})
    return jsonify(account.delete([fid]))
# API endpoint: add a task
@app.route("/api/add_task", methods=["POST"])
def add_task():
    """Append the task described by the request JSON to the task list.

    Responds 401 (``code`` 1) when not logged in and 400 (``code`` 2) when one
    of the required fields ``taskname``, ``shareurl`` or ``savepath`` is
    missing or empty. Otherwise the task is appended to
    ``config_data["tasklist"]``, the configuration is written to
    ``CONFIG_PATH`` and the task is echoed back with ``code`` 0.
    """
    global config_data
    # Validate the token
    if not is_login():
        return jsonify({"success": False, "code": 1, "message": "未登录"}), 401

    # Required fields
    request_data = request.json
    for field in ("taskname", "shareurl", "savepath"):
        if field in request_data and request_data[field]:
            continue
        error_body = {
            "success": False,
            "code": 2,
            "message": f"缺少必要字段: {field}",
        }
        return jsonify(error_body), 400

    # Add the task
    config_data["tasklist"].append(request_data)
    Config.write_json(CONFIG_PATH, config_data)
    logging.info(f">>> 通过API添加任务: {request_data['taskname']}")
    success_body = {
        "success": True,
        "code": 0,
        "message": "任务添加成功",
        "data": request_data,
    }
    return jsonify(success_body)
# Function executed by the scheduled job
def run_python(args):
    """Run ``PYTHON_PATH`` with *args* appended, through the system shell.

    *args* is a single string that is placed after the interpreter name in
    the command line.
    """
    logging.info(">>> 定时运行任务")
    command_line = f"{PYTHON_PATH} {args}"
    os.system(command_line)
# Reload the scheduled job
def reload_tasks():
    """(Re)register the cron job that runs the save script.

    Reads the ``crontab`` rule from ``config_data``, replaces all scheduler
    jobs with a single job that calls ``run_python`` on that schedule, and
    starts or resumes the scheduler as needed.

    Returns:
        True when the job was (re)scheduled; False when no crontab rule is
        configured or the rule cannot be parsed.
    """
    # Read the schedule rule
    crontab = config_data.get("crontab")
    if not crontab:
        logging.info(">>> no crontab")
        return False

    # Parse the rule BEFORE touching the scheduler: an invalid expression
    # raises ValueError, and doing this after pause() would leave the
    # scheduler paused and surface as an unhandled error in the caller.
    try:
        trigger = CronTrigger.from_crontab(crontab)
    except ValueError as e:
        logging.error(f">>> invalid crontab [{crontab}]: {e}")
        return False

    if scheduler.state == 1:
        scheduler.pause()  # pause the scheduler while jobs are replaced
    scheduler.remove_all_jobs()
    scheduler.add_job(
        run_python,
        trigger=trigger,
        args=[f"{SCRIPT_PATH} {CONFIG_PATH}"],
        id=SCRIPT_PATH,
    )
    if scheduler.state == 0:
        scheduler.start()
    elif scheduler.state == 2:
        scheduler.resume()
    # Scheduler state codes: 0 = stopped, 1 = running, 2 = paused
    scheduler_state_map = {0: "停止", 1: "运行", 2: "暂停"}
    logging.info(">>> 重载调度器")
    logging.info(f"调度状态: {scheduler_state_map[scheduler.state]}")
    logging.info(f"定时规则: {crontab}")
    logging.info(f"现有任务: {scheduler.get_jobs()}")
    return True
def init():
    """Load the configuration, fill in defaults and write it back to disk.

    Steps:
      1. If ``CONFIG_PATH`` does not exist, create its directory and copy the
         template ``quark_config.json`` (looked up relative to the current
         working directory) to it.
      2. Read the configuration and apply ``Config.breaking_change_update``.
      3. Set the web UI credentials: the ``WEBUI_USERNAME`` /
         ``WEBUI_PASSWORD`` environment variables win over the stored values,
         which win over the defaults ``admin`` / ``admin123``.
      4. Apply the default crontab rule when none is configured.
      5. Merge the stored plugin settings over the plugin defaults.
      6. Persist the resulting configuration to ``CONFIG_PATH``.
    """
    global config_data, task_plugins_config_default
    logging.info(">>> 初始化配置")
    # Create the config file from the template when it does not exist yet
    if not os.path.exists(CONFIG_PATH):
        config_dir = os.path.dirname(CONFIG_PATH)
        # dirname is "" for a bare file name, and makedirs("") would raise;
        # exist_ok avoids failing if the directory appears in the meantime.
        if config_dir:
            os.makedirs(config_dir, exist_ok=True)
        with open("quark_config.json", "rb") as src, open(CONFIG_PATH, "wb") as dest:
            dest.write(src.read())
    # Read the configuration
    config_data = Config.read_json(CONFIG_PATH)
    Config.breaking_change_update(config_data)
    # Default admin account
    config_data["webui"] = {
        "username": os.environ.get("WEBUI_USERNAME")
        or config_data.get("webui", {}).get("username", "admin"),
        "password": os.environ.get("WEBUI_PASSWORD")
        or config_data.get("webui", {}).get("password", "admin123"),
    }
    # Default schedule rule
    if not config_data.get("crontab"):
        config_data["crontab"] = "0 8,18,20 * * *"
    # Initialise plugin configuration: stored values override the defaults
    _, plugins_config_default, task_plugins_config_default = Config.load_plugins()
    plugins_config_default.update(config_data.get("plugins", {}))
    config_data["plugins"] = plugins_config_default
    # Persist the updated configuration
    Config.write_json(CONFIG_PATH, config_data)
if __name__ == "__main__":
    # Load the configuration and register the cron job before serving requests
    init()
    reload_tasks()
    # Listen on all interfaces, port 5005
    app.run(host="0.0.0.0", port=5005, debug=DEBUG)