import os import re import requests class Smartstrm: default_config = { "webhook": "", # SmartStrm Webhook 地址 "taskname": "", # SmartStrm 任务名 "fix_path": "", # 路径映射, SmartStrm 任务使用 quark 驱动时无须填写;使用 openlist 驱动时需填写 `/storage_mount_path:/quark_root_dir` ,例如我把夸克根目录挂载在 OpenList 的 /quark 下,则填写 `/quark:/` ;以及 SmartStrm 会使 OpenList 强制刷新目录,无需再用 alist 插件刷新。 } default_task_config = {} is_active = False def __init__(self, **kwargs): self.plugin_name = self.__class__.__name__.lower() if kwargs: for key, _ in self.default_config.items(): if key in kwargs: setattr(self, key, kwargs[key]) else: print(f"{self.plugin_name} 模块缺少必要参数: {key}") if self.webhook and self.taskname: print(f"SmartStrm 触发任务: {self.taskname} ") self.is_active = True def run(self, task, **kwargs): """ 插件主入口函数 :param task: 任务配置 :param kwargs: 其他参数 """ if match := re.match(r"^(\/[^:]*):(\/[^:]*)$", self.fix_path): # 存储挂载路径, 夸克根文件夹 storage_mount_path, quark_root_dir = match.group(1), match.group(2) storage_path = os.path.join( storage_mount_path, task["savepath"].replace(quark_root_dir, "", 1).lstrip("/"), ).replace("\\", "/") else: storage_path = task["savepath"] try: # 准备发送的数据 headers = {"Content-Type": "application/json"} payload = { "event": "a_task", "task": { "name": self.taskname, "storage_path": storage_path, }, } # 发送 POST 请求 response = requests.request( "POST", self.webhook, headers=headers, json=payload, timeout=5, ) # 检查响应状态 if response.status_code == 200: response = response.json() if response.get("success"): print( f"SmartStrm 触发任务: {response['task']['name']}{response['task']['storage_path']} 成功✅" ) else: print(f"SmartStrm 触发任务: {response['message']}") else: print(f"SmartStrm 触发任务: {response.status_code}") except Exception as e: print(f"SmartStrm 触发任务:出错 {e}")