♻️ 重构媒体库代码,模块化以便扩展

- 通过load_media_servers函数动态加载媒体库模块
- 动态加载媒体库配置,提高扩展性
- 修改部分配置字段,并添加兼容升级代码
This commit is contained in:
Cp0204 2024-11-13 01:28:37 +08:00
parent abc5ef4e40
commit 9a1ebe0894
2 changed files with 117 additions and 86 deletions

76
media_servers/emby.py Normal file
View File

@ -0,0 +1,76 @@
import requests
class Emby:
def __init__(self, **kwargs):
self.is_active = False
if kwargs.get("url") and kwargs.get("apikey"):
self.emby_url = kwargs.get("url")
self.emby_apikey = kwargs.get("apikey")
if self.get_info():
self.is_active = True
def get_info(self):
url = f"{self.emby_url}/emby/System/Info"
headers = {"X-Emby-Token": self.emby_apikey}
querystring = {}
response = requests.request("GET", url, headers=headers, params=querystring)
if "application/json" in response.headers["Content-Type"]:
response = response.json()
print(
f"Emby媒体库: {response.get('ServerName','')} v{response.get('Version','')}"
)
return True
else:
print(f"Emby媒体库: 连接失败❌ {response.text}")
return False
def refresh(self, emby_id):
if emby_id:
url = f"{self.emby_url}/emby/Items/{emby_id}/Refresh"
headers = {"X-Emby-Token": self.emby_apikey}
querystring = {
"Recursive": "true",
"MetadataRefreshMode": "FullRefresh",
"ImageRefreshMode": "FullRefresh",
"ReplaceAllMetadata": "false",
"ReplaceAllImages": "false",
}
response = requests.request(
"POST", url, headers=headers, params=querystring
)
if response.text == "":
print(f"🎞 刷新Emby媒体库成功✅")
return True
else:
print(f"🎞 刷新Emby媒体库{response.text}")
return False
def search(self, media_name):
if media_name:
url = f"{self.emby_url}/emby/Items"
headers = {"X-Emby-Token": self.emby_apikey}
querystring = {
"IncludeItemTypes": "Series",
"StartIndex": 0,
"SortBy": "SortName",
"SortOrder": "Ascending",
"ImageTypeLimit": 0,
"Recursive": "true",
"SearchTerm": media_name,
"Limit": 10,
"IncludeSearchTypes": "false",
}
response = requests.request("GET", url, headers=headers, params=querystring)
if "application/json" in response.headers["Content-Type"]:
response = response.json()
if response.get("Items"):
for item in response["Items"]:
if item["IsFolder"]:
print(
f"🎞 《{item['Name']}》匹配到Emby媒体库ID{item['Id']}"
)
return item["Id"]
else:
print(f"🎞 搜索Emby媒体库{response.text}")
return False

View File

