启动时自动激活:插件在初始化时会立即检查配置并尝试登录,根据结果设置激活状态

Token 过期自动重登:在API请求因认证失败(code: -2)时,插件会自动尝试重新登录并重试请求(最多3次),极大地提高了长时间运行的稳定性。
支持 Token 配置:新增可选的 token 参数,允许用户直接提供有效 token,跳过用户名/密码登录流程。
健壮的 API 请求逻辑:
代码清晰分区:通过注释将代码划分为配置、公共方法、内部实现等区域,提升可读性。
API 端点常量化:将API路径定义为类常量(如 API_LOGIN),便于维护和管理。
This commit is contained in:
tellbin 2025-08-15 14:47:44 +08:00
parent 6844eb544b
commit c596d62962

View File

@ -3,11 +3,15 @@ import json
import hashlib import hashlib
import random import random
import time import time
from urllib.parse import urlencode, urlsplit from urllib.parse import urlencode
from typing import Any, Optional from typing import Any, Optional
# 飞牛影视插件
# 该插件用于与飞牛影视服务器API交互支持自动刷新媒体库
# 通过配置用户名、密码和密钥字符串进行认证,并提供媒体库扫描功能
class Fnv: class Fnv:
# --- 配置信息 ---
default_config = { default_config = {
"base_url": "http://10.0.0.6:5666", # 飞牛影视服务器URL "base_url": "http://10.0.0.6:5666", # 飞牛影视服务器URL
"app_name": "trimemedia-web", # 飞牛影视应用名称 "app_name": "trimemedia-web", # 飞牛影视应用名称
@ -15,6 +19,7 @@ class Fnv:
"password": "", # 飞牛影视密码 "password": "", # 飞牛影视密码
"secret_string": "", # 飞牛影视密钥字符串 "secret_string": "", # 飞牛影视密钥字符串
"api_key": "", # 飞牛影视API密钥 "api_key": "", # 飞牛影视API密钥
"token": None, # 飞牛影视认证Token (可选)
} }
default_task_config = { default_task_config = {
@ -22,118 +27,159 @@ class Fnv:
"mdb_name": "", # 飞牛影视目标媒体库名称 "mdb_name": "", # 飞牛影视目标媒体库名称
} }
# 定义一个可选键的集合
OPTIONAL_KEYS = {"token"}
# --- API 端点常量 ---
API_LOGIN = "/v/api/v1/login" # 登录端点
API_MDB_LIST = "/v/api/v1/mdb/list" # 获取媒体库列表
API_MDB_SCAN = "/v/api/v1/mdb/scan/{}" # 刷新媒体库端点 ({}为媒体库ID)
API_TASK_STOP = "/v/api/v1/task/stop" # 停止任务端点
# --- 实例状态 ---
is_active = False is_active = False
token = None
session = requests.Session() session = requests.Session()
token = None
# =====================================================================
# Public Methods / Entry Points (公共方法/入口)
# =====================================================================
def __init__(self, **kwargs): def __init__(self, **kwargs):
"""
初始化 Fnv 客户端
"""
self.plugin_name = self.__class__.__name__.lower() self.plugin_name = self.__class__.__name__.lower()
if kwargs: if kwargs:
for key, _ in self.default_config.items(): for key, _ in self.default_config.items():
if key in kwargs: if key in kwargs:
setattr(self, key, kwargs[key]) setattr(self, key, kwargs[key])
self.is_active = self._check_config() # 检查配置并尝试登录,以确定插件是否激活
if self._check_config():
if self.token is None or self.token == "":
self._login()
self.is_active = self.token is not None or self.token != ""
if self.is_active:
print(f"{self.plugin_name}: 插件已激活 ✅")
else:
print(f"{self.plugin_name}: 插件未激活 ❌")
def run(self, task, **kwargs):
"""
插件运行主入口
根据任务配置执行媒体库刷新操作
"""
if not self.is_active:
print(f"飞牛影视: 插件未激活,跳过任务。")
return
task_config = task.get("addition", {}).get(
self.plugin_name, self.default_task_config
)
if not task_config.get("auto_refresh"):
print("飞牛影视: 自动刷新未启用,跳过处理。")
return
target_library_name = task_config.get("mdb_name")
if not target_library_name:
print("飞牛影视: 未指定媒体库名称,跳过处理。")
return
# 获取媒体库ID
library_id = self._get_library_id(target_library_name)
if library_id:
# 获取ID成功后刷新该媒体库
self._refresh_library(library_id)
# =====================================================================
# Internal Methods (内部实现方法)
# =====================================================================
def _check_config(self): def _check_config(self):
"""检查配置是否完整""" """检查配置是否完整"""
missing_keys = [ missing_keys = [
key for key in self.default_config if not getattr(self, key, None) key for key in self.default_config
if key not in self.OPTIONAL_KEYS and not getattr(self, key, None)
] ]
if missing_keys: if missing_keys:
print(f"{self.plugin_name} 模块缺少必要参数: {', '.join(missing_keys)}") print(f"{self.plugin_name} 模块缺少必要参数: {', '.join(missing_keys)}")
return False return False
return True return True
def run(self, task, **kwargs):
"""插件运行主入口"""
if not self.is_active:
return
task_config = task.get("addition", {}).get(
self.plugin_name, self.default_task_config
)
if not task_config.get("auto_refresh"):
print("飞牛影视: 自动刷新未启用,跳过处理")
return
if not task_config.get("mdb_name"):
print("飞牛影视: 未指定媒体库名称,跳过处理")
return
target_library_name = task_config["mdb_name"]
app_name = self.app_name or self.default_config["app_name"]
username = self.username or self.default_config["username"]
password = self.password or self.default_config["password"]
# 登录并执行后续操作
if self._login(username=username, password=password, app_name=app_name):
# 登录成功后获取媒体库ID
library_id = self._get_library_id(target_library_name)
if library_id:
# 获取ID成功后刷新该媒体库
self._refresh_library(library_id)
def _make_request(self, method: str, rel_url: str, params: dict = None, data: dict = None) -> Optional[Any]: def _make_request(self, method: str, rel_url: str, params: dict = None, data: dict = None) -> Optional[Any]:
""" """
一个统一的私有方法用于发送所有API请求 一个统一的私有方法用于发送所有API请求
它会自动处理签名请求头错误和响应解析 它会自动处理签名请求头错误和响应解析
当认证失败时会自动尝试重新登录并重试最多3次
""" """
url = f"{self.base_url.rstrip('/')}{rel_url}" max_retries = 3
for attempt in range(max_retries):
url = f"{self.base_url.rstrip('/')}{rel_url}"
# 1. 生成认证签名 authx = self._cse_sign(method, rel_url, params, data)
authx = self._cse_sign(method, rel_url, params, data) if not authx:
if not authx: print(f"飞牛影视: 为 {rel_url} 生成签名失败,请求中止。")
print(f"飞牛影视: 为 {rel_url} 生成签名失败,请求中止。") return None
return None
# 2. 准备请求头 headers = {
headers = { "Content-Type": "application/json",
"Content-Type": "application/json", "authx": authx,
"authx": authx, }
} if self.token:
if self.token: headers["Authorization"] = self.token
headers["Authorization"] = self.token
# 3. 发送请求并处理异常 try:
try: response = self.session.request(
response = self.session.request( method, url, headers=headers, params=params, json=data if data is not None else {}
method, )
url, response.raise_for_status()
headers=headers, response_data = response.json()
params=params, except requests.exceptions.RequestException as e:
json=data if data is not None else {} print(f"飞牛影视: 请求 {url} 时出错: {e}")
) return None
response.raise_for_status() # 对 4xx/5xx 响应抛出异常 except json.JSONDecodeError:
response_data = response.json() print(f"飞牛影视: 解析来自 {url} 的响应失败内容非JSON格式。")
except requests.exceptions.RequestException as e: return None
print(f"飞牛影视: 请求 {url} 时出错: {e}")
return None
except json.JSONDecodeError:
print(f"飞牛影视: 解析来自 {url} 的响应失败内容非JSON格式。")
return None
# 4. 统一处理API层面的错误 response_code = response_data.get("code")
if response_data.get("code") != 0: if response_code is None:
msg = response_data.get('msg', '未知错误') print(f"飞牛影视: 响应格式错误,未找到 'code' 字段。")
print(f"飞牛影视: API调用失败 ({rel_url}): {msg}") return None
# 5. 返回响应 if response_code == 0:
return response_data return response_data
def _login(self, username: str, password: str, app_name: str) -> bool: if response_code == -2:
print(f"飞牛影视: 认证失败 (尝试 {attempt + 1}/{max_retries}),尝试重新登录...")
if rel_url == self.API_LOGIN:
print("飞牛影视: 登录接口认证失败,请检查用户名和密码。")
return response_data
if not self._login():
print("飞牛影视: 重新登录失败,无法继续请求。")
return None
continue
else:
msg = response_data.get('msg', '未知错误')
print(f"飞牛影视: API调用失败 ({rel_url}): {msg}")
return response_data
print(f"飞牛影视: 请求 {rel_url} 在尝试 {max_retries} 次后仍然失败。")
return None
def _login(self) -> bool:
""" """
登录到飞牛影视服务器并获取认证token 登录到飞牛影视服务器并获取认证token
成功后token会自动存储在客户端实例中
""" """
app_name = self.app_name or self.default_config["app_name"]
username = self.username or self.default_config["username"]
password = self.password or self.default_config["password"]
print("飞牛影视: 正在尝试登录...") print("飞牛影视: 正在尝试登录...")
rel_url = "/v/api/v1/login"
payload = {
"username": username,
"password": password,
"app_name": app_name,
}
response_json = self._make_request('post', rel_url, data=payload)
if response_json and response_json["data"]["token"]: payload = {"username": username, "password": password, "app_name": app_name}
response_json = self._make_request('post', self.API_LOGIN, data=payload)
if response_json and response_json.get("data", {}).get("token"):
self.token = response_json["data"]["token"] self.token = response_json["data"]["token"]
print("飞牛影视: 登录成功 ✅") print("飞牛影视: 登录成功 ✅")
return True return True
@ -150,12 +196,10 @@ class Fnv:
return None return None
print(f"飞牛影视: 正在查找媒体库 '{library_name}'...") print(f"飞牛影视: 正在查找媒体库 '{library_name}'...")
rel_url = "/v/api/v1/mdb/list" response_json = self._make_request('get', self.API_MDB_LIST)
response_json = self._make_request('get', rel_url)
if response_json and response_json.get("data"): if response_json and response_json.get("data"):
libraries = response_json.get("data", []) for library in response_json.get("data", []):
for library in libraries:
if library.get("name") == library_name: if library.get("name") == library_name:
print(f"飞牛影视: 找到目标媒体库 ✅ID: {library.get('guid')}") print(f"飞牛影视: 找到目标媒体库 ✅ID: {library.get('guid')}")
return library.get("guid") return library.get("guid")
@ -171,18 +215,20 @@ class Fnv:
return False return False
print(f"飞牛影视: 正在为媒体库 {library_id} 发送刷新指令...") print(f"飞牛影视: 正在为媒体库 {library_id} 发送刷新指令...")
rel_url = f"/v/api/v1/mdb/scan/{library_id}" rel_url = self.API_MDB_SCAN.format(library_id)
response_json = self._make_request('post', rel_url, data={}) # POST一个空对象 response_json = self._make_request('post', rel_url, data={})
if not response_json: return False
response_code = response_json.get("code") response_code = response_json.get("code")
if 0 == response_code: if response_code == 0:
print(f"飞牛影视: 发送刷新指令成功 ✅") print(f"飞牛影视: 发送刷新指令成功 ✅")
return True return True
elif response_code == -14: elif response_code == -14:
if self._stop_refresh_task(library_id): if self._stop_refresh_task(library_id):
print(f"飞牛影视: 发现重复任务,已停止旧任务,重新发送刷新指令...") print(f"飞牛影视: 发现重复任务,已停止旧任务,重新发送刷新指令...")
response_json = self._make_request('post', rel_url, data={}) # POST一个空对象 response_json = self._make_request('post', rel_url, data={})
response_code = response_json.get("code") if response_json and response_json.get("code") == 0:
if 0 == response_code:
print(f"飞牛影视: 发送刷新指令成功 ✅") print(f"飞牛影视: 发送刷新指令成功 ✅")
return True return True
else: else:
@ -200,13 +246,10 @@ class Fnv:
return False return False
print(f"飞牛影视: 正在停止媒体库刷新任务 {library_id}...") print(f"飞牛影视: 正在停止媒体库刷新任务 {library_id}...")
rel_url = f"/v/api/v1/task/stop" payload = {"guid": library_id, "type": "TaskItemScrap"}
response_json = self._make_request('post', rel_url, data={ response_json = self._make_request('post', self.API_TASK_STOP, data=payload)
"guid": library_id,
"type": "TaskItemScrap" if response_json and response_json.get("code") == 0:
})
response_code = response_json.get("code")
if 0 == response_code:
print(f"飞牛影视: 停止刷新任务成功 ✅") print(f"飞牛影视: 停止刷新任务成功 ✅")
return True return True
else: else:
@ -223,23 +266,21 @@ class Fnv:
if method.lower() == 'get' and params: if method.lower() == 'get' and params:
serialized_str = urlencode(sorted(params.items())) serialized_str = urlencode(sorted(params.items()))
else: else:
# 对于POST/PUT等方法序列化body
serialized_str = self._serialize_data(data) serialized_str = self._serialize_data(data)
body_hash = self._md5_hash(serialized_str) body_hash = self._md5_hash(serialized_str)
string_to_sign_parts = [ string_to_sign_parts = [
self.secret_string, self.secret_string, path, nonce, timestamp, body_hash, self.api_key
path,
nonce,
timestamp,
body_hash,
self.api_key
] ]
string_to_sign = "_".join(string_to_sign_parts) string_to_sign = "_".join(string_to_sign_parts)
final_sign = self._md5_hash(string_to_sign) final_sign = self._md5_hash(string_to_sign)
return f"nonce={nonce}&timestamp={timestamp}&sign={final_sign}" return f"nonce={nonce}&timestamp={timestamp}&sign={final_sign}"
# =====================================================================
# Static Utility Methods (静态工具方法)
# =====================================================================
@staticmethod @staticmethod
def _md5_hash(s: str) -> str: def _md5_hash(s: str) -> str:
"""计算并返回字符串的小写 MD5 哈希值。""" """计算并返回字符串的小写 MD5 哈希值。"""
@ -249,18 +290,11 @@ class Fnv:
def _serialize_data(data: Any) -> str: def _serialize_data(data: Any) -> str:
""" """
将请求体数据序列化为紧凑的JSON字符串 将请求体数据序列化为紧凑的JSON字符串
- 字典 (包括`{}`) 会被正确序列化为JSON字符串 (例如 `"{}"`)
- 字符串会原样返回
- 其他`None`或falsy值会返回空字符串`""`
""" """
# 优先检查dict因为空字典`{}`在Python中为falsy。
# 这样可以确保 `{}` 被正确序列化为字符串 `"{}"`。
if isinstance(data, dict): if isinstance(data, dict):
return json.dumps(data, sort_keys=True, separators=(',', ':')) return json.dumps(data, sort_keys=True, separators=(',', ':'))
if isinstance(data, str): if isinstance(data, str):
return data return data
# 对于其他 falsy 值(如 None, []),返回空字符串。
if not data: if not data:
return "" return ""
# 其他情况(虽然少见),返回空字符串以保证安全。
return "" return ""