添加飞牛影视媒体库刷新插件 (#106)
Some checks failed
Docker Publish / build-and-push (push) Has been cancelled

- 用于与飞牛影视服务器进行对接:登录、获取媒体库列表、按名称查找目标媒体库并触发扫描刷新
- 支持自动处理 API 鉴权签名(cse 签名机制)与 Token 管理
- 支持重复任务检测,若遇到重复任务会尝试停止旧任务并重新触发
This commit is contained in:
tellbin 2025-08-15 15:45:55 +08:00 committed by GitHub
parent 759e6a451b
commit 195524f2ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

300
plugins/fnv.py Normal file
View File

@ -0,0 +1,300 @@
import requests
import json
import hashlib
import random
import time
from urllib.parse import urlencode
from typing import Any, Optional
# 飞牛影视插件
# 该插件用于与飞牛影视服务器API交互支持自动刷新媒体库
# 通过配置用户名、密码和密钥字符串进行认证,并提供媒体库扫描功能
class Fnv:
# --- 配置信息 ---
default_config = {
"base_url": "http://10.0.0.6:5666", # 飞牛影视服务器URL
"app_name": "trimemedia-web", # 飞牛影视应用名称
"username": "", # 飞牛影视用户名
"password": "", # 飞牛影视密码
"secret_string": "", # 飞牛影视密钥字符串
"api_key": "", # 飞牛影视API密钥
"token": None, # 飞牛影视认证Token (可选)
}
default_task_config = {
"auto_refresh": False, # 是否自动刷新媒体库
"mdb_name": "", # 飞牛影视目标媒体库名称
}
# 定义一个可选键的集合
OPTIONAL_KEYS = {"token"}
# --- API 端点常量 ---
API_LOGIN = "/v/api/v1/login" # 登录端点
API_MDB_LIST = "/v/api/v1/mdb/list" # 获取媒体库列表
API_MDB_SCAN = "/v/api/v1/mdb/scan/{}" # 刷新媒体库端点 ({}为媒体库ID)
API_TASK_STOP = "/v/api/v1/task/stop" # 停止任务端点
# --- 实例状态 ---
is_active = False
session = requests.Session()
token = None
# =====================================================================
# Public Methods / Entry Points (公共方法/入口)
# =====================================================================
def __init__(self, **kwargs):
"""
初始化 Fnv 客户端
"""
self.plugin_name = self.__class__.__name__.lower()
if kwargs:
for key, _ in self.default_config.items():
if key in kwargs:
setattr(self, key, kwargs[key])
# 检查配置并尝试登录,以确定插件是否激活
if self._check_config():
if self.token is None or self.token == "":
self._login()
self.is_active = self.token is not None or self.token != ""
if self.is_active:
print(f"{self.plugin_name}: 插件已激活 ✅")
else:
print(f"{self.plugin_name}: 插件未激活 ❌")
def run(self, task, **kwargs):
"""
插件运行主入口
根据任务配置执行媒体库刷新操作
"""
if not self.is_active:
print(f"飞牛影视: 插件未激活,跳过任务。")
return
task_config = task.get("addition", {}).get(
self.plugin_name, self.default_task_config
)
if not task_config.get("auto_refresh"):
print("飞牛影视: 自动刷新未启用,跳过处理。")
return
target_library_name = task_config.get("mdb_name")
if not target_library_name:
print("飞牛影视: 未指定媒体库名称,跳过处理。")
return
# 获取媒体库ID
library_id = self._get_library_id(target_library_name)
if library_id:
# 获取ID成功后刷新该媒体库
self._refresh_library(library_id)
# =====================================================================
# Internal Methods (内部实现方法)
# =====================================================================
def _check_config(self):
"""检查配置是否完整"""
missing_keys = [
key for key in self.default_config
if key not in self.OPTIONAL_KEYS and not getattr(self, key, None)
]
if missing_keys:
print(f"{self.plugin_name} 模块缺少必要参数: {', '.join(missing_keys)}")
return False
return True
def _make_request(self, method: str, rel_url: str, params: dict = None, data: dict = None) -> Optional[Any]:
"""
一个统一的私有方法用于发送所有API请求
它会自动处理签名请求头错误和响应解析
当认证失败时会自动尝试重新登录并重试最多3次
"""
max_retries = 3
for attempt in range(max_retries):
url = f"{self.base_url.rstrip('/')}{rel_url}"
authx = self._cse_sign(method, rel_url, params, data)
if not authx:
print(f"飞牛影视: 为 {rel_url} 生成签名失败,请求中止。")
return None
headers = {
"Content-Type": "application/json",
"authx": authx,
}
if self.token:
headers["Authorization"] = self.token
try:
response = self.session.request(
method, url, headers=headers, params=params, json=data if data is not None else {}
)
response.raise_for_status()
response_data = response.json()
except requests.exceptions.RequestException as e:
print(f"飞牛影视: 请求 {url} 时出错: {e}")
return None
except json.JSONDecodeError:
print(f"飞牛影视: 解析来自 {url} 的响应失败内容非JSON格式。")
return None
response_code = response_data.get("code")
if response_code is None:
print(f"飞牛影视: 响应格式错误,未找到 'code' 字段。")
return None
if response_code == 0:
return response_data
if response_code == -2:
print(f"飞牛影视: 认证失败 (尝试 {attempt + 1}/{max_retries}),尝试重新登录...")
if rel_url == self.API_LOGIN:
print("飞牛影视: 登录接口认证失败,请检查用户名和密码。")
return response_data
if not self._login():
print("飞牛影视: 重新登录失败,无法继续请求。")
return None
continue
else:
msg = response_data.get('msg', '未知错误')
print(f"飞牛影视: API调用失败 ({rel_url}): {msg}")
return response_data
print(f"飞牛影视: 请求 {rel_url} 在尝试 {max_retries} 次后仍然失败。")
return None
def _login(self) -> bool:
"""
登录到飞牛影视服务器并获取认证token
"""
app_name = self.app_name or self.default_config["app_name"]
username = self.username or self.default_config["username"]
password = self.password or self.default_config["password"]
print("飞牛影视: 正在尝试登录...")
payload = {"username": username, "password": password, "app_name": app_name}
response_json = self._make_request('post', self.API_LOGIN, data=payload)
if response_json and response_json.get("data", {}).get("token"):
self.token = response_json["data"]["token"]
print("飞牛影视: 登录成功 ✅")
return True
else:
print("飞牛影视: 登录失败 ❌")
return False
def _get_library_id(self, library_name: str) -> Optional[str]:
"""
根据媒体库的名称获取其唯一ID (guid)
"""
if not self.token:
print("飞牛影视: 必须先登录才能获取媒体库列表。")
return None
print(f"飞牛影视: 正在查找媒体库 '{library_name}'...")
response_json = self._make_request('get', self.API_MDB_LIST)
if response_json and response_json.get("data"):
for library in response_json.get("data", []):
if library.get("name") == library_name:
print(f"飞牛影视: 找到目标媒体库 ✅ID: {library.get('guid')}")
return library.get("guid")
print(f"飞牛影视: 未在媒体库列表中找到名为 '{library_name}' 的媒体库 ❌")
return None
def _refresh_library(self, library_id: str) -> bool:
"""
根据给定的媒体库ID触发一次媒体库扫描/刷新
"""
if not self.token:
print("飞牛影视: 必须先登录才能刷新媒体库。")
return False
print(f"飞牛影视: 正在为媒体库 {library_id} 发送刷新指令...")
rel_url = self.API_MDB_SCAN.format(library_id)
response_json = self._make_request('post', rel_url, data={})
if not response_json: return False
response_code = response_json.get("code")
if response_code == 0:
print(f"飞牛影视: 发送刷新指令成功 ✅")
return True
elif response_code == -14:
if self._stop_refresh_task(library_id):
print(f"飞牛影视: 发现重复任务,已停止旧任务,重新发送刷新指令...")
response_json = self._make_request('post', rel_url, data={})
if response_json and response_json.get("code") == 0:
print(f"飞牛影视: 发送刷新指令成功 ✅")
return True
else:
print(f"飞牛影视: 重新发送刷新指令失败 ❌")
else:
print(f"飞牛影视: 停止旧任务失败,无法继续刷新操作 ❌")
return False
def _stop_refresh_task(self, library_id: str) -> bool:
"""
停止指定的媒体库刷新任务
"""
if not self.token:
print("飞牛影视: 必须先登录才能停止刷新任务。")
return False
print(f"飞牛影视: 正在停止媒体库刷新任务 {library_id}...")
payload = {"guid": library_id, "type": "TaskItemScrap"}
response_json = self._make_request('post', self.API_TASK_STOP, data=payload)
if response_json and response_json.get("code") == 0:
print(f"飞牛影视: 停止刷新任务成功 ✅")
return True
else:
print(f"飞牛影视: 停止刷新任务失败 ❌")
return False
def _cse_sign(self, method: str, path: str, params: dict = None, data: dict = None) -> str:
"""
为API请求生成 cse 签名参数字符串
"""
nonce = str(random.randint(100000, 999999))
timestamp = str(int(time.time() * 1000))
if method.lower() == 'get' and params:
serialized_str = urlencode(sorted(params.items()))
else:
serialized_str = self._serialize_data(data)
body_hash = self._md5_hash(serialized_str)
string_to_sign_parts = [
self.secret_string, path, nonce, timestamp, body_hash, self.api_key
]
string_to_sign = "_".join(string_to_sign_parts)
final_sign = self._md5_hash(string_to_sign)
return f"nonce={nonce}&timestamp={timestamp}&sign={final_sign}"
# =====================================================================
# Static Utility Methods (静态工具方法)
# =====================================================================
@staticmethod
def _md5_hash(s: str) -> str:
"""计算并返回字符串的小写 MD5 哈希值。"""
return hashlib.md5(s.encode('utf-8')).hexdigest()
@staticmethod
def _serialize_data(data: Any) -> str:
"""
将请求体数据序列化为紧凑的JSON字符串
"""
if isinstance(data, dict):
return json.dumps(data, sort_keys=True, separators=(',', ':'))
if isinstance(data, str):
return data
if not data:
return ""
return ""