mirror of
https://github.com/Cp0204/quark-auto-save.git
synced 2026-01-13 15:50:45 +08:00
99 lines
3.4 KiB
Python
99 lines
3.4 KiB
Python
import os
|
||
import requests
|
||
|
||
|
||
class Plex:
|
||
default_config = {
|
||
"url": "", # Plex服务器URL
|
||
"token": "", # Plex Token
|
||
"base_path": "" # Plex媒体库基础路径
|
||
}
|
||
is_active = False
|
||
|
||
def __init__(self, **kwargs):
|
||
if kwargs:
|
||
for key, value in self.default_config.items():
|
||
if key in kwargs:
|
||
setattr(self, key, kwargs[key])
|
||
else:
|
||
print(f"{self.__class__.__name__} 模块缺少必要参数: {key}")
|
||
if self.url and self.token and self.base_path:
|
||
if self.get_info():
|
||
self.is_active = True
|
||
|
||
def run(self, task):
|
||
if task.get("savepath"):
|
||
# 拼接完整路径
|
||
full_path = os.path.join(self.base_path, task["savepath"].lstrip("/"))
|
||
full_path = full_path.replace("\\", "/")
|
||
self.refresh(full_path)
|
||
return task
|
||
|
||
def get_info(self):
|
||
"""获取Plex服务器信息"""
|
||
headers = {
|
||
'Accept': 'application/json',
|
||
'X-Plex-Token': self.token
|
||
}
|
||
|
||
try:
|
||
response = requests.get(
|
||
f"{self.url}/",
|
||
headers=headers
|
||
)
|
||
if response.status_code == 200:
|
||
info = response.json()['MediaContainer']
|
||
print(f"Plex媒体库: {info.get('friendlyName','')} v{info.get('version','')}")
|
||
return True
|
||
else:
|
||
print(f"Plex媒体库: 连接失败❌ 状态码:{response.status_code}")
|
||
except Exception as e:
|
||
print(f"获取Plex媒体库信息出错: {e}")
|
||
return False
|
||
|
||
def refresh(self, folder_path):
|
||
"""刷新指定文件夹"""
|
||
if not folder_path:
|
||
return False
|
||
|
||
headers = {
|
||
'Accept': 'application/json',
|
||
'X-Plex-Token': self.token
|
||
}
|
||
|
||
try:
|
||
response = requests.get(
|
||
f"{self.url}/library/sections",
|
||
headers=headers
|
||
)
|
||
|
||
if response.status_code != 200:
|
||
print(f"🎞️ 刷新Plex媒体库:获取库信息失败❌ 状态码:{response.status_code}")
|
||
return False
|
||
|
||
libraries = response.json()['MediaContainer']['Directory']
|
||
|
||
for library in libraries:
|
||
for location in library.get('Location', []):
|
||
if folder_path.startswith(location['path']):
|
||
library_id = library['key']
|
||
refresh_url = (
|
||
f"{self.url}/library/sections/{library_id}/refresh"
|
||
f"?path={folder_path}"
|
||
)
|
||
refresh_response = requests.get(refresh_url, headers=headers)
|
||
|
||
if refresh_response.status_code == 200:
|
||
print(f"🎞️ 刷新Plex媒体库:成功✅")
|
||
print(f"📁 扫描路径: {folder_path}")
|
||
return True
|
||
else:
|
||
print(f"🎞️ 刷新Plex媒体库:刷新请求失败❌ 状态码:{refresh_response.status_code}")
|
||
return False
|
||
|
||
print(f"🎞️ 刷新Plex媒体库:未找到匹配的媒体库❌ {folder_path}")
|
||
|
||
except Exception as e:
|
||
print(f"刷新Plex媒体库出错: {e}")
|
||
|
||
return False |