From 6fba6eefa9764286630c79945931c0561f3d2d43 Mon Sep 17 00:00:00 2001 From: Cp0204 Date: Sat, 16 Nov 2024 14:27:27 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=EF=B8=8F=20=E4=BC=98=E5=8C=96Plex?= =?UTF-8?q?=E8=B7=AF=E5=BE=84=E5=A4=84=E7=90=86=E5=8F=8A=E5=BA=93=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E7=BC=93=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- media_servers/plex.py | 99 +++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 51 deletions(-) diff --git a/media_servers/plex.py b/media_servers/plex.py index 158c14a..a0da69f 100644 --- a/media_servers/plex.py +++ b/media_servers/plex.py @@ -3,12 +3,14 @@ import requests class Plex: + default_config = { "url": "", # Plex服务器URL - "token": "", # Plex Token - "base_path": "" # Plex媒体库基础路径 + "token": "", # Plex Token,可F12在请求中抓取 + "quark_root_path": "", # 夸克根目录在Plex中的路径;假设夸克目录/media/tv在plex中对应的路径为/quark/media/tv,则为/quark } is_active = False + _libraries = None # 缓存库信息 def __init__(self, **kwargs): if kwargs: @@ -17,33 +19,31 @@ class Plex: setattr(self, key, kwargs[key]) else: print(f"{self.__class__.__name__} 模块缺少必要参数: {key}") - if self.url and self.token and self.base_path: + if self.url and self.token and self.quark_root_path: if self.get_info(): self.is_active = True def run(self, task): if task.get("savepath"): + # 检查是否已缓存库信息 + if self._libraries is None: + self._libraries = self._get_libraries() # 拼接完整路径 - full_path = os.path.join(self.base_path, task["savepath"].lstrip("/")) - full_path = full_path.replace("\\", "/") + full_path = os.path.normpath( + os.path.join(self.quark_root_path, task["savepath"].lstrip("/")) + ).replace("\\", "/") self.refresh(full_path) - return task def get_info(self): """获取Plex服务器信息""" - headers = { - 'Accept': 'application/json', - 'X-Plex-Token': self.token - } - + headers = {"Accept": "application/json", "X-Plex-Token": self.token} try: - response = requests.get( - f"{self.url}/", - headers=headers - ) + response = requests.get(f"{self.url}/", headers=headers) if response.status_code == 200: - info = response.json()['MediaContainer'] - print(f"Plex媒体库: {info.get('friendlyName','')} v{info.get('version','')}") + info = response.json()["MediaContainer"] + print( + f"Plex媒体库: {info.get('friendlyName','')} v{info.get('version','')}" + ) return True else: print(f"Plex媒体库: 连接失败❌ 状态码:{response.status_code}") @@ -55,45 +55,42 @@ class Plex: """刷新指定文件夹""" if not folder_path: return False - - headers = { - 'Accept': 'application/json', - 'X-Plex-Token': self.token - } - + headers = {"Accept": "application/json", "X-Plex-Token": self.token} try: - response = requests.get( - f"{self.url}/library/sections", - headers=headers - ) - - if response.status_code != 200: - print(f"🎞️ 刷新Plex媒体库:获取库信息失败❌ 状态码:{response.status_code}") - return False - - libraries = response.json()['MediaContainer']['Directory'] - - for library in libraries: - for location in library.get('Location', []): - if folder_path.startswith(location['path']): - library_id = library['key'] - refresh_url = ( - f"{self.url}/library/sections/{library_id}/refresh" - f"?path={folder_path}" - ) + for library in self._libraries: + for location in library.get("Location", []): + if ( + os.path.commonpath([folder_path, location["path"]]) + == location["path"] + ): + refresh_url = f"{self.url}/library/sections/{library['key']}/refresh?path={folder_path}" refresh_response = requests.get(refresh_url, headers=headers) - if refresh_response.status_code == 200: - print(f"🎞️ 刷新Plex媒体库:成功✅") - print(f"📁 扫描路径: {folder_path}") + print( + f"🎞️ 刷新Plex媒体库:{library['title']} [{folder_path}] 成功✅" + ) return True else: - print(f"🎞️ 刷新Plex媒体库:刷新请求失败❌ 状态码:{refresh_response.status_code}") - return False - - print(f"🎞️ 刷新Plex媒体库:未找到匹配的媒体库❌ {folder_path}") - + print( + f"🎞️ 刷新Plex媒体库:刷新请求失败❌ 状态码:{refresh_response.status_code}" + ) + print(f"🎞️ 刷新Plex媒体库:{folder_path} 未找到匹配的媒体库❌") except Exception as e: print(f"刷新Plex媒体库出错: {e}") + return False - return False \ No newline at end of file + def _get_libraries(self): + """获取Plex媒体库信息""" + url = f"{self.url}/library/sections" + headers = {"Accept": "application/json", "X-Plex-Token": self.token} + try: + response = requests.get(url, headers=headers) + if response.status_code == 200: + libraries = response.json()["MediaContainer"].get("Directory", []) + return libraries + else: + print(f"🎞️ 获取Plex媒体库信息失败❌ 状态码:{response.status_code}") + return None + except Exception as e: + print(f"获取Plex媒体库信息出错: {e}") + return None