@ -1,6 +1,6 @@
# !/usr/bin/env python3 # !/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Modify: 2024-04-03 # Modify: 2024-11-13
# Repo: https://github.com/Cp0204/quark_auto_save # Repo: https://github.com/Cp0204/quark_auto_save
# ConfigFile: quark_config.json # ConfigFile: quark_config.json
""" """
@ -14,6 +14,7 @@ import json
import time import time
import random import random
import requests import requests
import importlib
from datetime import datetime from datetime import datetime
# 兼容青龙 # 兼容青龙
@ -686,79 +687,19 @@ class Quark:
return is_rename_count > 0 return is_rename_count > 0
class Emby: def load_media_servers(media_servers_config):
def __init__(self, emby_url, emby_apikey): media_servers = {}
self.is_active = False for server_name, server_config in media_servers_config.items():
if emby_url and emby_apikey: try:
self.emby_url = emby_url module = importlib.import_module(f"media_servers.{server_name}")
self.emby_apikey = emby_apikey ServerClass = getattr(module, server_name.capitalize())
if self.get_info(): # 复制配置,避免修改原始配置
self.is_active = True server_args = server_config.copy()
# 动态传递参数
def get_info(self): media_servers[server_name] = ServerClass(**server_args)
url = f"{self.emby_url}/emby/System/Info" except (ImportError, AttributeError):
headers = {"X-Emby-Token": self.emby_apikey} print(f"加载媒体服务器模块 {server_name} 失败。")
querystring = {} return media_servers
response = requests.request("GET", url, headers=headers, params=querystring)
if "application/json" in response.headers["Content-Type"]:
response = response.json()
print(
f"Emby媒体库: {response.get('ServerName','')} v{response.get('Version','')}"
)
return True
else:
print(f"Emby媒体库: 连接失败❌ {response.text}")
return False
def refresh(self, emby_id):
if emby_id:
url = f"{self.emby_url}/emby/Items/{emby_id}/Refresh"
headers = {"X-Emby-Token": self.emby_apikey}
querystring = {
"Recursive": "true",
"MetadataRefreshMode": "FullRefresh",
"ImageRefreshMode": "FullRefresh",
"ReplaceAllMetadata": "false",
"ReplaceAllImages": "false",
}
response = requests.request(
"POST", url, headers=headers, params=querystring
)
if response.text == "":
print(f"🎞 刷新Emby媒体库成功✅")
return True
else:
print(f"🎞 刷新Emby媒体库{response.text}")
return False
def search(self, media_name):
if media_name:
url = f"{self.emby_url}/emby/Items"
headers = {"X-Emby-Token": self.emby_apikey}
querystring = {
"IncludeItemTypes": "Series",
"StartIndex": 0,
"SortBy": "SortName",
"SortOrder": "Ascending",
"ImageTypeLimit": 0,
"Recursive": "true",
"SearchTerm": media_name,
"Limit": 10,
"IncludeSearchTypes": "false",
}
response = requests.request("GET", url, headers=headers, params=querystring)
if "application/json" in response.headers["Content-Type"]:
response = response.json()
if response.get("Items"):
for item in response["Items"]:
if item["IsFolder"]:
print(
f"🎞 《{item['Name']}》匹配到Emby媒体库ID{item['Id']}"
)
return item["Id"]
else:
print(f"🎞 搜索Emby媒体库{response.text}")
return False
def verify_account(account): def verify_account(account):
@ -818,10 +759,7 @@ def do_sign(account):
def do_save(account, tasklist=[]): def do_save(account, tasklist=[]):
emby = Emby( media_servers = load_media_servers(CONFIG_DATA.get("media_servers", {}))
CONFIG_DATA.get("emby", {}).get("url", ""),
CONFIG_DATA.get("emby", {}).get("apikey", ""),
)
print(f"转存账号: {account.nickname}") print(f"转存账号: {account.nickname}")
# 获取全部保存目录fid # 获取全部保存目录fid
account.update_savepath_fid(tasklist) account.update_savepath_fid(tasklist)
@ -862,17 +800,33 @@ def do_save(account, tasklist=[]):
is_new = account.do_save_task(task) is_new = account.do_save_task(task)
is_rename = account.do_rename_task(task) is_rename = account.do_rename_task(task)
# 刷新媒体库 # 刷新媒体库
if emby.is_active and (is_new or is_rename) and task.get("emby_id") != "0": for server_name, media_server in media_servers.items():
if task.get("emby_id"): if (
emby.refresh(task["emby_id"]) media_server.is_active
else: and (is_new or is_rename)
match_emby_id = emby.search(task["taskname"]) and task.get("media_id") != "0"
if match_emby_id: ):
task["emby_id"] = match_emby_id if task.get("media_id"):
emby.refresh(match_emby_id) media_server.refresh(task["media_id"])
else:
match_media_id = media_server.search(task["taskname"])
if match_media_id:
task["media_id"] = match_media_id
media_server.refresh(match_media_id)
print() print()
def reaking_change_update():
global CONFIG_DATA
# print("Update config v0.3.6.1 to 0.3.7")
if CONFIG_DATA.get("emby"):
CONFIG_DATA.setdefault("media_servers", {})["emby"] = CONFIG_DATA["emby"]
del CONFIG_DATA["emby"]
for task in CONFIG_DATA.get("tasklist", {}):
task["media_id"] = task.get("emby_id", "")
del task["emby_id"]
def main(): def main():
global CONFIG_DATA global CONFIG_DATA
start_time = datetime.now() start_time = datetime.now()
@ -900,6 +854,7 @@ def main():
print(f"⚙️ 正从 {config_path} 文件中读取配置") print(f"⚙️ 正从 {config_path} 文件中读取配置")
with open(config_path, "r", encoding="utf-8") as file: with open(config_path, "r", encoding="utf-8") as file:
CONFIG_DATA = json.load(file) CONFIG_DATA = json.load(file)
reaking_change_update()
cookie_val = CONFIG_DATA.get("cookie") cookie_val = CONFIG_DATA.get("cookie")
if not CONFIG_DATA.get("magic_regex"): if not CONFIG_DATA.get("magic_regex"):
CONFIG_DATA["magic_regex"] = MAGIC_REGEX CONFIG_DATA["magic_regex"] = MAGIC_REGEX