diff --git a/plugins/fnv.py b/plugins/fnv.py new file mode 100644 index 0000000..bb08e30 --- /dev/null +++ b/plugins/fnv.py @@ -0,0 +1,266 @@ +import requests +import json +import hashlib +import random +import time +from urllib.parse import urlencode, urlsplit +from typing import Any, Optional + + +class Fnv: + default_config = { + "base_url": "http://10.0.0.6:5666", # 飞牛影视服务器URL + "app_name": "trimemedia-web", # 飞牛影视应用名称 + "username": "", # 飞牛影视用户名 + "password": "", # 飞牛影视密码 + "secret_string": "", # 飞牛影视密钥字符串 + "api_key": "", # 飞牛影视API密钥 + } + + default_task_config = { + "auto_refresh": False, # 是否自动刷新媒体库 + "mdb_name": "", # 飞牛影视目标媒体库名称 + } + + is_active = False + token = None + session = requests.Session() + + def __init__(self, **kwargs): + self.plugin_name = self.__class__.__name__.lower() + if kwargs: + for key, _ in self.default_config.items(): + if key in kwargs: + setattr(self, key, kwargs[key]) + self.is_active = self._check_config() + + def _check_config(self): + """检查配置是否完整""" + missing_keys = [ + key for key in self.default_config if not getattr(self, key, None) + ] + if missing_keys: + print(f"{self.plugin_name} 模块缺少必要参数: {', '.join(missing_keys)}") + return False + return True + + def run(self, task, **kwargs): + """插件运行主入口""" + if not self.is_active: + return + task_config = task.get("addition", {}).get( + self.plugin_name, self.default_task_config + ) + if not task_config.get("auto_refresh"): + print("飞牛影视: 自动刷新未启用,跳过处理") + return + if not task_config.get("mdb_name"): + print("飞牛影视: 未指定媒体库名称,跳过处理") + return + target_library_name = task_config["mdb_name"] + app_name = self.app_name or self.default_config["app_name"] + username = self.username or self.default_config["username"] + password = self.password or self.default_config["password"] + + # 登录并执行后续操作 + if self._login(username=username, password=password, app_name=app_name): + + # 登录成功后,获取媒体库ID + library_id = self._get_library_id(target_library_name) + + if library_id: + # 获取ID成功后,刷新该媒体库 + self._refresh_library(library_id) + + def _make_request(self, method: str, rel_url: str, params: dict = None, data: dict = None) -> Optional[Any]: + """ + 一个统一的私有方法,用于发送所有API请求。 + 它会自动处理签名、请求头、错误和响应解析。 + """ + url = f"{self.base_url.rstrip('/')}{rel_url}" + + # 1. 生成认证签名 + authx = self._cse_sign(method, rel_url, params, data) + if not authx: + print(f"飞牛影视: 为 {rel_url} 生成签名失败,请求中止。") + return None + + # 2. 准备请求头 + headers = { + "Content-Type": "application/json", + "authx": authx, + } + if self.token: + headers["Authorization"] = self.token + + # 3. 发送请求并处理异常 + try: + response = self.session.request( + method, + url, + headers=headers, + params=params, + json=data if data is not None else {} + ) + response.raise_for_status() # 对 4xx/5xx 响应抛出异常 + response_data = response.json() + except requests.exceptions.RequestException as e: + print(f"飞牛影视: 请求 {url} 时出错: {e}") + return None + except json.JSONDecodeError: + print(f"飞牛影视: 解析来自 {url} 的响应失败,内容非JSON格式。") + return None + + # 4. 统一处理API层面的错误 + if response_data.get("code") != 0: + msg = response_data.get('msg', '未知错误') + print(f"飞牛影视: API调用失败 ({rel_url}): {msg}") + + # 5. 返回响应 + return response_data + + def _login(self, username: str, password: str, app_name: str) -> bool: + """ + 登录到飞牛影视服务器并获取认证token。 + 成功后,token会自动存储在客户端实例中。 + """ + print("飞牛影视: 正在尝试登录...") + rel_url = "/v/api/v1/login" + payload = { + "username": username, + "password": password, + "app_name": app_name, + } + response_json = self._make_request('post', rel_url, data=payload) + + if response_json and response_json["data"]["token"]: + self.token = response_json["data"]["token"] + print("飞牛影视: 登录成功 ✅") + return True + else: + print("飞牛影视: 登录失败 ❌") + return False + + def _get_library_id(self, library_name: str) -> Optional[str]: + """ + 根据媒体库的名称获取其唯一ID (guid)。 + """ + if not self.token: + print("飞牛影视: 必须先登录才能获取媒体库列表。") + return None + + print(f"飞牛影视: 正在查找媒体库 '{library_name}'...") + rel_url = "/v/api/v1/mdb/list" + response_json = self._make_request('get', rel_url) + + if response_json and response_json.get("data"): + libraries = response_json.get("data", []) + for library in libraries: + if library.get("name") == library_name: + print(f"飞牛影视: 找到目标媒体库 ✅,ID: {library.get('guid')}") + return library.get("guid") + print(f"飞牛影视: 未在媒体库列表中找到名为 '{library_name}' 的媒体库 ❌") + return None + + def _refresh_library(self, library_id: str) -> bool: + """ + 根据给定的媒体库ID触发一次媒体库扫描/刷新。 + """ + if not self.token: + print("飞牛影视: 必须先登录才能刷新媒体库。") + return False + + print(f"飞牛影视: 正在为媒体库 {library_id} 发送刷新指令...") + rel_url = f"/v/api/v1/mdb/scan/{library_id}" + response_json = self._make_request('post', rel_url, data={}) # POST一个空对象 + response_code = response_json.get("code") + if 0 == response_code: + print(f"飞牛影视: 发送刷新指令成功 ✅") + return True + elif response_code == -14: + if self._stop_refresh_task(library_id): + print(f"飞牛影视: 发现重复任务,已停止旧任务,重新发送刷新指令...") + response_json = self._make_request('post', rel_url, data={}) # POST一个空对象 + response_code = response_json.get("code") + if 0 == response_code: + print(f"飞牛影视: 发送刷新指令成功 ✅") + return True + else: + print(f"飞牛影视: 重新发送刷新指令失败 ❌") + else: + print(f"飞牛影视: 停止旧任务失败,无法继续刷新操作 ❌") + return False + + def _stop_refresh_task(self, library_id: str) -> bool: + """ + 停止指定的媒体库刷新任务。 + """ + if not self.token: + print("飞牛影视: 必须先登录才能停止刷新任务。") + return False + + print(f"飞牛影视: 正在停止媒体库刷新任务 {library_id}...") + rel_url = f"/v/api/v1/task/stop" + response_json = self._make_request('post', rel_url, data={ + "guid": library_id, + "type": "TaskItemScrap" + }) + response_code = response_json.get("code") + if 0 == response_code: + print(f"飞牛影视: 停止刷新任务成功 ✅") + return True + else: + print(f"飞牛影视: 停止刷新任务失败 ❌") + return False + + def _cse_sign(self, method: str, path: str, params: dict = None, data: dict = None) -> str: + """ + 为API请求生成 cse 签名参数字符串。 + """ + nonce = str(random.randint(100000, 999999)) + timestamp = str(int(time.time() * 1000)) + + if method.lower() == 'get' and params: + serialized_str = urlencode(sorted(params.items())) + else: + # 对于POST/PUT等方法,序列化body + serialized_str = self._serialize_data(data) + body_hash = self._md5_hash(serialized_str) + + string_to_sign_parts = [ + self.secret_string, + path, + nonce, + timestamp, + body_hash, + self.api_key + ] + string_to_sign = "_".join(string_to_sign_parts) + final_sign = self._md5_hash(string_to_sign) + + return f"nonce={nonce}×tamp={timestamp}&sign={final_sign}" + + @staticmethod + def _md5_hash(s: str) -> str: + """计算并返回字符串的小写 MD5 哈希值。""" + return hashlib.md5(s.encode('utf-8')).hexdigest() + + @staticmethod + def _serialize_data(data: Any) -> str: + """ + 将请求体数据序列化为紧凑的JSON字符串。 + - 字典 (包括`{}`) 会被正确序列化为JSON字符串 (例如 `"{}"`)。 + - 字符串会原样返回。 + - 其他`None`或falsy值会返回空字符串`""`。 + """ + # 优先检查dict,因为空字典`{}`在Python中为falsy。 + # 这样可以确保 `{}` 被正确序列化为字符串 `"{}"`。 + if isinstance(data, dict): + return json.dumps(data, sort_keys=True, separators=(',', ':')) + if isinstance(data, str): + return data + # 对于其他 falsy 值(如 None, []),返回空字符串。 + if not data: + return "" + # 其他情况(虽然少见),返回空字符串以保证安全。 + return ""