️ 优化Plex路径处理及库信息缓存

This commit is contained in:
Cp0204 2024-11-16 14:27:27 +08:00
parent 22e764e234
commit 6fba6eefa9

View File

@ -3,12 +3,14 @@ import requests
class Plex: class Plex:
default_config = { default_config = {
"url": "", # Plex服务器URL "url": "", # Plex服务器URL
"token": "", # Plex Token "token": "", # Plex Token可F12在请求中抓取
"base_path": "" # Plex媒体库基础路径 "quark_root_path": "", # 夸克根目录在Plex中的路径假设夸克目录/media/tv在plex中对应的路径为/quark/media/tv则为/quark
} }
is_active = False is_active = False
_libraries = None # 缓存库信息
def __init__(self, **kwargs): def __init__(self, **kwargs):
if kwargs: if kwargs:
@ -17,33 +19,31 @@ class Plex:
setattr(self, key, kwargs[key]) setattr(self, key, kwargs[key])
else: else:
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}") print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
if self.url and self.token and self.base_path: if self.url and self.token and self.quark_root_path:
if self.get_info(): if self.get_info():
self.is_active = True self.is_active = True
def run(self, task): def run(self, task):
if task.get("savepath"): if task.get("savepath"):
# 检查是否已缓存库信息
if self._libraries is None:
self._libraries = self._get_libraries()
# 拼接完整路径 # 拼接完整路径
full_path = os.path.join(self.base_path, task["savepath"].lstrip("/")) full_path = os.path.normpath(
full_path = full_path.replace("\\", "/") os.path.join(self.quark_root_path, task["savepath"].lstrip("/"))
).replace("\\", "/")
self.refresh(full_path) self.refresh(full_path)
return task
def get_info(self): def get_info(self):
"""获取Plex服务器信息""" """获取Plex服务器信息"""
headers = { headers = {"Accept": "application/json", "X-Plex-Token": self.token}
'Accept': 'application/json',
'X-Plex-Token': self.token
}
try: try:
response = requests.get( response = requests.get(f"{self.url}/", headers=headers)
f"{self.url}/",
headers=headers
)
if response.status_code == 200: if response.status_code == 200:
info = response.json()['MediaContainer'] info = response.json()["MediaContainer"]
print(f"Plex媒体库: {info.get('friendlyName','')} v{info.get('version','')}") print(
f"Plex媒体库: {info.get('friendlyName','')} v{info.get('version','')}"
)
return True return True
else: else:
print(f"Plex媒体库: 连接失败❌ 状态码:{response.status_code}") print(f"Plex媒体库: 连接失败❌ 状态码:{response.status_code}")
@ -55,45 +55,42 @@ class Plex:
"""刷新指定文件夹""" """刷新指定文件夹"""
if not folder_path: if not folder_path:
return False return False
headers = {"Accept": "application/json", "X-Plex-Token": self.token}
headers = {
'Accept': 'application/json',
'X-Plex-Token': self.token
}
try: try:
response = requests.get( for library in self._libraries:
f"{self.url}/library/sections", for location in library.get("Location", []):
headers=headers if (
) os.path.commonpath([folder_path, location["path"]])
== location["path"]
if response.status_code != 200: ):
print(f"🎞️ 刷新Plex媒体库获取库信息失败❌ 状态码:{response.status_code}") refresh_url = f"{self.url}/library/sections/{library['key']}/refresh?path={folder_path}"
return False
libraries = response.json()['MediaContainer']['Directory']
for library in libraries:
for location in library.get('Location', []):
if folder_path.startswith(location['path']):
library_id = library['key']
refresh_url = (
f"{self.url}/library/sections/{library_id}/refresh"
f"?path={folder_path}"
)
refresh_response = requests.get(refresh_url, headers=headers) refresh_response = requests.get(refresh_url, headers=headers)
if refresh_response.status_code == 200: if refresh_response.status_code == 200:
print(f"🎞️ 刷新Plex媒体库成功✅") print(
print(f"📁 扫描路径: {folder_path}") f"🎞️ 刷新Plex媒体库{library['title']} [{folder_path}] 成功✅"
)
return True return True
else: else:
print(f"🎞️ 刷新Plex媒体库刷新请求失败❌ 状态码:{refresh_response.status_code}") print(
return False f"🎞️ 刷新Plex媒体库刷新请求失败❌ 状态码:{refresh_response.status_code}"
)
print(f"🎞️ 刷新Plex媒体库未找到匹配的媒体库❌ {folder_path}") print(f"🎞️ 刷新Plex媒体库{folder_path} 未找到匹配的媒体库❌")
except Exception as e: except Exception as e:
print(f"刷新Plex媒体库出错: {e}") print(f"刷新Plex媒体库出错: {e}")
return False
return False def _get_libraries(self):
"""获取Plex媒体库信息"""
url = f"{self.url}/library/sections"
headers = {"Accept": "application/json", "X-Plex-Token": self.token}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
libraries = response.json()["MediaContainer"].get("Directory", [])
return libraries
else:
print(f"🎞️ 获取Plex媒体库信息失败❌ 状态码:{response.status_code}")
return None
except Exception as e:
print(f"获取Plex媒体库信息出错: {e}")
return None