Compare commits

..

No commits in common. "main" and "v0.3.9.3" have entirely different histories.

30 changed files with 2905 additions and 3970 deletions

View File

@ -1,12 +1,6 @@
# 使用官方 Python 镜像作为基础镜像
FROM python:3.13-alpine
#构建版本
ARG BUILD_SHA
ARG BUILD_TAG
ENV BUILD_SHA=$BUILD_SHA
ENV BUILD_TAG=$BUILD_TAG
# 设置工作目录
WORKDIR /app
@ -14,12 +8,17 @@ WORKDIR /app
COPY . /app
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt && \
echo "{\"BUILD_SHA\":\"$BUILD_SHA\", \"BUILD_TAG\":\"$BUILD_TAG\"}" > build.json
RUN pip install --no-cache-dir -r requirements.txt
# 时区
ENV TZ="Asia/Shanghai"
#构建版本
ARG BUILD_SHA
ARG BUILD_TAG
ENV BUILD_SHA=$BUILD_SHA
ENV BUILD_TAG=$BUILD_TAG
# 端口
EXPOSE 5005

110
README.md
View File

@ -8,7 +8,7 @@
对于一些持续更新的资源,隔段时间去转存十分麻烦。
定期执行本脚本自动转存、文件名整理,配合 [SmartStrm](https://github.com/Cp0204/SmartStrm) / [OpenList](https://github.com/OpenListTeam/OpenList) , Emby 可达到自动追更的效果。🥳
定期执行本脚本自动转存、文件名整理,配合 Alist, rclone, Emby 可达到自动追更的效果。🥳
[![wiki][wiki-image]][wiki-url] [![github releases][gitHub-releases-image]][github-url] [![docker pulls][docker-pulls-image]][docker-url] [![docker image size][docker-image-size-image]][docker-url]
@ -29,19 +29,18 @@
> ⛔️⛔️⛔️ 注意!资源不会每时每刻更新,**严禁设定过高的定时运行频率!** 以免账号风控和给夸克服务器造成不必要的压力。雪山崩塌,每一片雪花都有责任!
> [!NOTE]
> 开发者≠客服,开源免费≠帮你解决使用问题;本项目 Wiki 已经相对完善,遇到问题请先翻阅 Issues 和 Wiki ,请勿盲目发问。
> 因不想当客服处理各种使用咨询,即日起 Issues 关闭,如果你发现了 bug 、有好的想法或功能建议,欢迎通过 PR 和我对话,谢谢!
## 功能
- 部署方式
- [x] 可能~~兼容青龙~~
- [x] Docker 部署WebUI 配置
- [x] 兼容青龙
- [x] 支持 Docker 独立部署WebUI 配置
- 分享链接
- [x] 支持分享链接的子目录
- [x] 记录失效分享并跳过任务
- [x] 支持需提取码的分享链接 <sup>[?](https://github.com/Cp0204/quark-auto-save/wiki/使用技巧集锦#支持需提取码的分享链接)</sup>
- [x] 智能搜索资源并自动填充 <sup>[?](https://github.com/Cp0204/quark-auto-save/wiki/CloudSaver搜索源)</sup>
- 文件管理
- [x] 目标目录不存在时自动新建
@ -58,7 +57,7 @@
- 媒体库整合
- [x] 根据任务名搜索 Emby 媒体库
- [x] 追更或整理后自动刷新 Emby 媒体库
- [x] 插件模块化,允许自行开发和挂载[插件](./plugins)
- [x] **媒体库模块化,用户可很方便地[开发自己的媒体库hook模块](./plugins)**
- 其它
- [x] 每日签到领空间 <sup>[?](https://github.com/Cp0204/quark-auto-save/wiki/使用技巧集锦#每日签到领空间)</sup>
@ -69,15 +68,15 @@
### Docker 部署
Docker 部署提供 WebUI 进行管理配置,部署命令:
Docker 部署提供 WebUI 管理配置,图形化配置已能满足绝大多数需求。部署命令:
```shell
docker run -d \
--name quark-auto-save \
-p 5005:5005 \ # 映射端口,:前的可以改,即部署后访问的端口,:后的不可改
-p 5005:5005 \
-e WEBUI_USERNAME=admin \
-e WEBUI_PASSWORD=admin123 \
-v ./quark-auto-save/config:/app/config \ # 必须,配置持久化
-v ./quark-auto-save/config:/app/config \
-v ./quark-auto-save/media:/media \ # 可选模块alist_strm_gen生成strm使用
--network bridge \
--restart unless-stopped \
@ -107,13 +106,10 @@ services:
管理地址http://yourhost:5005
| 环境变量 | 默认 | 备注 |
| ---------------- | ---------- | ---------------------------------------- |
| `WEBUI_USERNAME` | `admin` | 管理账号 |
| `WEBUI_PASSWORD` | `admin123` | 管理密码 |
| `PORT` | `5005` | 管理后台端口 |
| `PLUGIN_FLAGS` | | 插件标志,如 `-emby,-aria2` 禁用某些插件 |
| `TASK_TIMEOUT` | `1800` | 任务执行超时时间(秒),超时则任务结束 |
| 环境变量 | 默认 | 备注 |
| ---------------- | ---------- | -------- |
| `WEBUI_USERNAME` | `admin` | 管理账号 |
| `WEBUI_PASSWORD` | `admin123` | 管理密码 |
#### 一键更新
@ -130,28 +126,32 @@ docker run --rm -v /var/run/docker.sock:/var/run/docker.sock containrrr/watchtow
</details>
### 青龙部署
程序也支持以青龙定时任务的方式运行,但该方式无法使用 WebUI 管理任务,需手动修改配置文件。
青龙部署说明已转移到 Wiki [青龙部署教程](https://github.com/Cp0204/quark-auto-save/wiki/部署教程#青龙部署)
## 使用说明
### 正则处理示例
### 正则理示例
| pattern | replace | 效果 |
| -------------------------------------- | ----------------------- | ---------------------------------------------------------------------- |
| `.*` | | 无脑转存所有文件,不整理 |
| `\.mp4$` | | 转存所有 `.mp4` 后缀的文件 |
| `^【电影TT】花好月圆(\d+)\.(mp4\|mkv)` | `\1.\2` | 【电影TT】花好月圆01.mp4 → 01.mp4<br>【电影TT】花好月圆02.mkv → 02.mkv |
| `^(\d+)\.mp4` | `S02E\1.mp4` | 01.mp4 → S02E01.mp4<br>02.mp4 → S02E02.mp4 |
| `$TV` | | [魔法匹配](#魔法匹配)剧集文件 |
| `^(\d+)\.mp4` | `{TASKNAME}.S02E\1.mp4` | 01.mp4 → 任务名.S02E01.mp4 |
更多正则使用说明:[正则处理教程](https://github.com/Cp0204/quark-auto-save/wiki/正则处理教程)
| pattern | replace | 效果 |
| -------------------------------------- | ------------ | ---------------------------------------------------------------------- |
| `.*` | | 无脑转存所有文件,不整理 |
| `\.mp4$` | | 转存所有 `.mp4` 后缀的文件 |
| `^【电影TT】花好月圆(\d+)\.(mp4\|mkv)` | `\1.\2` | 【电影TT】花好月圆01.mp4 → 01.mp4<br>【电影TT】花好月圆02.mkv → 02.mkv |
| `^(\d+)\.mp4` | `S02E\1.mp4` | 01.mp4 → S02E01.mp4<br>02.mp4 → S02E02.mp4 |
| `$TV` | | [魔法匹配](#魔法匹配)剧集文件 |
| `^(\d+)\.mp4` | `$TASKNAME.S02E\1.mp4` | 01.mp4 → 任务名.S02E01.mp4 |
> [!TIP]
>
> **魔法匹配和魔法变量**:在正则处理中,我们定义了一些“魔法匹配”模式,如果 表达式 的值以 $ 开头且 替换式 留空,程序将自动使用预设的正则表达式进行匹配和替换
> **魔法匹配**:当任务 `pattern` 值为 `$开头``replace` 留空时,实际将调用程序预设的正则表达式
>
> 自 v0.6.0 开始,支持更多以 {} 包裹的我称之为“魔法变量”,可以更灵活地进行重命名
>
> 更多说明请看[魔法匹配和魔法变量](https://github.com/Cp0204/quark-auto-save/wiki/魔法匹配和魔法变量)
> `$TV` 可适配和自动整理市面上90%分享剧集的文件名格式,具体实现见代码,欢迎贡献规则
更多正则使用说明:[正则处理教程](https://github.com/Cp0204/quark-auto-save/wiki/正则处理教程)
### 刷新媒体库
@ -163,56 +163,14 @@ docker run --rm -v /var/run/docker.sock:/var/run/docker.sock containrrr/watchtow
请参考 Wiki [使用技巧集锦](https://github.com/Cp0204/quark-auto-save/wiki/使用技巧集锦)
## 生态项目
以下展示 QAS 生态项目,包括官方项目和第三方项目。
### 官方项目
* [QAS一键推送助手](https://greasyfork.org/zh-CN/scripts/533201-qas一键推送助手)
油猴脚本,在夸克网盘分享页面添加推送到 QAS 的按钮
* [SmartStrm](https://github.com/Cp0204/SmartStrm)
STRM 文件生成工具,用于转存后处理,媒体免下载入库播放。
### 第三方开源项目
> [!TIP]
>
> 以下第三方开源项目均由社区开发并保持开源,与 QAS 作者无直接关联。在部署到生产环境前,请自行评估相关风险。
>
> 如果您有新的项目没有在此列出,可以通过 Issues 提交。
* [nonebot-plugin-quark-autosave](https://github.com/fllesser/nonebot-plugin-quark-autosave)
QAS Telegram 机器人,快速管理自动转存任务
* [Astrbot_plugin_quarksave](https://github.com/lm379/astrbot_plugin_quarksave)
AstrBot 插件,调用 quark_auto_save 实现自动转存资源到夸克网盘
* [Telegram 媒体资源管理机器人](https://github.com/2beetle/tgbot)
一个功能丰富的 Telegram 机器人专注于媒体资源管理、Emby 集成、自动下载和夸克网盘资源管理。
## 打赏
如果这个项目让你受益,你可以无偿赠与我1块钱让我知道开源有价值。谢谢
如果这个项目让你受益你可以打赏我1块钱让我知道开源有价值。谢谢
![WeChatPay](https://cdn.jsdelivr.net/gh/Cp0204/Cp0204@main/img/wechat_pay_qrcode.png)
## 声明
项目为个人兴趣开发,旨在通过程序自动化提高网盘使用效率
本程序为个人兴趣开发,开源仅供学习与交流使用。
程序没有任何破解行为只是对于夸克已有的API进行封装所有数据来自于夸克官方API本人不对网盘内容负责、不对夸克官方API未来可能的变动导致的影响负责请自行斟酌使用。
开源仅供学习与交流使用,未盈利也未授权商业使用,严禁用于非法用途。
## Sponsor
CDN acceleration and security protection for this project are sponsored by Tencent EdgeOne.
<a href="https://edgeone.ai/?from=github" target="_blank"><img title="Best Asian CDN, Edge, and Secure Solutions - Tencent EdgeOne" src="https://edgeone.ai/media/34fe3a45-492d-4ea4-ae5d-ea1087ca7b4b.png" width="300"></a>
程序没有任何破解行为只是对于夸克已有的API进行封装所有数据来自于夸克官方API本人不对网盘内容负责、不对夸克官方API未来可能的改动导致的后果负责。

View File

@ -1,7 +1,6 @@
# !/usr/bin/env python3
# -*- coding: utf-8 -*-
from flask import (
json,
Flask,
url_for,
session,
@ -15,48 +14,22 @@ from flask import (
)
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from concurrent.futures import ThreadPoolExecutor, as_completed
from sdk.cloudsaver import CloudSaver
from sdk.pansou import PanSou
from datetime import timedelta
import subprocess
import requests
import hashlib
import logging
import traceback
import base64
import json
import sys
import os
import re
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, parent_dir)
from quark_auto_save import Quark, Config, MagicRename
print(
r"""
____ ___ _____
/ __ \ / | / ___/
/ / / / / /| | \__ \
/ /_/ / / ___ |___/ /
\___\_\/_/ |_/____/
-- Quark-Auto-Save --
"""
)
sys.stdout.flush()
from quark_auto_save import Quark
from quark_auto_save import Config
def get_app_ver():
"""获取应用版本"""
try:
with open("build.json", "r") as f:
build_info = json.loads(f.read())
BUILD_SHA = build_info["BUILD_SHA"]
BUILD_TAG = build_info["BUILD_TAG"]
except Exception as e:
BUILD_SHA = os.getenv("BUILD_SHA", "")
BUILD_TAG = os.getenv("BUILD_TAG", "")
BUILD_SHA = os.environ.get("BUILD_SHA", "")
BUILD_TAG = os.environ.get("BUILD_TAG", "")
if BUILD_TAG[:1] == "v":
return BUILD_TAG
elif BUILD_SHA:
@ -69,20 +42,14 @@ def get_app_ver():
PYTHON_PATH = "python3" if os.path.exists("/usr/bin/python3") else "python"
SCRIPT_PATH = os.environ.get("SCRIPT_PATH", "./quark_auto_save.py")
CONFIG_PATH = os.environ.get("CONFIG_PATH", "./config/quark_config.json")
PLUGIN_FLAGS = os.environ.get("PLUGIN_FLAGS", "")
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = os.environ.get("PORT", 5005)
TASK_TIMEOUT = int(os.environ.get("TASK_TIMEOUT", 1800))
DEBUG = os.environ.get("DEBUG", False)
config_data = {}
task_plugins_config_default = {}
task_plugins_config = {}
app = Flask(__name__)
app.config["APP_VERSION"] = get_app_ver()
app.secret_key = "ca943f6db6dd34823d36ab08d8d6f65d"
app.config["SESSION_COOKIE_NAME"] = "QUARK_AUTO_SAVE_SESSION"
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(days=31)
app.json.ensure_ascii = False
app.json.sort_keys = False
app.jinja_env.variable_start_string = "[["
@ -97,8 +64,6 @@ logging.basicConfig(
# 过滤werkzeug日志输出
if not DEBUG:
logging.getLogger("werkzeug").setLevel(logging.ERROR)
logging.getLogger("apscheduler").setLevel(logging.ERROR)
sys.modules["flask.cli"].show_server_banner = lambda *x: None
def gen_md5(string):
@ -107,15 +72,24 @@ def gen_md5(string):
return md5.hexdigest()
def get_login_token():
username = config_data["webui"]["username"]
password = config_data["webui"]["password"]
return gen_md5(f"token{username}{password}+-*/")[8:24]
# 读取 JSON 文件内容
def read_json():
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
data = json.load(f)
return data
# 将数据写入 JSON 文件
def write_json(data):
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, sort_keys=False, indent=2)
def is_login():
login_token = get_login_token()
if session.get("token") == login_token or request.args.get("token") == login_token:
data = read_json()
username = data["webui"]["username"]
password = data["webui"]["password"]
if session.get("login") == gen_md5(username + password):
return True
else:
return False
@ -135,29 +109,27 @@ def favicon():
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = config_data["webui"]["username"]
password = config_data["webui"]["password"]
data = read_json()
username = data["webui"]["username"]
password = data["webui"]["password"]
# 验证用户名和密码
if (username == request.form.get("username")) and (
password == request.form.get("password")
):
logging.info(f">>> 用户 {username} 登录成功")
session.permanent = True
session["token"] = get_login_token()
session["login"] = gen_md5(username + password)
return redirect(url_for("index"))
else:
logging.info(f">>> 用户 {username} 登录失败")
return render_template("login.html", message="登录失败")
if is_login():
return redirect(url_for("index"))
return render_template("login.html", error=None)
# 退出登录
@app.route("/logout")
def logout():
session.pop("token", None)
session.pop("login", None)
return redirect(url_for("login"))
@ -166,68 +138,54 @@ def logout():
def index():
if not is_login():
return redirect(url_for("login"))
return render_template(
"index.html", version=app.config["APP_VERSION"], plugin_flags=PLUGIN_FLAGS
)
return render_template("index.html", version=app.config["APP_VERSION"])
# 获取配置数据
@app.route("/data")
def get_data():
if not is_login():
return jsonify({"success": False, "message": "未登录"})
data = Config.read_json(CONFIG_PATH)
return redirect(url_for("login"))
data = read_json()
del data["webui"]
data["api_token"] = get_login_token()
data["task_plugins_config_default"] = task_plugins_config_default
return jsonify({"success": True, "data": data})
data["task_plugins_config"] = task_plugins_config
return jsonify(data)
# 更新数据
@app.route("/update", methods=["POST"])
def update():
global config_data
if not is_login():
return jsonify({"success": False, "message": "未登录"})
dont_save_keys = ["task_plugins_config_default", "api_token"]
for key, value in request.json.items():
if key not in dont_save_keys:
config_data.update({key: value})
Config.write_json(CONFIG_PATH, config_data)
return "未登录"
data = request.json
data["webui"] = read_json()["webui"]
if "task_plugins_config" in data:
del data["task_plugins_config"]
write_json(data)
# 重新加载任务
if reload_tasks():
logging.info(f">>> 配置更新成功")
return jsonify({"success": True, "message": "配置更新成功"})
return "配置更新成功"
else:
logging.info(f">>> 配置更新失败")
return jsonify({"success": False, "message": "配置更新失败"})
return "配置更新失败"
# 处理运行脚本请求
@app.route("/run_script_now", methods=["POST"])
@app.route("/run_script_now", methods=["GET"])
def run_script_now():
if not is_login():
return jsonify({"success": False, "message": "未登录"})
tasklist = request.json.get("tasklist", [])
command = [PYTHON_PATH, "-u", SCRIPT_PATH, CONFIG_PATH]
return "未登录"
task_index = request.args.get("task_index", "")
command = [PYTHON_PATH, "-u", SCRIPT_PATH, CONFIG_PATH, task_index]
logging.info(
f">>> 手动运行任务 [{tasklist[0].get('taskname') if len(tasklist)>0 else 'ALL'}] 开始执行..."
f">>> 手动运行任务{int(task_index)+1 if task_index.isdigit() else 'all'}"
)
def generate_output():
# 设置环境变量
process_env = os.environ.copy()
process_env["PYTHONIOENCODING"] = "utf-8"
if request.json.get("quark_test"):
process_env["QUARK_TEST"] = "true"
process_env["COOKIE"] = json.dumps(
request.json.get("cookie", []), ensure_ascii=False
)
process_env["PUSH_CONFIG"] = json.dumps(
request.json.get("push_config", {}), ensure_ascii=False
)
if tasklist:
process_env["TASKLIST"] = json.dumps(tasklist, ensure_ascii=False)
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
@ -253,276 +211,52 @@ def run_script_now():
)
@app.route("/task_suggestions")
def get_task_suggestions():
@app.route("/get_share_detail")
def get_share_files():
if not is_login():
return jsonify({"success": False, "message": "未登录"})
query = request.args.get("q", "").lower()
deep = request.args.get("d", "").lower()
net_data = config_data.get("source", {}).get("net", {})
cs_data = config_data.get("source", {}).get("cloudsaver", {})
ps_data = config_data.get("source", {}).get("pansou", {})
def net_search():
if str(net_data.get("enable", "true")).lower() != "false":
base_url = base64.b64decode("aHR0cHM6Ly9zLjkxNzc4OC54eXo=").decode()
url = f"{base_url}/task_suggestions?q={query}&d={deep}"
response = requests.get(url)
return response.json()
return []
def cs_search():
if (
cs_data.get("server")
and cs_data.get("username")
and cs_data.get("password")
):
cs = CloudSaver(cs_data.get("server"))
cs.set_auth(
cs_data.get("username", ""),
cs_data.get("password", ""),
cs_data.get("token", ""),
)
search = cs.auto_login_search(query)
if search.get("success"):
if search.get("new_token"):
cs_data["token"] = search.get("new_token")
Config.write_json(CONFIG_PATH, config_data)
search_results = cs.clean_search_results(search.get("data"))
return search_results
return []
def ps_search():
if ps_data.get("server"):
ps = PanSou(ps_data.get("server"))
return ps.search(query, deep == "1")
return []
try:
search_results = []
with ThreadPoolExecutor(max_workers=3) as executor:
features = []
features.append(executor.submit(net_search))
features.append(executor.submit(cs_search))
features.append(executor.submit(ps_search))
for future in as_completed(features):
result = future.result()
search_results.extend(result)
# 按时间排序并去重
results = []
link_array = []
search_results.sort(key=lambda x: x.get("datetime", ""), reverse=True)
for item in search_results:
url = item.get("shareurl", "")
if url != "" and url not in link_array:
link_array.append(url)
results.append(item)
return jsonify({"success": True, "data": results})
except Exception as e:
return jsonify({"success": True, "message": f"error: {str(e)}"})
return jsonify({"error": "未登录"})
shareurl = request.args.get("shareurl", "")
account = Quark("", 0)
pwd_id, passcode, pdir_fid = account.get_id_from_url(shareurl)
is_sharing, stoken = account.get_stoken(pwd_id, passcode)
if not is_sharing:
return jsonify({"error": stoken})
share_detail = account.get_detail(pwd_id, stoken, pdir_fid, 1)
return jsonify(share_detail)
@app.route("/get_share_detail", methods=["POST"])
def get_share_detail():
@app.route("/get_savepath")
def get_savepath():
if not is_login():
return jsonify({"success": False, "message": "未登录"})
shareurl = request.json.get("shareurl", "")
stoken = request.json.get("stoken", "")
account = Quark()
pwd_id, passcode, pdir_fid, paths = account.extract_url(shareurl)
if not stoken:
get_stoken = account.get_stoken(pwd_id, passcode)
if get_stoken.get("status") == 200:
stoken = get_stoken["data"]["stoken"]
else:
return jsonify(
{"success": False, "data": {"error": get_stoken.get("message")}}
)
share_detail = account.get_detail(
pwd_id, stoken, pdir_fid, _fetch_share=1, fetch_share_full_path=1
)
if share_detail.get("code") != 0:
return jsonify(
{"success": False, "data": {"error": share_detail.get("message")}}
)
data = share_detail["data"]
data["paths"] = [
{"fid": i["fid"], "name": i["file_name"]}
for i in share_detail["data"].get("full_path", [])
] or paths
data["stoken"] = stoken
# 正则处理预览
def preview_regex(data):
task = request.json.get("task", {})
magic_regex = request.json.get("magic_regex", {})
mr = MagicRename(magic_regex)
mr.set_taskname(task.get("taskname", ""))
account = Quark(config_data["cookie"][0])
get_fids = account.get_fids([task.get("savepath", "")])
if get_fids:
dir_file_list = account.ls_dir(get_fids[0]["fid"])["data"]["list"]
dir_filename_list = [dir_file["file_name"] for dir_file in dir_file_list]
else:
dir_file_list = []
dir_filename_list = []
pattern, replace = mr.magic_regex_conv(
task.get("pattern", ""), task.get("replace", "")
)
for share_file in data["list"]:
search_pattern = (
task["update_subdir"]
if share_file["dir"] and task.get("update_subdir")
else pattern
)
if re.search(search_pattern, share_file["file_name"]):
# 文件名重命名,目录不重命名
file_name_re = (
share_file["file_name"]
if share_file["dir"]
else mr.sub(pattern, replace, share_file["file_name"])
)
if file_name_saved := mr.is_exists(
file_name_re,
dir_filename_list,
(task.get("ignore_extension") and not share_file["dir"]),
):
share_file["file_name_saved"] = file_name_saved
else:
share_file["file_name_re"] = file_name_re
# 文件列表排序
if re.search(r"\{I+\}", replace):
mr.set_dir_file_list(dir_file_list, replace)
mr.sort_file_list(data["list"])
if request.json.get("task"):
preview_regex(data)
return jsonify({"success": True, "data": data})
@app.route("/get_savepath_detail")
def get_savepath_detail():
if not is_login():
return jsonify({"success": False, "message": "未登录"})
account = Quark(config_data["cookie"][0])
paths = []
return jsonify({"error": "未登录"})
data = read_json()
account = Quark(data["cookie"][0], 0)
if path := request.args.get("path"):
path = re.sub(r"/+", "/", path)
if path == "/":
fid = 0
elif get_fids := account.get_fids([path]):
fid = get_fids[0]["fid"]
else:
dir_names = path.split("/")
if dir_names[0] == "":
dir_names.pop(0)
path_fids = []
current_path = ""
for dir_name in dir_names:
current_path += "/" + dir_name
path_fids.append(current_path)
if get_fids := account.get_fids(path_fids):
fid = get_fids[-1]["fid"]
paths = [
{"fid": get_fid["fid"], "name": dir_name}
for get_fid, dir_name in zip(get_fids, dir_names)
]
else:
return jsonify({"success": False, "data": {"error": "获取fid失败"}})
return jsonify([])
else:
fid = request.args.get("fid", "0")
file_list = {
"list": account.ls_dir(fid)["data"]["list"],
"paths": paths,
}
return jsonify({"success": True, "data": file_list})
@app.route("/delete_file", methods=["POST"])
def delete_file():
if not is_login():
return jsonify({"success": False, "message": "未登录"})
account = Quark(config_data["cookie"][0])
if fid := request.json.get("fid"):
response = account.delete([fid])
else:
response = {"success": False, "message": "缺失必要字段: fid"}
return jsonify(response)
# 添加任务接口
@app.route("/api/add_task", methods=["POST"])
def add_task():
global config_data
# 验证token
if not is_login():
return jsonify({"success": False, "code": 1, "message": "未登录"}), 401
# 必选字段
request_data = request.json
required_fields = ["taskname", "shareurl", "savepath"]
for field in required_fields:
if field not in request_data or not request_data[field]:
return (
jsonify(
{"success": False, "code": 2, "message": f"缺少必要字段: {field}"}
),
400,
)
if not request_data.get("addition"):
request_data["addition"] = task_plugins_config_default
# 添加任务
config_data["tasklist"].append(request_data)
Config.write_json(CONFIG_PATH, config_data)
logging.info(f">>> 通过API添加任务: {request_data['taskname']}")
return jsonify(
{"success": True, "code": 0, "message": "任务添加成功", "data": request_data}
)
fid = request.args.get("fid", 0)
file_list = account.ls_dir(fid)
return jsonify(file_list)
# 定时任务执行的函数
def run_python(args):
logging.info(f">>> 定时运行任务")
try:
result = subprocess.run(
f"{PYTHON_PATH} {args}",
shell=True,
timeout=TASK_TIMEOUT,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
)
# 输出执行日志
if result.stdout:
for line in result.stdout.strip().split("\n"):
if line.strip():
logging.info(line)
if result.returncode == 0:
logging.info(f">>> 任务执行成功")
else:
logging.error(f">>> 任务执行失败,返回码: {result.returncode}")
if result.stderr:
logging.error(f"错误信息: {result.stderr[:500]}")
except subprocess.TimeoutExpired as e:
logging.error(f">>> 任务执行超时(>{TASK_TIMEOUT}s),强制终止")
except Exception as e:
logging.error(f">>> 任务执行异常: {str(e)}")
logging.error(traceback.format_exc())
finally:
# 确保函数能够正常返回
logging.debug(f">>> run_python 函数执行完成")
os.system(f"{PYTHON_PATH} {args}")
# 重新加载任务
def reload_tasks():
# 读取定时规则
if crontab := config_data.get("crontab"):
# 读取数据
data = read_json()
# 添加新任务
crontab = data.get("crontab")
if crontab:
if scheduler.state == 1:
scheduler.pause() # 暂停调度器
trigger = CronTrigger.from_crontab(crontab)
@ -532,10 +266,6 @@ def reload_tasks():
trigger=trigger,
args=[f"{SCRIPT_PATH} {CONFIG_PATH}"],
id=SCRIPT_PATH,
max_instances=1, # 最多允许1个实例运行
coalesce=True, # 合并错过的任务,避免堆积
misfire_grace_time=300, # 错过任务的宽限期(秒),超过则跳过
replace_existing=True, # 替换已存在的同ID任务
)
if scheduler.state == 0:
scheduler.start()
@ -553,49 +283,34 @@ def reload_tasks():
def init():
global config_data, task_plugins_config_default
logging.info(">>> 初始化配置")
global task_plugins_config
logging.info(f">>> 初始化配置")
# 检查配置文件是否存在
if not os.path.exists(CONFIG_PATH):
if not os.path.exists(os.path.dirname(CONFIG_PATH)):
os.makedirs(os.path.dirname(CONFIG_PATH))
with open("quark_config.json", "rb") as src, open(CONFIG_PATH, "wb") as dest:
dest.write(src.read())
# 读取配置
config_data = Config.read_json(CONFIG_PATH)
Config.breaking_change_update(config_data)
if not config_data.get("magic_regex"):
config_data["magic_regex"] = MagicRename().magic_regex
data = read_json()
Config.breaking_change_update(data)
# 默认管理账号
config_data["webui"] = {
data["webui"] = {
"username": os.environ.get("WEBUI_USERNAME")
or config_data.get("webui", {}).get("username", "admin"),
or data.get("webui", {}).get("username", "admin"),
"password": os.environ.get("WEBUI_PASSWORD")
or config_data.get("webui", {}).get("password", "admin123"),
or data.get("webui", {}).get("password", "admin123"),
}
# 默认定时规则
if not config_data.get("crontab"):
config_data["crontab"] = "0 8,18,20 * * *"
if not data.get("crontab"):
data["crontab"] = "0 8,18,20 * * *"
# 初始化插件配置
_, plugins_config_default, task_plugins_config_default = Config.load_plugins()
plugins_config_default.update(config_data.get("plugins", {}))
config_data["plugins"] = plugins_config_default
# 更新配置
Config.write_json(CONFIG_PATH, config_data)
_, plugins_config_default, task_plugins_config = Config.load_plugins()
plugins_config_default.update(data.get("plugins", {}))
data["plugins"] = plugins_config_default
write_json(data)
if __name__ == "__main__":
init()
reload_tasks()
logging.info(">>> 启动Web服务")
logging.info(f"运行在: http://{HOST}:{PORT}")
app.run(
debug=DEBUG,
host=HOST,
port=PORT,
)
app.run(debug=DEBUG, host="0.0.0.0", port=5005)

View File

@ -1,168 +0,0 @@
import re
import requests
from sdk.common import iso_to_cst
class CloudSaver:
"""
CloudSaver 用于获取云盘资源
"""
def __init__(self, server):
self.server = server
self.username = None
self.password = None
self.token = None
self.session = requests.Session()
self.session.headers.update({"Content-Type": "application/json"})
def set_auth(self, username, password, token=""):
self.username = username
self.password = password
self.token = token
self.session.headers.update({"Authorization": f"Bearer {self.token}"})
def login(self):
if not self.username or not self.password:
return {"success": False, "message": "CloudSaver未设置用户名或密码"}
try:
url = f"{self.server}/api/user/login"
data = {"username": self.username, "password": self.password}
response = self.session.post(url, json=data)
result = response.json()
if result.get("success"):
self.token = result.get("data", {}).get("token")
self.session.headers.update({"Authorization": f"Bearer {self.token}"})
return {"success": True, "token": self.token}
else:
return {
"success": False,
"message": f"CloudSaver登录{result.get('message', '未知错误')}",
}
except Exception as e:
return {"success": False, "message": str(e)}
def search(self, keyword, last_message_id=""):
"""
搜索资源
Args:
keyword (str): 搜索关键词
last_message_id (str): 上一条消息ID用于分页
Returns:
list: 搜索结果列表
"""
try:
url = f"{self.server}/api/search"
params = {"keyword": keyword, "lastMessageId": last_message_id}
response = self.session.get(url, params=params)
result = response.json()
if result.get("success"):
data = result.get("data", [])
return {"success": True, "data": data}
else:
return {"success": False, "message": result.get("message", "未知错误")}
except Exception as e:
return {"success": False, "message": str(e)}
def auto_login_search(self, keyword, last_message_id=""):
"""
自动登录并搜索资源
Args:
keyword (str): 搜索关键词
last_message_id (str): 上一条消息ID用于分页
"""
result = self.search(keyword, last_message_id)
if result.get("success"):
return result
else:
if (
result.get("message") == "无效的 token"
or result.get("message") == "未提供 token"
):
login_result = self.login()
if login_result.get("success"):
result = self.search(keyword, last_message_id)
result["new_token"] = login_result.get("token")
return result
else:
return {
"success": False,
"message": login_result.get("message", "未知错误"),
}
return {"success": False, "message": result.get("message", "未知错误")}
def clean_search_results(self, search_results):
"""
清洗搜索结果
Args:
search_results (list): 搜索结果列表
Returns:
list: 夸克网盘链接列表
"""
pattern_title = r"(名称|标题)[:]?(.*)"
pattern_content = r"(描述|简介)[:]?(.*)(链接|标签)"
clean_results = []
link_array = []
for channel in search_results:
for item in channel.get("list", []):
cloud_links = item.get("cloudLinks", [])
for link in cloud_links:
if link.get("cloudType") == "quark":
# 清洗标题
title = item.get("title", "")
if match := re.search(pattern_title, title, re.DOTALL):
title = match.group(2)
title = title.replace("&amp;", "&").strip()
# 清洗内容
content = item.get("content", "")
if match := re.search(pattern_content, content, re.DOTALL):
content = match.group(2)
content = content.replace('<mark class="highlight">', "")
content = content.replace("</mark>", "")
content = content.strip()
# 统一发布时间格式
pubdate = item.get("pubDate", "")
if pubdate:
pubdate = iso_to_cst(pubdate)
# 链接去重
if link.get("link") not in link_array:
link_array.append(link.get("link"))
clean_results.append(
{
"shareurl": link.get("link"),
"taskname": title,
"content": content,
"datetime": pubdate,
"tags": item.get("tags", []),
"channel": item.get("channelId", ""),
"source": "CloudSaver"
}
)
return clean_results
# 测试示例
if __name__ == "__main__":
# 创建CloudSaver实例
server = ""
username = ""
password = ""
token = ""
cloud_saver = CloudSaver(server)
cloud_saver.set_auth(username, password, token)
# 搜索资源
results = cloud_saver.auto_login_search("黑镜")
# 提取夸克网盘链接
clean_results = cloud_saver.clean_search_results(results.get("data", []))
# 打印结果
for item in clean_results:
print(f"标题: {item['taskname']}")
print(f"描述: {item['content']}")
print(f"链接: {item['shareurl']}")
print(f"标签: {' '.join(item['tags'])}")
print("-" * 50)

View File

@ -1,16 +0,0 @@
from datetime import datetime, timezone, timedelta
def iso_to_cst(iso_time_str: str) -> str:
"""将 ISO 格式的时间字符串转换为 CST(China Standard Time) 时间并格式化为 %Y-%m-%d %H:%M:%S 格式
Args:
iso_time_str (str): ISO 格式时间字符串
Returns:
str: CST(China Standard Time) 时间字符串
"""
dt = datetime.fromisoformat(iso_time_str)
tz = timezone(timedelta(hours=8))
dt_cst = dt if dt.astimezone(tz) > datetime.now(tz) else dt.astimezone(tz)
return dt_cst.strftime("%Y-%m-%d %H:%M:%S") if dt_cst.year >= 1970 else ""

View File

@ -1,97 +0,0 @@
import re
import requests
from sdk.common import iso_to_cst
class PanSou:
"""
PanSou 用于获取云盘资源
"""
def __init__(self, server):
self.server = server
self.session = requests.Session()
def search(self, keyword: str, refresh: bool = False) -> list:
"""搜索资源
Args:
keyword (str): 搜索关键字
Returns:
list: 资源列表
"""
try:
url = f"{self.server.rstrip('/')}/api/search"
params = {"kw": keyword, "cloud_types": ["quark"], "res": "merge", "refresh": refresh}
response = self.session.get(url, params=params)
result = response.json()
if result.get("code") == 0:
data = result.get("data", {}).get("merged_by_type", {}).get("quark", [])
return self.format_search_results(data)
return []
except Exception as _:
return []
def format_search_results(self, search_results: list) -> list:
"""格式化搜索结果
Args:
search_results (list): 搜索结果列表
Returns:
list: 夸克网盘资源列表
"""
pattern = (
r'^(.*?)'
r'(?:'
r'[【\[]?'
r'(?:简介|介绍|描述)'
r'[】\]]?'
r'[:]?'
r')'
r'(.*)$'
)
format_results = []
link_array = []
for item in search_results:
url = item.get("url", "")
note = item.get("note", "")
tm = item.get("datetime", "")
if tm:
tm = iso_to_cst(tm)
match = re.search(pattern, note)
if match:
title = match.group(1)
content = match.group(2)
else:
title = note
content = ""
if url != "" and url not in link_array:
link_array.append(url)
format_results.append({
"shareurl": url,
"taskname": title,
"content": content,
"datetime": tm,
"channel": item.get("source", ""),
"source": "PanSou"
})
return format_results
if __name__ == "__main__":
server: str = "https://so.252035.xyz"
pansou = PanSou(server)
results = pansou.search("哪吒")
for item in results:
print(f"标题: {item['taskname']}")
print(f"描述: {item['content']}")
print(f"链接: {item['shareurl']}")
print(f"时间: {item['datetime']}")
print("-" * 50)

2018
app/static/css/bootstrap-icons.css vendored Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@ -1,243 +0,0 @@
body {
font-size: 1rem;
padding-bottom: 110px;
}
@media (max-width: 768px) {
.container-fluid {
padding-right: 5px;
padding-left: 5px;
}
}
@media (min-width: 1360px) {
.container-fluid {
max-width: 1360px;
margin: 0 auto;
}
}
.bottom-buttons {
z-index: 99;
position: fixed;
left: 50%;
bottom: 20px;
background-color: transparent;
display: flex;
flex-direction: row;
transform: translateX(-50%);
}
.bottom-buttons button {
border-radius: 50%;
margin: 0 10px;
width: 50px;
height: 50px;
display: flex;
align-items: center;
justify-content: center;
border: none;
box-shadow: 0 2px 5px rgba(0, 0, 0, 0.2);
}
.title {
margin-top: 30px;
margin-bottom: 10px;
}
table.jsoneditor-tree>tbody>tr.jsoneditor-expandable:first-child {
display: none;
}
.modal-dialog {
max-width: 800px;
}
.modal-body {
max-height: calc(100vh - 200px);
overflow-y: auto;
}
.task-suggestions {
width: 100%;
max-height: 250px;
overflow-y: auto;
transform: translate(0, -100%);
top: 0;
margin-top: -5px;
border: 1px solid #007bff;
z-index: 1021;
}
/*
* Sidebar
*/
.sidebar {
position: fixed;
top: 0;
bottom: 0;
left: 0;
z-index: 100;
/* Behind the navbar */
padding: 54px 0 0;
/* Height of navbar */
box-shadow: inset -1px 0 0 rgba(0, 0, 0, 0.1);
}
.sidebar-sticky {
position: relative;
top: 0;
height: calc(100vh - 54px);
padding-top: 0.5rem;
overflow-x: hidden;
overflow-y: auto;
/* Scrollable contents if viewport is shorter than content. */
}
@supports ((position: -webkit-sticky) or (position: sticky)) {
.sidebar-sticky {
position: -webkit-sticky;
position: sticky;
}
}
.sidebar .nav-link {
font-size: medium;
color: #333;
padding: 10px;
transition: background-color 0.3s ease;
/* 添加过渡效果 */
}
.sidebar .nav-link:hover {
background-color: #e0f0ff;
/* 改变背景颜色 */
}
.sidebar .nav-link i {
margin-right: 10px;
margin-left: 10px;
}
.sidebar .nav-link.active {
background-color: #007bff;
color: white !important;
}
.sidebar-heading {
font-size: 0.75rem;
text-transform: uppercase;
}
/*
* Navbar
*/
.navbar-brand {
padding-top: 0.75rem;
padding-bottom: 0.75rem;
background-color: rgba(0, 0, 0, 0.25);
box-shadow: inset -1px 0 0 rgba(0, 0, 0, 0.25);
}
.navbar .navbar-toggler {
right: 1rem;
}
.navbar .form-control {
padding: 0.75rem 1rem;
border-width: 0;
border-radius: 0;
}
.form-control-dark {
color: #fff;
background-color: rgba(255, 255, 255, 0.1);
border-color: rgba(255, 255, 255, 0.1);
}
.form-control-dark:focus {
border-color: transparent;
box-shadow: 0 0 0 3px rgba(255, 255, 255, 0.25);
}
.cursor-pointer {
cursor: pointer;
}
.nav-bottom {
position: absolute;
bottom: 32px;
width: 100%;
font-size: small;
}
.position-relative:hover .position-absolute {
display: block !important;
}
.qrcode-tutorial {
display: none;
left: 50%;
transform: translateX(-50%);
bottom: 100%;
margin-bottom: 10px;
z-index: 1000;
border: 1px solid #ddd;
background-color: #fff;
padding: 10px;
border-radius: 5px;
max-width: 100%;
text-align: center;
}
.qrcode-tutorial img {
max-width: 100%;
height: auto;
}
/* Toast */
.toast-container {
position: fixed;
top: 80px;
right: 20px;
z-index: 9999;
width: 300px;
}
.toast {
background-color: rgba(255, 255, 255, 0.95);
border-radius: 8px;
border-width: 0 0 0 5px !important;
margin-bottom: 10px;
animation: slideIn 0.3s ease forwards;
}
.toast.success {
border-left-color: #28a745 !important;
}
.toast.error {
border-left-color: #dc3545 !important;
}
.toast.warning {
border-left-color: #ffc107 !important;
}
.toast.info {
border-left-color: #17a2b8 !important;
}
@keyframes slideIn {
from {
transform: translateX(100%);
opacity: 0;
}
to {
transform: translateX(0);
opacity: 1;
}
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.5 KiB

View File

@ -1,286 +0,0 @@
// ==UserScript==
// @name QAS一键推送助手
// @namespace https://github.com/Cp0204/quark-auto-save
// @license AGPL
// @version 0.6
// @description 在夸克网盘分享页面添加推送到 QAS 的按钮
// @icon https://pan.quark.cn/favicon.ico
// @author Cp0204
// @match https://pan.quark.cn/s/*
// @grant GM_getValue
// @grant GM_setValue
// @grant GM_xmlhttpRequest
// @require https://cdn.jsdelivr.net/npm/sweetalert2@11
// @downloadURL https://cdn.jsdelivr.net/gh/Cp0204/quark-auto-save@refs/heads/main/app/static/js/qas.addtask.user.js
// @updateURL https://cdn.jsdelivr.net/gh/Cp0204/quark-auto-save@refs/heads/main/app/static/js/qas.addtask.user.js
// ==/UserScript==
(function () {
'use strict';
let qas_base = GM_getValue('qas_base', '');
let qas_token = GM_getValue('qas_token', '');
let default_pattern = GM_getValue('default_pattern', '');
let default_replace = GM_getValue('default_replace', '');
// QAS 设置弹窗函数
function showQASSettingDialog(callback) {
Swal.fire({
title: 'QAS 设置',
showCancelButton: true,
html: `
<label for="qas_base">QAS 地址</label>
<input id="qas_base" class="swal2-input" placeholder="如: http://192.168.1.8:5005" value="${qas_base}"><br>
<label for="qas_token">QAS Token</label>
<input id="qas_token" class="swal2-input" placeholder="v0.5+ 系统配置中查找" value="${qas_token}"><br>
<label for="qas_token">默认正则</label>
<input id="default_pattern" class="swal2-input" placeholder="如 $TV" value="${default_pattern}"><br>
<label for="qas_token">默认替换</label><input id="default_replace" class="swal2-input" value="${default_replace}">
`,
focusConfirm: false,
preConfirm: () => {
qas_base = document.getElementById('qas_base').value;
qas_token = document.getElementById('qas_token').value;
default_pattern = document.getElementById('default_pattern').value;
default_replace = document.getElementById('default_replace').value;
if (!qas_base || !qas_token) {
Swal.showValidationMessage('请填写 QAS 地址和 Token');
}
return { qas_base: qas_base, qas_token: qas_token, default_pattern: default_pattern, default_replace: default_replace }
}
}).then((result) => {
if (result.isConfirmed) {
GM_setValue('qas_base', result.value.qas_base);
GM_setValue('qas_token', result.value.qas_token);
GM_setValue('default_pattern', result.value.default_pattern);
GM_setValue('default_replace', result.value.default_replace);
qas_base = result.value.qas_base;
qas_token = result.value.qas_token;
default_pattern = result.value.default_pattern;
default_replace = result.value.default_replace;
if (callback) {
callback(); // 执行回调函数
}
}
});
}
// 添加 QAS 设置按钮
function addQASSettingButton() {
function waitForElement(selector, callback) {
const element = document.querySelector(selector);
if (element) {
callback(element);
} else {
setTimeout(() => waitForElement(selector, callback), 500);
}
}
waitForElement('.pc-member-entrance', (PcMemberButton) => {
const qasSettingButton = document.createElement('div');
qasSettingButton.className = 'pc-member-entrance';
qasSettingButton.innerHTML = 'QAS设置';
qasSettingButton.addEventListener('click', () => {
showQASSettingDialog();
});
PcMemberButton.parentNode.insertBefore(qasSettingButton, PcMemberButton.nextSibling);
});
}
// 推送到 QAS 按钮
function addQASButton() {
function waitForElement(selector, callback) {
const element = document.querySelector(selector);
if (element) {
callback(element);
} else {
setTimeout(() => waitForElement(selector, callback), 500);
}
}
waitForElement('.ant-btn.share-save', (saveButton) => {
const qasButton = document.createElement('button');
qasButton.type = 'button';
qasButton.className = 'ant-btn share-save';
qasButton.style.marginLeft = '10px';
qasButton.innerHTML = '<span class="share-save-ico"></span><span>创建QAS任务</span>';
let taskname, shareurl, savepath; // 声明变量
// 获取数据函数
function getData() {
const currentUrl = window.location.href;
const lastTitle = document.querySelector('.primary .bcrumb-filename:last-child')?.getAttribute('title') || null;
taskname = (lastTitle && lastTitle != "全部文件") ? lastTitle : document.querySelector('.author-name').textContent;
shareurl = currentUrl;
let pathElement = document.querySelector('.path-name');
savepath = pathElement ? pathElement.title.replace('全部文件', '').trim() : "";
savepath += "/" + taskname;
qasButton.title = `任务名称: ${taskname}\n分享链接: ${shareurl}\n保存路径: ${savepath}`;
}
// 添加鼠标悬停事件
qasButton.addEventListener('mouseover', () => {
getData(); // 鼠标悬停时获取数据
});
// 添加点击事件
qasButton.addEventListener('click', () => {
getData(); // 点击时重新获取数据,确保最新
// 检查 qas_base 是否包含 http 或 https如果没有则添加 http://
let qasApiBase = qas_base;
if (!qasApiBase.startsWith('http')) {
qasApiBase = 'http://' + qasApiBase;
}
const apiUrl = `${qasApiBase}/api/add_task?token=${qas_token}`;
const data = {
"taskname": taskname,
"shareurl": shareurl,
"savepath": savepath,
"pattern": default_pattern,
"replace": default_replace,
};
GM_xmlhttpRequest({
method: 'POST',
url: apiUrl,
headers: {
'Content-Type': 'application/json'
},
data: JSON.stringify(data),
onload: function (response) {
// 检查 HTTP 状态码
if (response.status === 401) {
Swal.fire({
title: '认证失败',
text: 'Token 无效或已过期,请重新配置 QAS Token',
icon: 'error',
confirmButtonText: '重新配置',
showCancelButton: true,
cancelButtonText: '取消'
}).then((result) => {
if (result.isConfirmed) {
showQASSettingDialog();
}
});
return;
}
if (response.status === 503) {
Swal.fire({
title: '服务器不可用',
html: `服务器暂时无法处理请求 (503)<br><br>
<small>可能原因<br>
QAS 服务未运行<br>
服务器过载<br>
网络连接问题</small>`,
icon: 'error',
confirmButtonText: '重新配置',
showCancelButton: true,
cancelButtonText: '取消'
}).then((result) => {
if (result.isConfirmed) {
showQASSettingDialog();
}
});
return;
}
// 检查响应内容类型
const contentType = response.responseHeaders.match(/content-type:\s*([^;\s]+)/i);
if (contentType && !contentType[1].includes('application/json')) {
Swal.fire({
title: '认证失败',
html: `服务器返回了非 JSON 响应,可能是 Token 错误<br><br>
<small>响应类型: ${contentType[1]}</small><br>
<small>响应状态: ${response.status}</small>`,
icon: 'error',
confirmButtonText: '重新配置',
showCancelButton: true,
cancelButtonText: '取消'
}).then((result) => {
if (result.isConfirmed) {
showQASSettingDialog();
}
});
return;
}
try {
const jsonResponse = JSON.parse(response.responseText);
if (jsonResponse.success) {
Swal.fire({
title: '任务创建成功',
html: `<small>
<b>任务名称:</b> ${taskname}<br><br>
<b>保存路径:</b> ${savepath}<br><br>
<a href="${qasApiBase}" target="_blank"> QAS 查看</a>
<small>`,
icon: 'success'
});
} else {
Swal.fire({
title: '任务创建失败',
text: jsonResponse.message,
icon: 'error'
});
}
} catch (e) {
Swal.fire({
title: '解析响应失败',
html: `<small>
响应状态: ${response.status}<br>
响应内容: ${response.responseText.substring(0, 200)}...<br><br>
错误详情: ${e.message}
</small>`,
icon: 'error',
confirmButtonText: '重新配置',
showCancelButton: true,
cancelButtonText: '取消'
}).then((result) => {
if (result.isConfirmed) {
showQASSettingDialog();
}
});
}
},
onerror: function (error) {
Swal.fire({
title: '网络请求失败',
text: '无法连接到 QAS 服务器,请检查网络连接和服务器地址',
icon: 'error',
confirmButtonText: '重新配置',
showCancelButton: true,
cancelButtonText: '取消'
}).then((result) => {
if (result.isConfirmed) {
showQASSettingDialog();
}
});
}
});
});
saveButton.parentNode.insertBefore(qasButton, saveButton.nextSibling);
});
}
// 初始化
(function init() {
addQASSettingButton();
if (!qas_base || !qas_token) {
showQASSettingDialog(() => {
addQASButton(); // 在设置后添加 QAS 按钮
});
} else {
addQASButton(); // 如果配置存在,则直接添加 QAS 按钮
}
})(); // 立即执行初始化
})();

File diff suppressed because it is too large Load Diff

View File

@ -1,84 +1,33 @@
<!DOCTYPE html>
<html lang="zh-CN">
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>登录</title>
<!-- 引入 Bootstrap CSS -->
<link rel="stylesheet" href="./static/css/bootstrap.min.css">
<link rel="stylesheet" href="./static/css/bootstrap-icons.min.css">
<style>
body {
background: linear-gradient(135deg, #c4d7ff 0%, #7996ff 100%);
min-height: 100vh;
display: flex;
align-items: center;
}
.login-card {
max-width: 400px;
width: 100%;
margin: 0 auto;
box-shadow: 0 0 20px rgba(0, 0, 0, 0.1);
border-radius: 10px;
overflow: hidden;
}
.login-header {
background: rgba(255, 255, 255, 0.9);
padding: 1rem 2rem;
text-align: center;
}
.login-body {
background: #fff;
padding: 2rem;
}
.btn {
border-radius: 20px;
padding: 10px 20px;
width: 100%;
}
</style>
</head>
<body>
<div class="container">
<div class="login-card">
<div class="login-header">
<h1 class="mb-3">登录</h1>
<p class="text-muted">欢迎回来,请登录您的账户</p>
</div>
<div class="login-body">
{% if message %}
<div class="alert alert-danger text-center" role="alert">
[[ message ]]
</div>
{% endif %}
<form action="/login" method="POST">
<div class="form-group mb-3">
<label for="username" class="form-label">用户名</label>
<div class="input-group">
<div class="input-group-prepend">
<span class="input-group-text"><i class="bi bi-person-fill"></i></span>
</div>
<input type="text" class="form-control" id="username" name="username" required>
</div>
</div>
<div class="form-group mb-4">
<label for="password" class="form-label">密码</label>
<div class="input-group">
<div class="input-group-prepend">
<span class="input-group-text"><i class="bi bi-lock-fill"></i></span>
</div>
<input type="password" class="form-control" id="password" name="password" required>
</div>
</div>
<button type="submit" class="btn btn-primary">登录</button>
</form>
</div>
<div class="container mt-5">
<h1 class="mb-4">登录</h1>
{% if message %}
<div class="alert alert-danger" role="alert">
[[ message ]]
</div>
{% endif %}
<form action="/login" method="POST">
<div class="form-group">
<label for="username">用户名</label>
<input type="text" class="form-control" id="username" name="username" required>
</div>
<div class="form-group">
<label for="password">密码</label>
<input type="password" class="form-control" id="password" name="password" required>
</div>
<button type="submit" class="btn btn-primary">登录</button>
</form>
</div>
</body>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 241 KiB

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 354 KiB

After

Width:  |  Height:  |  Size: 266 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 313 KiB

After

Width:  |  Height:  |  Size: 216 KiB

209
notify.py
View File

@ -72,13 +72,8 @@ push_config = {
'CHAT_URL': '', # synology chat url
'CHAT_TOKEN': '', # synology chat token
'PUSH_PLUS_TOKEN': '', # pushplus 推送的用户令牌
'PUSH_PLUS_USER': '', # pushplus 推送的群组编码
'PUSH_PLUS_TEMPLATE': 'html', # pushplus 发送模板支持html,txt,json,markdown,cloudMonitor,jenkins,route,pay
'PUSH_PLUS_CHANNEL': 'wechat', # pushplus 发送渠道支持wechat,webhook,cp,mail,sms
'PUSH_PLUS_WEBHOOK': '', # pushplus webhook编码可在pushplus公众号上扩展配置出更多渠道
'PUSH_PLUS_CALLBACKURL': '', # pushplus 发送结果回调地址,会把推送最终结果通知到这个地址上
'PUSH_PLUS_TO': '', # pushplus 好友令牌微信公众号渠道填写好友令牌企业微信渠道填写企业微信用户id
'PUSH_PLUS_TOKEN': '', # push+ 微信推送的用户令牌
'PUSH_PLUS_USER': '', # push+ 微信推送的群组编码
'WE_PLUS_BOT_TOKEN': '', # 微加机器人的用户令牌
'WE_PLUS_BOT_RECEIVER': '', # 微加机器人的消息接收者
@ -128,19 +123,6 @@ push_config = {
'NTFY_URL': '', # ntfy地址,如https://ntfy.sh
'NTFY_TOPIC': '', # ntfy的消息应用topic
'NTFY_PRIORITY':'3', # 推送消息优先级,默认为3
'NTFY_TOKEN': '', # 推送token,可选
'NTFY_USERNAME': '', # 推送用户名称,可选
'NTFY_PASSWORD': '', # 推送用户密码,可选
'NTFY_ACTIONS': '', # 推送用户动作,可选
'WXPUSHER_APP_TOKEN': '', # wxpusher 的 appToken 官方文档: https://wxpusher.zjiecode.com/docs/ 管理后台: https://wxpusher.zjiecode.com/admin/
'WXPUSHER_TOPIC_IDS': '', # wxpusher 的 主题ID多个用英文分号;分隔 topic_ids 与 uids 至少配置一个才行
'WXPUSHER_UIDS': '', # wxpusher 的 用户ID多个用英文分号;分隔 topic_ids 与 uids 至少配置一个才行
'DODO_BOTTOKEN': '', # DoDo机器人的token DoDo开发平台https://doker.imdodo.com/
'DODO_BOTID': '', # DoDo机器人的id
'DODO_LANDSOURCEID': '', # DoDo机器人所在的群ID
'DODO_SOURCEID': '', # DoDo机器人推送目标用户的ID
}
# fmt: on
@ -155,6 +137,7 @@ def bark(title: str, content: str) -> None:
使用 bark 推送消息
"""
if not push_config.get("BARK_PUSH"):
print("bark 服务的 BARK_PUSH 未设置!!\n取消推送")
return
print("bark 服务启动")
@ -207,6 +190,7 @@ def dingding_bot(title: str, content: str) -> None:
使用 钉钉机器人 推送消息
"""
if not push_config.get("DD_BOT_SECRET") or not push_config.get("DD_BOT_TOKEN"):
print("钉钉机器人 服务的 DD_BOT_SECRET 或者 DD_BOT_TOKEN 未设置!!\n取消推送")
return
print("钉钉机器人 服务启动")
@ -236,6 +220,7 @@ def feishu_bot(title: str, content: str) -> None:
使用 飞书机器人 推送消息
"""
if not push_config.get("FSKEY"):
print("飞书 服务的 FSKEY 未设置!!\n取消推送")
return
print("飞书 服务启动")
@ -254,6 +239,7 @@ def go_cqhttp(title: str, content: str) -> None:
使用 go_cqhttp 推送消息
"""
if not push_config.get("GOBOT_URL") or not push_config.get("GOBOT_QQ"):
print("go-cqhttp 服务的 GOBOT_URL 或 GOBOT_QQ 未设置!!\n取消推送")
return
print("go-cqhttp 服务启动")
@ -271,6 +257,7 @@ def gotify(title: str, content: str) -> None:
使用 gotify 推送消息
"""
if not push_config.get("GOTIFY_URL") or not push_config.get("GOTIFY_TOKEN"):
print("gotify 服务的 GOTIFY_URL 或 GOTIFY_TOKEN 未设置!!\n取消推送")
return
print("gotify 服务启动")
@ -293,6 +280,7 @@ def iGot(title: str, content: str) -> None:
使用 iGot 推送消息
"""
if not push_config.get("IGOT_PUSH_KEY"):
print("iGot 服务的 IGOT_PUSH_KEY 未设置!!\n取消推送")
return
print("iGot 服务启动")
@ -312,12 +300,13 @@ def serverJ(title: str, content: str) -> None:
通过 serverJ 推送消息
"""
if not push_config.get("PUSH_KEY"):
print("serverJ 服务的 PUSH_KEY 未设置!!\n取消推送")
return
print("serverJ 服务启动")
data = {"text": title, "desp": content.replace("\n", "\n\n")}
match = re.match(r"sctp(\d+)t", push_config.get("PUSH_KEY"))
match = re.match(r'sctp(\d+)t', push_config.get("PUSH_KEY"))
if match:
num = match.group(1)
url = f'https://{num}.push.ft07.com/send/{push_config.get("PUSH_KEY")}.send'
@ -337,6 +326,7 @@ def pushdeer(title: str, content: str) -> None:
通过PushDeer 推送消息
"""
if not push_config.get("DEER_KEY"):
print("PushDeer 服务的 DEER_KEY 未设置!!\n取消推送")
return
print("PushDeer 服务启动")
data = {
@ -362,6 +352,7 @@ def chat(title: str, content: str) -> None:
通过Chat 推送消息
"""
if not push_config.get("CHAT_URL") or not push_config.get("CHAT_TOKEN"):
print("chat 服务的 CHAT_URL或CHAT_TOKEN 未设置!!\n取消推送")
return
print("chat 服务启动")
data = "payload=" + json.dumps({"text": title + "\n" + content})
@ -376,36 +367,26 @@ def chat(title: str, content: str) -> None:
def pushplus_bot(title: str, content: str) -> None:
"""
通过 pushplus 推送消息
通过 push+ 推送消息
"""
if not push_config.get("PUSH_PLUS_TOKEN"):
print("PUSHPLUS 服务的 PUSH_PLUS_TOKEN 未设置!!\n取消推送")
return
print("PUSHPLUS 服务启动")
url = "https://www.pushplus.plus/send"
url = "http://www.pushplus.plus/send"
data = {
"token": push_config.get("PUSH_PLUS_TOKEN"),
"title": title,
"content": content,
"topic": push_config.get("PUSH_PLUS_USER"),
"template": push_config.get("PUSH_PLUS_TEMPLATE"),
"channel": push_config.get("PUSH_PLUS_CHANNEL"),
"webhook": push_config.get("PUSH_PLUS_WEBHOOK"),
"callbackUrl": push_config.get("PUSH_PLUS_CALLBACKURL"),
"to": push_config.get("PUSH_PLUS_TO"),
}
body = json.dumps(data).encode(encoding="utf-8")
headers = {"Content-Type": "application/json"}
response = requests.post(url=url, data=body, headers=headers).json()
code = response["code"]
if code == 200:
print("PUSHPLUS 推送请求成功,可根据流水号查询推送结果:" + response["data"])
print(
"注意请求成功并不代表推送成功如未收到消息请到pushplus官网使用流水号查询推送最终结果"
)
elif code == 900 or code == 903 or code == 905 or code == 999:
print(response["msg"])
if response["code"] == 200:
print("PUSHPLUS 推送成功!")
else:
url_old = "http://pushplus.hxtrip.com/send"
@ -424,6 +405,7 @@ def weplus_bot(title: str, content: str) -> None:
通过 微加机器人 推送消息
"""
if not push_config.get("WE_PLUS_BOT_TOKEN"):
print("微加机器人 服务的 WE_PLUS_BOT_TOKEN 未设置!!\n取消推送")
return
print("微加机器人 服务启动")
@ -455,6 +437,7 @@ def qmsg_bot(title: str, content: str) -> None:
使用 qmsg 推送消息
"""
if not push_config.get("QMSG_KEY") or not push_config.get("QMSG_TYPE"):
print("qmsg 的 QMSG_KEY 或者 QMSG_TYPE 未设置!!\n取消推送")
return
print("qmsg 服务启动")
@ -473,10 +456,11 @@ def wecom_app(title: str, content: str) -> None:
通过 企业微信 APP 推送消息
"""
if not push_config.get("QYWX_AM"):
print("QYWX_AM 未设置!!\n取消推送")
return
QYWX_AM_AY = re.split(",", push_config.get("QYWX_AM"))
if 4 < len(QYWX_AM_AY) > 5:
print("QYWX_AM 设置错误!!")
print("QYWX_AM 设置错误!!\n取消推送")
return
print("企业微信 APP 服务启动")
@ -569,6 +553,7 @@ def wecom_bot(title: str, content: str) -> None:
通过 企业微信机器人 推送消息
"""
if not push_config.get("QYWX_KEY"):
print("企业微信机器人 服务的 QYWX_KEY 未设置!!\n取消推送")
return
print("企业微信机器人服务启动")
@ -594,6 +579,7 @@ def telegram_bot(title: str, content: str) -> None:
使用 telegram 机器人 推送消息
"""
if not push_config.get("TG_BOT_TOKEN") or not push_config.get("TG_USER_ID"):
print("tg 服务的 bot_token 或者 user_id 未设置!!\n取消推送")
return
print("tg 服务启动")
@ -642,6 +628,9 @@ def aibotk(title: str, content: str) -> None:
or not push_config.get("AIBOTK_TYPE")
or not push_config.get("AIBOTK_NAME")
):
print(
"智能微秘书 的 AIBOTK_KEY 或者 AIBOTK_TYPE 或者 AIBOTK_NAME 未设置!!\n取消推送"
)
return
print("智能微秘书 服务启动")
@ -680,6 +669,9 @@ def smtp(title: str, content: str) -> None:
or not push_config.get("SMTP_PASSWORD")
or not push_config.get("SMTP_NAME")
):
print(
"SMTP 邮件 的 SMTP_SERVER 或者 SMTP_SSL 或者 SMTP_EMAIL 或者 SMTP_PASSWORD 或者 SMTP_NAME 未设置!!\n取消推送"
)
return
print("SMTP 邮件 服务启动")
@ -734,6 +726,7 @@ def pushme(title: str, content: str) -> None:
使用 PushMe 推送消息
"""
if not push_config.get("PUSHME_KEY"):
print("PushMe 服务的 PUSHME_KEY 未设置!!\n取消推送")
return
print("PushMe 服务启动")
@ -766,6 +759,7 @@ def chronocat(title: str, content: str) -> None:
or not push_config.get("CHRONOCAT_QQ")
or not push_config.get("CHRONOCAT_TOKEN")
):
print("CHRONOCAT 服务的 CHRONOCAT_URL 或 CHRONOCAT_QQ 未设置!!\n取消推送")
return
print("CHRONOCAT 服务启动")
@ -809,17 +803,17 @@ def ntfy(title: str, content: str) -> None:
"""
通过 Ntfy 推送消息
"""
def encode_rfc2047(text: str) -> str:
"""将文本编码为符合 RFC 2047 标准的格式"""
encoded_bytes = base64.b64encode(text.encode("utf-8"))
encoded_str = encoded_bytes.decode("utf-8")
return f"=?utf-8?B?{encoded_str}?="
encoded_bytes = base64.b64encode(text.encode('utf-8'))
encoded_str = encoded_bytes.decode('utf-8')
return f'=?utf-8?B?{encoded_str}?='
if not push_config.get("NTFY_TOPIC"):
print("ntfy 服务的 NTFY_TOPIC 未设置!!\n取消推送")
return
print("ntfy 服务启动")
priority = "3"
priority = '3'
if not push_config.get("NTFY_PRIORITY"):
print("ntfy 服务的NTFY_PRIORITY 未设置!!默认设置为3")
else:
@ -828,15 +822,11 @@ def ntfy(title: str, content: str) -> None:
# 使用 RFC 2047 编码 title
encoded_title = encode_rfc2047(title)
data = content.encode(encoding="utf-8")
headers = {"Title": encoded_title, "Priority": priority, "Icon": "https://qn.whyour.cn/logo.png"} # 使用编码后的 title
if push_config.get("NTFY_TOKEN"):
headers['Authorization'] = "Bearer " + push_config.get("NTFY_TOKEN")
elif push_config.get("NTFY_USERNAME") and push_config.get("NTFY_PASSWORD"):
authStr = push_config.get("NTFY_USERNAME") + ":" + push_config.get("NTFY_PASSWORD")
headers['Authorization'] = "Basic " + base64.b64encode(authStr.encode('utf-8')).decode('utf-8')
if push_config.get("NTFY_ACTIONS"):
headers['Actions'] = encode_rfc2047(push_config.get("NTFY_ACTIONS"))
data = content.encode(encoding='utf-8')
headers = {
"Title": encoded_title, # 使用编码后的 title
"Priority": priority
}
url = push_config.get("NTFY_URL") + "/" + push_config.get("NTFY_TOPIC")
response = requests.post(url, data=data, headers=headers)
@ -845,111 +835,6 @@ def ntfy(title: str, content: str) -> None:
else:
print("Ntfy 推送失败!错误信息:", response.text)
def dodo_bot(title: str, content: str) -> None:
"""
通过 DoDo机器人 推送消息
"""
required_keys = [
'DODO_BOTTOKEN',
'DODO_BOTID',
'DODO_LANDSOURCEID',
'DODO_SOURCEID'
]
if not all(push_config.get(key) for key in required_keys):
missing = [key for key in required_keys if not push_config.get(key)]
print(f"DoDo 服务配置不完整,缺少以下参数: {', '.join(missing)}\n取消推送")
return
print("DoDo 服务启动")
url="https://botopen.imdodo.com/api/v2/personal/message/send"
botID=push_config.get('DODO_BOTID')
botToken=push_config.get('DODO_BOTTOKEN')
islandSourceId=push_config.get('DODO_LANDSOURCEID')
dodoSourceId=push_config.get('DODO_SOURCEID')
headers = {
'Authorization': f'Bot {botID}.{botToken}',
'Content-Type': 'application/json',
'Host': 'botopen.imdodo.com'
}
payload = json.dumps({
"islandSourceId": islandSourceId,
"dodoSourceId": dodoSourceId,
"messageType": 1,
"messageBody": {
"content": f"{title}\n\n{content}"
}
})
try:
response = requests.post(url, headers=headers, data=payload)
if response.status_code == 200:
response = response.json()
if response.get("status") == 0 and response.get("message") == "success":
print(f'DoDo 推送成功!')
else:
print(f'DoDo 推送失败!错误信息:\n{response}')
else:
print("DoDo 推送失败!错误信息:", response.text)
except Exception as e:
print(f"DoDo 推送请求异常: {str(e)}")
def wxpusher_bot(title: str, content: str) -> None:
"""
通过 wxpusher 推送消息
支持的环境变量:
- WXPUSHER_APP_TOKEN: appToken
- WXPUSHER_TOPIC_IDS: 主题ID, 多个用英文分号;分隔
- WXPUSHER_UIDS: 用户ID, 多个用英文分号;分隔
"""
if not push_config.get("WXPUSHER_APP_TOKEN"):
return
url = "https://wxpusher.zjiecode.com/api/send/message"
# 处理topic_ids和uids将分号分隔的字符串转为数组
topic_ids = []
if push_config.get("WXPUSHER_TOPIC_IDS"):
topic_ids = [
int(id.strip())
for id in push_config.get("WXPUSHER_TOPIC_IDS").split(";")
if id.strip()
]
uids = []
if push_config.get("WXPUSHER_UIDS"):
uids = [
uid.strip()
for uid in push_config.get("WXPUSHER_UIDS").split(";")
if uid.strip()
]
# topic_ids uids 至少有一个
if not topic_ids and not uids:
print("wxpusher 服务的 WXPUSHER_TOPIC_IDS 和 WXPUSHER_UIDS 至少设置一个!!")
return
print("wxpusher 服务启动")
data = {
"appToken": push_config.get("WXPUSHER_APP_TOKEN"),
"content": f"<h1>{title}</h1><br/><div style='white-space: pre-wrap;'>{content}</div>",
"summary": title,
"contentType": 2,
"topicIds": topic_ids,
"uids": uids,
"verifyPayType": 0,
}
headers = {"Content-Type": "application/json"}
response = requests.post(url=url, json=data, headers=headers).json()
if response.get("code") == 1000:
print("wxpusher 推送成功!")
else:
print(f"wxpusher 推送失败!错误信息:{response.get('msg')}")
def parse_headers(headers):
if not headers:
return {}
@ -1006,6 +891,7 @@ def custom_notify(title: str, content: str) -> None:
通过 自定义通知 推送消息
"""
if not push_config.get("WEBHOOK_URL") or not push_config.get("WEBHOOK_METHOD"):
print("自定义通知的 WEBHOOK_URL 或 WEBHOOK_METHOD 未设置!!\n取消推送")
return
print("自定义通知服务启动")
@ -1107,21 +993,10 @@ def add_notify_function():
and push_config.get("CHRONOCAT_TOKEN")
):
notify_function.append(chronocat)
if (
push_config.get("DODO_BOTTOKEN")
and push_config.get("DODO_BOTID")
and push_config.get("DODO_LANDSOURCEID")
and push_config.get("DODO_SOURCEID")
):
notify_function.append(dodo_bot)
if push_config.get("WEBHOOK_URL") and push_config.get("WEBHOOK_METHOD"):
notify_function.append(custom_notify)
if push_config.get("NTFY_TOPIC"):
notify_function.append(ntfy)
if push_config.get("WXPUSHER_APP_TOKEN") and (
push_config.get("WXPUSHER_TOPIC_IDS") or push_config.get("WXPUSHER_UIDS")
):
notify_function.append(wxpusher_bot)
if not notify_function:
print(f"无推送渠道,请检查通知变量是否正确")
return notify_function

View File

@ -75,7 +75,7 @@ docker run -d \
```json
{
"plugins": {
"media_servers": {
"emby": {
"url": "http://your-emby-server:8096",
"token": "YOUR_EMBY_TOKEN"
@ -91,5 +91,4 @@ docker run -d \
| 插件 | 说明 | 贡献者 |
| ------- | -------------------- | --------------------------------------- |
| plex.py | 自动刷新 Plex 媒体库 | [zhazhayu](https://github.com/zhazhayu) |
| alist_strm_gen.py | 自动生成strm | [xiaoQQya](https://github.com/xiaoQQya) |
| alist_sync.py | 调用 alist 实现跨网盘转存 | [jenfonro](https://github.com/jenfonro) |
| alist_strm_gen.py | 自动生成strm | [xiaoQQya](https://github.com/xiaoQQya) |

View File

@ -1,12 +1,8 @@
[
"smartstrm",
"fnv_refresh_v2",
"alist",
"alist_strm",
"alist_strm_gen",
"alist_sync",
"aria2",
"emby",
"plex",
"fnv"
"plex"
]

View File

@ -50,7 +50,7 @@ class Alist_strm:
print(f"alist-strm配置运行: {config_name}")
return True
else:
print(f"alist-strm配置运行: 匹配失败❌请检查网络连通和cookie有效性")
print(f"alist-strm配置运行: 匹配失败❌")
except Exception as e:
print(f"获取alist-strm配置信息出错: {e}")
return False

View File

@ -137,7 +137,7 @@ class Alist_strm_gen:
if item.get("is_dir"):
self.check_dir(item_path)
else:
self.generate_strm(item_path, item)
self.generate_strm(item_path)
def get_file_list(self, path, force_refresh=False):
url = f"{self.url}/api/fs/list"
@ -157,7 +157,7 @@ class Alist_strm_gen:
print(f"📺 Alist-Strm生成: 获取文件列表出错❌ {e}")
return {}
def generate_strm(self, file_path, file_info):
def generate_strm(self, file_path):
ext = file_path.split(".")[-1]
if ext.lower() in self.video_exts:
strm_path = (
@ -169,11 +169,8 @@ class Alist_strm_gen:
return
if not os.path.exists(os.path.dirname(strm_path)):
os.makedirs(os.path.dirname(strm_path))
sign_param = (
"" if not file_info.get("sign") else f"?sign={file_info['sign']}"
)
with open(strm_path, "w", encoding="utf-8") as strm_file:
strm_file.write(f"{self.strm_server}{file_path}{sign_param}")
strm_file.write(f"{self.strm_server}{file_path}")
print(f"📺 生成STRM文件 {strm_path} 成功✅")
def get_root_folder_full_path(self, cookie, pdir_fid):

View File

@ -1,313 +0,0 @@
# plugins: 调用 alist 实现跨网盘转存
# author: https://github.com/jenfonro
import re
import json
import requests
class Alist_sync:
default_config = {
"url": "", # Alist服务器URL
"token": "", # Alist服务器Token
"quark_storage_id": "", # Alist 服务器夸克存储 ID
"save_storage_id": "", # Alist 服务器同步的存储 ID
"tv_mode": "", # TV库模式填入非0值开启
# TV库模式说明
# 1.开启后会验证文件名是否包含S01E01的正则格式目前仅包含mp4及mkv
# 2.会对比保存目录下是否存在该名称的mp4、mkv文件如果不存在才会进行同步
# 3.夸克目录及同步目录均会提取为S01E01的正则进行匹配不受其它字符影响
}
is_active = False
# 缓存参数
default_task_config = {
"enable": False, # 当前任务开关,
"save_path": "", # 需要同步目录,默认空时路径则会与夸克的保存路径一致,不开启完整路径模式时,默认根目录为保存驱动的根目录
"verify_path": "", # 验证目录主要用于影视库避免重复文件一般配合alist的别名功能及full_path_mode使用用于多个网盘的源合并成一个目录
"full_path_mode": False, # 完整路径模式
# 完整路径模式开启后不再限制保存目录的存储驱动将根据填入的路径进行保存需要填写完整的alist目录
}
def __init__(self, **kwargs):
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
else:
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
if self.url and self.token:
if self.verify_server():
self.is_active = True
def _send_request(self, method, url, **kwargs):
headers = {"Authorization": self.token, "Content-Type": "application/json"}
if "headers" in kwargs:
headers = kwargs["headers"]
del kwargs["headers"]
try:
response = requests.request(method, url, headers=headers, **kwargs)
# print(f"{response.text}")
# response.raise_for_status() # 检查请求是否成功但返回非200也会抛出异常
return response
except Exception as e:
print(f"_send_request error:\n{e}")
fake_response = requests.Response()
fake_response.status_code = 500
fake_response._content = b'{"status": 500, "message": "request error"}'
return fake_response
def verify_server(self):
url = f"{self.url}/api/me"
querystring = ""
headers = {"Authorization": self.token, "Content-Type": "application/json"}
try:
response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status()
response = response.json()
if response.get("code") == 200:
if response.get("data").get("username") == "guest":
print(f"Alist登陆失败请检查token")
else:
print(
f"Alist登陆成功当前用户: {response.get('data').get('username')}"
)
return True
else:
print(f"Alist同步: 连接服务器失败❌ {response.get('message')}")
except requests.exceptions.RequestException as e:
print(f"获取Alist信息出错: {e}")
return False
def run(self, task, **kwargs):
if not task["addition"]["alist_sync"]["enable"]:
return 0
print(f"开始进行alist同步")
# 这一块注释的是获取任务的参数在web界面可以看
# print("所有任务参数:")
# print(task)
# print(task['addition'])
# print(task['addition']['alist_sync'])
# print(task['addition']['alist_sync']['target_path'])
# 获取夸克挂载根目录
data = self.get_storage_path(self.quark_storage_id)
if data["driver"] != "Quark":
print(
f"Alist同步: 存储{self.quark_storage_id}非夸克存储❌ {data['driver']}"
)
return 0
quark_mount_root_path = re.sub(r".*root_folder_id\":\"", "", data["addition"])
quark_mount_root_path = re.sub(r"\",.*", "", quark_mount_root_path)
if quark_mount_root_path != "0" and quark_mount_root_path != "":
print(f"Alist同步: 存储{self.quark_storage_id}挂载的目录非夸克根目录❌")
return 0
self.quark_mount_path = data["mount_path"]
# 获取保存路径的挂载根目录
if self.save_storage_id != 0:
data = self.get_storage_path(self.save_storage_id)
self.save_mount_path = data["mount_path"]
# 保存的目录初始化
if task["addition"]["alist_sync"]["save_path"] == "":
self.save_path = f"{self.save_mount_path}/{task['savepath']}"
else:
self.save_path = task["addition"]["alist_sync"]["save_path"]
if not task["addition"]["alist_sync"]["full_path_mode"]:
if self.save_path.startswith("/"):
self.save_path = self.save_path[1:]
if self.save_path.endswith("/"):
self.save_path = self.save_path[:-1]
self.save_path = f"{self.save_mount_path}/{self.save_path}"
else:
# print('完整路径模式')
if not self.save_path.startswith("/"):
self.save_path = "/" + self.save_path
if self.save_path.endswith("/"):
self.save_path = self.save_path[:-1]
# 获取保存目录是否存在
if not self.get_path(self.save_path):
dir_exists = False
# 如果目录不存在判断两边路径是否一致,一致时直接创建复制目录任务即可
else:
dir_exists = True
copy_dir = False
# 初始化验证目录
# 如果没有填验证目录,则验证目录与保存目录一致
if task["addition"]["alist_sync"]["verify_path"]:
self.verify_path = task["addition"]["alist_sync"]["verify_path"]
if not task["addition"]["alist_sync"]["full_path_mode"]:
if self.verify_path.startswith("/"):
self.verify_path = self.save_path[1:]
if self.verify_path.endswith("/"):
self.verify_path = self.save_path[:-1]
self.verify_path = f"{self.save_mount_path}/{self.verify_path}"
else:
# print('完整路径模式')
if not self.verify_path.startswith("/"):
self.verify_path = "/" + self.save_path
if self.verify_path.endswith("/"):
self.verify_path = self.save_path[:-1]
else:
self.verify_path = self.save_path
# 初始化夸克目录
self.source_path = f"{self.quark_mount_path}/{task['savepath']}"
# 初始化任务名
self.taskname = f"{task['taskname']}"
# 获取网盘已有文件
source_dir_list = self.get_path_list(self.source_path)
if not source_dir_list:
print("获取夸克文件列表失败请检查网络或手动刷新alist中的夸克目录")
return 0
if self.tv_mode == 0 or self.tv_mode == "":
self.tv_mode = False
else:
self.tv_mode = True
# 如果是新建的目录则将所有文件直接复制
if not dir_exists:
self.get_save_file([], source_dir_list)
else:
verify_dir_list = self.get_path_list(self.verify_path)
if verify_dir_list:
self.get_save_file(verify_dir_list, source_dir_list)
else:
self.get_save_file([], source_dir_list)
if self.save_file_data:
self.save_start(self.save_file_data)
print("同步的文件列表:")
for save_file in self.save_file_data:
print(f"└── 🎞️{save_file}")
else:
print("没有需要同步的文件")
def save_start(self, save_file_data):
url = f"{self.url}/api/fs/copy"
payload = json.dumps(
{
"src_dir": self.source_path,
"dst_dir": self.save_path,
"names": save_file_data,
}
)
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print("未能进行Alist同步请手动同步")
else:
print("Alist创建任务成功")
self.copy_task = response.json()
def get_save_file(self, target_dir_list, source_dir_list):
self.save_file_data = []
if target_dir_list == []:
for source_list in source_dir_list:
if self.tv_mode:
if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)",
source_list["name"],
re.IGNORECASE,
):
self.save_file_data.append(source_list["name"])
else:
self.save_file_data.append(source_list["name"])
else:
for source_list in source_dir_list:
skip = False
source_list_filename = (
source_list["name"]
.replace(".mp4", "")
.replace(".mkv", "")
.replace(self.taskname + ".", "")
.lower()
)
for target_list in target_dir_list:
if source_list["is_dir"]:
# print(f"跳过目录同步")
skip = True
break
if self.tv_mode:
target_list_filename = (
target_list["name"]
.replace(".mp4", "")
.replace(".mkv", "")
.replace(self.taskname + ".", "")
.lower()
)
if source_list_filename == target_list_filename:
# print(f"文件存在,名称为:{target_list['name']}")
skip = True
break
else:
if source_list["name"] == target_list["name"]:
# print(f"文件存在,名称为:{target_dir['name']}")
skip = True
break
if self.tv_mode:
if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.(mkv|mp4)",
source_list["name"],
re.IGNORECASE,
):
# 添加一句验证如果有MKVMP4存在时则只保存某一个格式
if re.search(
self.taskname + r"\.s\d{1,3}e\d{1,3}\.mp4",
source_list["name"],
re.IGNORECASE,
):
for all_file in source_dir_list:
if (
source_list["name"].replace(".mp4", ".mkv")
== all_file["name"]
):
print(
f"{source_list['name']}拥有相同版本的MKV文件跳过复制"
)
skip = True
if not skip:
self.save_file_data.append(source_list["name"])
def get_path_list(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps(
{"path": path, "password": "", "page": 1, "per_page": 0, "refresh": True}
)
response = self._send_request("POST", url, data=payload)
if response.status_code != 200:
print(f"获取Alist目录出错: {response}")
return False
else:
return response.json()["data"]["content"]
def get_path(self, path):
url = f"{self.url}/api/fs/list"
payload = json.dumps({"path": path, "password": "", "force_root": False})
response = self._send_request("POST", url, data=payload)
if response.status_code != 200 or response.json()["message"] != "success":
return False
else:
return True
def get_storage_path(self, storage_id):
url = f"{self.url}/api/admin/storage/get"
headers = {"Authorization": self.token}
querystring = {"id": storage_id}
try:
response = requests.request("GET", url, headers=headers, params=querystring)
response.raise_for_status()
data = response.json()
if data.get("code") == 200:
return data.get("data", [])
else:
print(f"Alist同步: 存储{storage_id}连接失败❌ {data.get('message')}")
except Exception as e:
print(f"Alist同步: 获取Alist存储出错 {e}")
return []

View File

@ -36,38 +36,29 @@ class Aria2:
if not task_config.get("auto_download"):
return
if (tree := kwargs.get("tree")) and (account := kwargs.get("account")):
# 按文件路径排序添加下载任务
nodes = sorted(
tree.all_nodes_itr(), key=lambda node: node.data.get("path", "")
)
file_fids = []
file_paths = []
for node in nodes:
for node in tree.all_nodes_itr():
if not node.data.get("is_dir", True):
file_fids.append(node.data.get("fid"))
file_paths.append(node.data.get("path"))
if not file_fids:
print(f"Aria2下载: 没有下载任务,跳过")
return
download_return, cookie = account.download(file_fids)
file_urls = [item["download_url"] for item in download_return["data"]]
for index, file_url in enumerate(file_urls):
file_path = file_paths[index]
print(f"📥 Aria2下载: {file_path}")
local_path = f"{self.dir}{file_paths[index]}"
aria2_params = [
[file_url],
{
"header": [
f"Cookie: {cookie or account.cookie}",
f"User-Agent: {account.USER_AGENT}",
],
"out": os.path.basename(local_path),
"dir": os.path.dirname(local_path),
"pause": task_config.get("pause"),
},
]
self.add_uri(aria2_params)
quark_path = node.data.get("path")
quark_fid = node.data.get("fid")
save_path = f"{self.dir}{quark_path}"
print(f"📥 Aria2下载: {quark_path}")
download_return, cookie = account.download([quark_fid])
download_url = [
item["download_url"] for item in download_return["data"]
]
aria2_params = [
download_url,
{
"header": [
f"Cookie: {cookie}",
f"User-Agent: {account.common_headers().get('user-agent')}",
],
"out": os.path.basename(save_path),
"dir": os.path.dirname(save_path),
"pause": task_config.get("pause"),
},
]
self.add_uri(aria2_params)
def _make_rpc_request(self, method, params=None):
"""发出 JSON-RPC 请求."""

View File

@ -1,312 +0,0 @@
import requests
import json
import hashlib
import random
import time
from urllib.parse import urlencode
from typing import Any, Optional
# 飞牛影视插件
# 该插件用于与飞牛影视服务器API交互支持自动刷新媒体库
# 通过配置用户名、密码和密钥字符串进行认证,并提供媒体库扫描功能
class Fnv:
# --- 配置信息 ---
default_config = {
"base_url": "http://10.0.0.6:5666", # 飞牛影视服务器URL
"app_name": "trimemedia-web", # 飞牛影视应用名称
"username": "", # 飞牛影视用户名
"password": "", # 飞牛影视密码
"secret_string": "", # 飞牛影视密钥字符串
"api_key": "", # 飞牛影视API密钥
"token": None, # 飞牛影视认证Token (可选)
}
default_task_config = {
"auto_refresh": False, # 是否自动刷新媒体库
"mdb_name": "", # 飞牛影视目标媒体库名称
"mdb_dir_list": "", # 飞牛影视目标媒体库文件夹路径列表,多个用逗号分隔
}
# 定义一个可选键的集合
OPTIONAL_KEYS = {"token"}
# --- API 端点常量 ---
API_LOGIN = "/v/api/v1/login" # 登录端点
API_MDB_LIST = "/v/api/v1/mdb/list" # 获取媒体库列表
API_MDB_SCAN = "/v/api/v1/mdb/scan/{}" # 刷新媒体库端点 ({}为媒体库ID)
API_TASK_STOP = "/v/api/v1/task/stop" # 停止任务端点
# --- 实例状态 ---
is_active = False
session = requests.Session()
token = None
# =====================================================================
# Public Methods / Entry Points (公共方法/入口)
# =====================================================================
def __init__(self, **kwargs):
"""
初始化 Fnv 客户端
"""
self.plugin_name = self.__class__.__name__.lower()
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
# 检查配置并尝试登录,以确定插件是否激活
if self._check_config():
if self.token is None or self.token == "":
self._login()
self.is_active = self.token is not None or self.token != ""
if self.is_active:
print(f"{self.plugin_name}: 插件已激活 ✅")
else:
print(f"{self.plugin_name}: 插件未激活 ❌")
def run(self, task, **kwargs):
"""
插件运行主入口
根据任务配置执行媒体库刷新操作
"""
if not self.is_active:
print(f"飞牛影视: 插件未激活,跳过任务。")
return
task_config = task.get("addition", {}).get(
self.plugin_name, self.default_task_config
)
if not task_config.get("auto_refresh"):
print("飞牛影视: 自动刷新未启用,跳过处理。")
return
target_library_name = task_config.get("mdb_name")
if not target_library_name:
print("飞牛影视: 未指定媒体库名称,跳过处理。")
return
target_library_mdb_dir_list = task_config.get("mdb_dir_list")
dir_list = []
if target_library_mdb_dir_list:
dir_list = [dir_path.strip() for dir_path in target_library_mdb_dir_list.split(",") if dir_path.strip()]
# 获取媒体库ID
library_id = self._get_library_id(target_library_name)
if library_id:
# 获取ID成功后刷新该媒体库
self._refresh_library(library_id, dir_list=dir_list)
# =====================================================================
# Internal Methods (内部实现方法)
# =====================================================================
def _check_config(self):
"""检查配置是否完整"""
missing_keys = [
key for key in self.default_config
if key not in self.OPTIONAL_KEYS and not getattr(self, key, None)
]
if missing_keys:
# print(f"{self.plugin_name} 模块缺少必要参数: {', '.join(missing_keys)}")
return False
return True
def _make_request(self, method: str, rel_url: str, params: dict = None, data: dict = None) -> Optional[Any]:
"""
一个统一的私有方法用于发送所有API请求
它会自动处理签名请求头错误和响应解析
当认证失败时会自动尝试重新登录并重试最多3次
"""
max_retries = 3
for attempt in range(max_retries):
url = f"{self.base_url.rstrip('/')}{rel_url}"
authx = self._cse_sign(method, rel_url, params, data)
if not authx:
print(f"飞牛影视: 为 {rel_url} 生成签名失败,请求中止。")
return None
headers = {
"Content-Type": "application/json",
"authx": authx,
}
if self.token:
headers["Authorization"] = self.token
try:
response = self.session.request(
method, url, headers=headers, params=params,
data=self._serialize_data(data if data is not None else {})
)
response.raise_for_status()
response_data = response.json()
except requests.exceptions.RequestException as e:
print(f"飞牛影视: 请求 {url} 时出错: {e}")
return None
except json.JSONDecodeError:
print(f"飞牛影视: 解析来自 {url} 的响应失败内容非JSON格式。")
return None
response_code = response_data.get("code")
if response_code is None:
print(f"飞牛影视: 响应格式错误,未找到 'code' 字段。")
return None
if response_code == 0:
return response_data
if response_code == -2:
print(f"飞牛影视: 认证失败 (尝试 {attempt + 1}/{max_retries}),尝试重新登录...")
if rel_url == self.API_LOGIN:
print("飞牛影视: 登录接口认证失败,请检查用户名和密码。")
return response_data
if not self._login():
print("飞牛影视: 重新登录失败,无法继续请求。")
return None
continue
else:
msg = response_data.get('msg', '未知错误')
print(f"飞牛影视: API调用失败 ({rel_url}): {msg}")
return response_data
print(f"飞牛影视: 请求 {rel_url} 在尝试 {max_retries} 次后仍然失败。")
return None
def _login(self) -> bool:
"""
登录到飞牛影视服务器并获取认证token
"""
app_name = self.app_name or self.default_config["app_name"]
username = self.username or self.default_config["username"]
password = self.password or self.default_config["password"]
print("飞牛影视: 正在尝试登录...")
payload = {"username": username, "password": password, "app_name": app_name}
response_json = self._make_request('post', self.API_LOGIN, data=payload)
if response_json and response_json.get("data", {}).get("token"):
self.token = response_json["data"]["token"]
print("飞牛影视: 登录成功 ✅")
return True
else:
print("飞牛影视: 登录失败 ❌")
return False
def _get_library_id(self, library_name: str) -> Optional[str]:
"""
根据媒体库的名称获取其唯一ID (guid)
"""
if not self.token:
print("飞牛影视: 必须先登录才能获取媒体库列表。")
return None
print(f"飞牛影视: 正在查找媒体库 '{library_name}'...")
response_json = self._make_request('get', self.API_MDB_LIST)
if response_json and response_json.get("data"):
for library in response_json.get("data", []):
if library.get("name") == library_name:
print(f"飞牛影视: 找到目标媒体库 ✅ID: {library.get('guid')}")
return library.get("guid")
print(f"飞牛影视: 未在媒体库列表中找到名为 '{library_name}' 的媒体库 ❌")
return None
def _refresh_library(self, library_id: str, dir_list: list[str] = None) -> bool:
"""
根据给定的媒体库ID触发一次媒体库扫描/刷新
"""
if not self.token:
print("飞牛影视: 必须先登录才能刷新媒体库。")
return False
if dir_list:
print(f"飞牛影视: 正在为媒体库 {library_id} 发送部分目录{dir_list}刷新指令...")
else:
print(f"飞牛影视: 正在为媒体库 {library_id} 发送刷新指令...")
rel_url = self.API_MDB_SCAN.format(library_id)
request_body = {"dir_list": dir_list} if dir_list else {}
response_json = self._make_request('post', rel_url, data=request_body)
if not response_json: return False
response_code = response_json.get("code")
if response_code == 0:
print(f"飞牛影视: 发送刷新指令成功 ✅")
return True
elif response_code == -14:
if self._stop_refresh_task(library_id):
print(f"飞牛影视: 发现重复任务,已停止旧任务,重新发送刷新指令...")
response_json = self._make_request('post', rel_url, data={})
if response_json and response_json.get("code") == 0:
print(f"飞牛影视: 发送刷新指令成功 ✅")
return True
else:
print(f"飞牛影视: 重新发送刷新指令失败 ❌")
else:
print(f"飞牛影视: 停止旧任务失败,无法继续刷新操作 ❌")
return False
def _stop_refresh_task(self, library_id: str) -> bool:
"""
停止指定的媒体库刷新任务
"""
if not self.token:
print("飞牛影视: 必须先登录才能停止刷新任务。")
return False
print(f"飞牛影视: 正在停止媒体库刷新任务 {library_id}...")
payload = {"guid": library_id, "type": "TaskItemScrap"}
response_json = self._make_request('post', self.API_TASK_STOP, data=payload)
if response_json and response_json.get("code") == 0:
print(f"飞牛影视: 停止刷新任务成功 ✅")
return True
else:
print(f"飞牛影视: 停止刷新任务失败 ❌")
return False
def _cse_sign(self, method: str, path: str, params: dict = None, data: dict = None) -> str:
"""
为API请求生成 cse 签名参数字符串
"""
nonce = str(random.randint(100000, 999999))
timestamp = str(int(time.time() * 1000))
serialized_str = ""
if method.lower() == 'get':
if params:
serialized_str = urlencode(sorted(params.items()))
else:
serialized_str = self._serialize_data(data)
body_hash = self._md5_hash(serialized_str)
string_to_sign_parts = [
self.secret_string, path, nonce, timestamp, body_hash, self.api_key
]
string_to_sign = "_".join(string_to_sign_parts)
final_sign = self._md5_hash(string_to_sign)
return f"nonce={nonce}&timestamp={timestamp}&sign={final_sign}"
# =====================================================================
# Static Utility Methods (静态工具方法)
# =====================================================================
@staticmethod
def _md5_hash(s: str) -> str:
"""计算并返回字符串的小写 MD5 哈希值。"""
return hashlib.md5(s.encode('utf-8')).hexdigest()
@staticmethod
def _serialize_data(data: Any) -> str:
"""
将请求体数据序列化为紧凑的JSON字符串
"""
if isinstance(data, dict):
return json.dumps(data, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
if isinstance(data, str):
return data
if not data:
return ""
return ""

Binary file not shown.

View File

@ -1,75 +0,0 @@
import requests
class Smartstrm:
default_config = {
"webhook": "", # SmartStrm Webhook 地址
"strmtask": "", # SmartStrm 任务名,支持多个如 `tv,movie`
"xlist_path_fix": "", # 路径映射, SmartStrm 任务使用 quark 驱动时无须填写;使用 openlist 驱动时需填写 `/storage_mount_path:/quark_root_dir` ,例如把夸克根目录挂载在 OpenList 的 /quark 下,则填写 `/quark:/` ;以及 SmartStrm 会使 OpenList 强制刷新目录,无需再用 alist 插件刷新。
}
is_active = False
def __init__(self, **kwargs):
self.plugin_name = self.__class__.__name__.lower()
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
else:
print(f"{self.plugin_name} 模块缺少必要参数: {key}")
if self.webhook and self.strmtask:
if self.get_info():
self.is_active = True
def get_info(self):
"""获取 SmartStrm 信息"""
try:
response = requests.request(
"GET",
self.webhook,
timeout=5,
)
response = response.json()
if response.get("success"):
print(f"SmartStrm 触发任务: 连接成功 {response.get('version','')}")
return response
print(f"SmartStrm 触发任务:连接失败 {response.get('message','')}")
return None
except Exception as e:
print(f"SmartStrm 触发任务:连接出错 {str(e)}")
return None
def run(self, task, **kwargs):
"""
插件主入口函数
:param task: 任务配置
:param kwargs: 其他参数
"""
try:
# 准备发送的数据
headers = {"Content-Type": "application/json"}
payload = {
"event": "qas_strm",
"data": {
"strmtask": self.strmtask,
"savepath": task["savepath"],
"xlist_path_fix": self.xlist_path_fix,
},
}
# 发送 POST 请求
response = requests.request(
"POST",
self.webhook,
headers=headers,
json=payload,
timeout=5,
)
response = response.json()
if response.get("success"):
print(
f"SmartStrm 触发任务: [{response['task']['name']}] {response['task']['storage_path']} 成功✅"
)
else:
print(f"SmartStrm 触发任务: {response['message']}")
except Exception as e:
print(f"SmartStrm 触发任务:出错 {str(e)}")

File diff suppressed because it is too large Load Diff

View File

@ -6,51 +6,26 @@
"QUARK_SIGN_NOTIFY": true,
"其他推送渠道//此项可删": "配置方法同青龙"
},
"plugins": {
"media_servers": {
"emby": {
"url": "",
"token": ""
}
},
"magic_regex": {
"$TV_REGEX": {
"pattern": ".*?([Ss]\\d{1,2})?(?:[第EePpXx\\.\\-\\_\\( ]{1,2}|^)(\\d{1,3})(?!\\d).*?\\.(mp4|mkv)",
"replace": "\\1E\\2.\\3"
},
"$BLACK_WORD": {
"pattern": "^(?!.*纯享)(?!.*加更)(?!.*超前企划)(?!.*训练室)(?!.*蒸蒸日上).*",
"replace": ""
},
"$SHOW_MAGIC": {
"pattern": "^(?!.*纯享)(?!.*加更)(?!.*抢先)(?!.*预告).*?第\\d+期.*",
"replace": "{TASKNAME}.{SXX}E{II}.第{E}期{PART}.{EXT}"
},
"$TV_MAGIC": {
"pattern": ".*\\.(mp4|mkv|mov|m4v|avi|mpeg|ts)$",
"replace": "{TASKNAME}.{SXX}E{E}.{EXT}"
}
},
"tasklist": [
{
"taskname": "测试-魔法匹配剧集这是一组有效分享配置CK后可测试任务是否正常",
"shareurl": "https://pan.quark.cn/s/d07a34a9c695#/list/share/7e25ddd87cf64443b637125478733295-夸克自动转存测试",
"savepath": "/夸克自动转存测试/剧集",
"pattern": "$TV_REGEX",
"savepath": "/夸克自动转存测试",
"pattern": "$TV",
"replace": "",
"enddate": "2099-01-30",
"update_subdir": "4k|1080p"
},
{
"taskname": "测试-综艺命名",
"shareurl": "https://pan.quark.cn/s/d07a34a9c695#/list/share/7e25ddd87cf64443b637125478733295-%E5%A4%B8%E5%85%8B%E8%87%AA%E5%8A%A8%E8%BD%AC%E5%AD%98%E6%B5%8B%E8%AF%95/71df3902f42d4270a58c0eb12aa2b014-%E7%BB%BC%E8%89%BA%E5%91%BD%E5%90%8D",
"savepath": "/夸克自动转存测试/综艺命名",
"pattern": "^(?!.*纯享)(?!.*加更)(?!.*抢先)(?!.*预告).*?第\\d+期.*",
"replace": "{II}.{TASKNAME}.{DATE}.第{E}期{PART}.{EXT}"
},
{
"taskname": "测试-去广告字符",
"taskname": "测试-广告过滤",
"shareurl": "https://pan.quark.cn/s/d07a34a9c695#/list/share/7e25ddd87cf64443b637125478733295-夸克自动转存测试/680d91e490814da0927c38b432f88edc-带广告文件夹",
"savepath": "/夸克自动转存测试/去广告字符",
"savepath": "/夸克自动转存测试/带广告文件夹",
"pattern": "【XX电影网】(.*)\\.(mp4|mkv)",
"replace": "\\1.\\2",
"enddate": "2099-01-30"
@ -58,7 +33,7 @@
{
"taskname": "测试-超期任务",
"shareurl": "https://pan.quark.cn/s/d07a34a9c695#/list/share/7e25ddd87cf64443b637125478733295-夸克自动转存测试",
"savepath": "/夸克自动转存测试/不会运行",
"savepath": "/夸克自动转存测试",
"pattern": "",
"replace": "",
"enddate": "2000-01-30",

View File

@ -2,4 +2,3 @@ flask
apscheduler
requests
treelib
natsort