Compare commits

...

26 Commits

Author SHA1 Message Date
x1ao4
efc2be761f
Merge pull request #57 from x1ao4/dev
新增追剧日历、任务匹配、视图切换等功能及其他优化
2025-09-15 16:06:39 +08:00
x1ao4
9fd9d5cd74 在编辑元数据模态框内增加了更换海报功能 2025-09-15 15:49:17 +08:00
x1ao4
02d8f60709 在集数统计时增加对 “日期 连接符 第x期” 格式的特殊处理
- 修改前端 buildProgressByTaskNameFromLatestFiles 函数,优先检测"日期 连接符 第x期"格式
- 修改后端 extract_progress_from_latest_file 函数,支持相同的格式检测
- 支持多种日期格式:2025-09-08、2025/09/08、2025.09.08
- 支持多种连接符号:空格、连字符、下划线、点号、斜杠
- 当同时存在日期和"第x期"时,优先使用日期进行集数统计
- 保持向后兼容,不影响现有的纯集数和纯日期格式处理

问题:最近转存文件中的"2025-09-08 - 第128期"等格式被错误地使用第128期而不是日期
解决:增加特殊格式检测,当同时存在日期和"第x期"时优先使用日期,支持各种连接符号
2025-09-15 03:07:45 +08:00
x1ao4
aabc5d9afd 修复资源状态显示网络连接异常的问题
- 修改 formatShareUrlBanMessage 函数,对可恢复错误返回 null 而不是格式化消息
- 修改所有设置 shareurl_ban 的地方,只有非 null 的格式化结果才设置警告
- 修改显示逻辑,确保能正确显示错误信息
- 解决临时网络错误被误判为永久失效的问题

问题:可以正常访问的资源被显示为"网络连接异常,请稍后重试"
解决:区分临时网络错误和永久错误,只有永久错误才设置 shareurl_ban 警告
2025-09-15 02:40:40 +08:00
x1ao4
89aff60551 修复任务列表排序导致的操作绑定错误问题
- 修复任务配置区域按钮使用正确的原始索引而不是排序后索引
- 修复选择文件夹、重置文件夹、选择起始文件等按钮的操作绑定
- 修复资源搜索、命名预览、日期选择等功能的索引绑定
- 确保无论使用何种排序方式,按钮操作的都是正确的任务

问题:任务列表使用非编号排序时,配置区域的按钮操作的是按编号排序时对应位置的任务
解决:使用 task.__originalIndex 属性确保所有按钮操作都使用正确的原始索引
2025-09-15 02:20:44 +08:00
x1ao4
e1a3a74752 修复任务列表列表视图下的集数统计问题
- 修复 getTaskSeasonCounts 函数,使其支持 progressByTaskName 映射
- 支持基于日期的集数统计,与海报视图保持一致
- 修复排序逻辑中的集数计算问题
- 确保列表视图和海报视图的集数统计数据一致

问题:任务列表列表视图下没有正确获取基于日期的集数统计,排序时也被视作0
解决:让 getTaskSeasonCounts 函数优先使用 progressByTaskName 映射,支持基于日期的集数查找
2025-09-15 02:00:19 +08:00
x1ao4
9e717f1b69 修复任务列表排序导致的新增任务定位问题
- 修复 addTask 函数中的任务定位逻辑,使用临时标识来准确定位新任务
- 解决非编号排序时新增任务后展开错误编辑模块的问题
- 确保无论使用何种排序方式,新增任务后都能正确定位到新任务的配置区域
- 保持向后兼容,不影响现有排序和编辑功能

问题:任务列表使用非编号排序时,点击添加任务后展开的是排在最后的任务而不是新添加的任务
解决:通过临时标识和排序后列表查找,确保始终展开新添加的任务
2025-09-15 01:38:17 +08:00
x1ao4
41b2cd7727 修复最近转存文件只包含日期信息时集数统计和转存进度丢失的问题
- 修复后端 enrich_tasks_with_calendar_meta 函数,增加对只有日期情况的处理
- 修复前端任务列表页面数据加载,确保 episodes 数据和进度映射正确构建
- 解决刷新页面后已转存集数变为0的问题
- 保持向后兼容,不影响包含集数的文件处理

问题:最近转存文件中不包含集数但包含播出日期时,任务列表页面的集数统计和转存进度在刷新后会丢失
解决:完善后端数据处理逻辑,确保前端页面能正确加载和构建所需的数据映射
2025-09-15 01:25:39 +08:00
x1ao4
7c1d0f9fc1
更新功能说明 2025-09-15 00:35:26 +08:00
x1ao4
d685ab4e90 更新 Wiki 链接地址 2025-09-15 00:08:33 +08:00
x1ao4
922c47848b 新增在任务列表的海报视图下点击任务名称打开 TMDB 季页面的功能
- 为任务列表海报视图中的任务名称添加点击事件处理器
- 新增 getTaskTmdbId() 方法,智能获取任务的 TMDB ID
- 新增 getTaskSeasonNumber() 方法,获取任务的季数信息
- 新增 openTaskTmdbPage() 方法,优先打开特定季的 TMDB 页面
- 只有当任务有匹配项目时才显示为可点击状态
- 当有季数信息时打开 /tv/{tmdb_id}/season/{season_number} 页面
- 当无季数信息时回退到打开整个节目页面
- 提升了用户访问 TMDB 页面的便捷性和精确性
2025-09-14 18:36:25 +08:00
x1ao4
46b07fcdac 优化任务配置继承逻辑,支持按视图类型继承对应任务配置
- 新增 getLastTaskByCurrentFilter() 方法,根据当前视图筛选条件获取对应类型中编号最大的任务
- 修改 addTask() 方法,使用新的筛选逻辑继承任务配置
- 修改 openCreateTaskModal() 方法,使用新的筛选逻辑继承任务配置
- 现在在剧集视图下新建任务会继承剧集类型中编号最大的任务配置,动画视图下会继承动画类型中编号最大的任务配置,以此类推
- 提升了用户在特定类型视图下新建同类任务时的操作体验
2025-09-14 18:22:05 +08:00
x1ao4
454abc0f3a 优化模态框的面包屑导航样式,支持滚动查看超长目录 2025-09-14 17:51:04 +08:00
x1ao4
fc6ecaa534 优化文件整理页面的面包屑导航样式,支持滚动查看超长目录 2025-09-14 17:21:40 +08:00
x1ao4
9f4aa83e22 新增资源搜索结果连续浏览功能
- 在资源搜索的选择需转存的文件夹模态框中添加上一个/下一个导航按钮
- 支持在搜索结果中连续浏览,无需关闭模态框重新选择
- 在左下角显示当前资源序号信息(第 X 个资源)
2025-09-14 03:55:14 +08:00
x1ao4
40fd3738f7 为任务列表的海报视图补全任务功能按钮 2025-09-14 01:36:19 +08:00
x1ao4
8f0c60957d 新增海报语言设置功能 2025-09-13 20:26:27 +08:00
x1ao4
2db4e1587d 在任务列表页面新增创建任务按钮,优化热更新和季数匹配逻辑 2025-09-13 19:08:44 +08:00
x1ao4
a51cd1251a 为任务列表新增了海报视图,可切换显示模式 2025-09-12 02:57:23 +08:00
x1ao4
a2af2dcbe0 优化追剧日历海报悬停信息的展示效果 2025-09-11 23:28:27 +08:00
x1ao4
a917123463 为系统配置的显示设置部分更新了悬停帮助信息 2025-09-11 01:59:57 +08:00
x1ao4
0d1cca0a39 修复任务信息显示状态与设置不符的问题 2025-09-10 03:13:05 +08:00
x1ao4
d8749ff69f 在任务列表页面新增排序功能 2025-09-10 02:49:45 +08:00
x1ao4
591c9e9fe1 在任务列表页面新增按任务类型筛选功能 2025-09-09 17:00:15 +08:00
x1ao4
7f42f694ce 新增追剧日历功能 2025-09-09 16:24:58 +08:00
x1ao4
5170760331 修复 inner error 误判问题
- 添加对 "inner error" 类型错误的识别和处理
- 将 "inner error" 归类为可恢复的网络错误而非资源失效
- 前端多个方法中添加对 "inner error" 的识别和忽略
- 后端添加 is_recoverable_error 工具方法
- 后端 _send_request 方法中自动转换 "inner error" 为 "request error"
- 后端 get_detail 方法中添加重试机制处理临时网络问题
- 避免因临时网络问题误判资源状态,提升用户体验
2025-08-31 12:23:52 +08:00
8 changed files with 10252 additions and 258 deletions

View File

@ -14,6 +14,11 @@
- **文件整理**:支持浏览和管理多个夸克账号的网盘文件,支持单项/批量重命名(支持应用完整的命名、过滤规则和撤销重命名等操作)、移动文件、删除文件、新建文件夹等操作。
- **更新状态**:支持在任务列表页面显示任务的最近更新日期、最近转存文件,支持在任务列表、转存记录、文件整理页面显示当日更新标识(对于当日更新的内容)。
- **影视发现**:支持在影视发现页面浏览豆瓣热门影视榜单,一键快速创建任务,智能填充任务配置,实现便捷订阅。
- **任务筛选**:支持在任务列表页面按照任务类型(如剧集、动画、综艺等)筛选任务,在对应的筛选视图下创建任务将会自动继承同类任务的基础配置,便于自动填充任务配置。
- **任务排序**:支持在任务列表页面按照任务编号、任务名称、任务进度和更新时间进行排序,支持升序和降序排列。
- **任务视图**:支持在任务列表页面使用列表视图和海报视图两种视图浏览和管理任务,海报视图将通过 TMDB 与任务对应的电视节目进行匹配,并显示对应的节目海报。
- **任务匹配**:在配置了 TMDB API 密钥后,所有任务将自动通过 TMDB 进行元数据匹配,匹配后将获取电视节目的海报和元数据,丰富任务相关信息的展示,包括集数信息统计、当前任务进度和电视节目状态等。
- **追剧日历**:支持在追剧日历页面查看和浏览任务对应电视节目的信息和播出时间表,追踪电视节目的播出情况,了解任务的完成进度。追剧日历支持海报视图和日历视图两种视图,可方便快捷的了解订阅内容的实时状态。
本项目修改后的版本为个人需求定制版,目的是满足我自己的使用需求,某些(我不用的)功能可能会因为修改而出现 BUG不一定会被修复。若你要使用本项目请知晓本人不是程序员我无法保证本项目的稳定性如果你在使用过程中发现了 BUG可以在 Issues 中提交,但不保证每个 BUG 都能被修复,请谨慎使用,风险自担。
@ -48,7 +53,7 @@
- [x] 支持多组任务
- [x] 任务结束期限,期限后不执行此任务
- [x] 可单独指定子任务星期几执行
- [x] **支持通过任务名称跳转 TMDB、豆瓣相关(搜索)页面**
- [x] **支持通过任务名称跳转 TMDB、豆瓣相关页面**
- [x] **支持通过影视发现页面浏览豆瓣热门影视榜单、快速创建任务(支持智能填充任务配置)**
- 媒体库整合
@ -61,6 +66,7 @@
- [x] 支持多个通知推送渠道 <sup>[?](https://github.com/x1ao4/quark-auto-save-x/wiki/通知推送服务配置)</sup>
- [x] 支持多账号(多账号签到、**文件管理**,仅首账号转存)
- [x] 支持网盘文件下载、strm 文件生成等功能 <sup>[?](https://github.com/x1ao4/quark-auto-save-x/wiki/插件配置)</sup>
- [x] **支持通过追剧日历功能了解订阅内容的播出情况**
## 部署
### Docker 部署

2848
app/run.py

File diff suppressed because it is too large Load Diff

View File

@ -268,4 +268,327 @@ class RecordDB:
if records:
columns = [col[0] for col in cursor.description]
return [dict(zip(columns, row)) for row in records]
return []
return []
class CalendarDB:
"""追剧日历本地缓存数据库:剧、季、集信息本地化存储"""
def __init__(self, db_path="config/data.db"):
self.db_path = db_path
self.conn = None
self.init_db()
def init_db(self):
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
cursor = self.conn.cursor()
# shows
cursor.execute('''
CREATE TABLE IF NOT EXISTS shows (
tmdb_id INTEGER PRIMARY KEY,
name TEXT,
year TEXT,
status TEXT,
poster_local_path TEXT,
latest_season_number INTEGER,
last_refreshed_at INTEGER,
bound_task_names TEXT,
content_type TEXT
)
''')
# 检查 content_type 字段是否存在,如果不存在则添加
cursor.execute("PRAGMA table_info(shows)")
columns = [column[1] for column in cursor.fetchall()]
if 'content_type' not in columns:
cursor.execute('ALTER TABLE shows ADD COLUMN content_type TEXT')
# seasons
cursor.execute('''
CREATE TABLE IF NOT EXISTS seasons (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tmdb_id INTEGER,
season_number INTEGER,
season_name TEXT,
episode_count INTEGER,
refresh_url TEXT,
UNIQUE (tmdb_id, season_number)
)
''')
# episodes
cursor.execute('''
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tmdb_id INTEGER,
season_number INTEGER,
episode_number INTEGER,
name TEXT,
overview TEXT,
air_date TEXT,
runtime INTEGER,
type TEXT,
updated_at INTEGER,
UNIQUE (tmdb_id, season_number, episode_number)
)
''')
self.conn.commit()
def close(self):
if self.conn:
self.conn.close()
# shows
def upsert_show(self, tmdb_id:int, name:str, year:str, status:str, poster_local_path:str, latest_season_number:int, last_refreshed_at:int=0, bound_task_names:str="", content_type:str=""):
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO shows (tmdb_id, name, year, status, poster_local_path, latest_season_number, last_refreshed_at, bound_task_names, content_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(tmdb_id) DO UPDATE SET
name=excluded.name,
year=excluded.year,
status=excluded.status,
poster_local_path=excluded.poster_local_path,
latest_season_number=excluded.latest_season_number,
last_refreshed_at=excluded.last_refreshed_at,
bound_task_names=excluded.bound_task_names,
content_type=excluded.content_type
''', (tmdb_id, name, year, status, poster_local_path, latest_season_number, last_refreshed_at, bound_task_names, content_type))
self.conn.commit()
def bind_task_to_show(self, tmdb_id:int, task_name:str):
"""绑定任务到节目,在 bound_task_names 字段中记录任务名"""
cursor = self.conn.cursor()
cursor.execute('SELECT bound_task_names FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row:
return False
current_bound_tasks = row[0] or ""
task_list = current_bound_tasks.split(',') if current_bound_tasks else []
# 如果任务名不在绑定列表中,则添加
if task_name not in task_list:
task_list.append(task_name)
new_bound_tasks = ','.join(task_list)
cursor.execute('UPDATE shows SET bound_task_names=? WHERE tmdb_id=?', (new_bound_tasks, tmdb_id))
self.conn.commit()
return True
return False
def unbind_task_from_show(self, tmdb_id:int, task_name:str):
"""从节目解绑任务,更新 bound_task_names 列表"""
cursor = self.conn.cursor()
cursor.execute('SELECT bound_task_names FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row:
return False
current_bound_tasks = row[0] or ""
task_list = [t for t in (current_bound_tasks.split(',') if current_bound_tasks else []) if t]
if task_name in task_list:
task_list.remove(task_name)
new_bound_tasks = ','.join(task_list)
cursor.execute('UPDATE shows SET bound_task_names=? WHERE tmdb_id=?', (new_bound_tasks, tmdb_id))
self.conn.commit()
return True
return False
def get_bound_tasks_for_show(self, tmdb_id:int):
"""获取绑定到指定节目的任务列表"""
cursor = self.conn.cursor()
cursor.execute('SELECT bound_task_names FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row or not row[0]:
return []
return row[0].split(',')
def get_show_by_task_name(self, task_name:str):
"""根据任务名查找绑定的节目"""
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows WHERE bound_task_names LIKE ?', (f'%{task_name}%',))
row = cursor.fetchone()
if not row:
return None
columns = [c[0] for c in cursor.description]
return dict(zip(columns, row))
def get_show(self, tmdb_id:int):
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row:
return None
columns = [c[0] for c in cursor.description]
return dict(zip(columns, row))
def delete_show(self, tmdb_id:int):
cursor = self.conn.cursor()
cursor.execute('DELETE FROM episodes WHERE tmdb_id=?', (tmdb_id,))
cursor.execute('DELETE FROM seasons WHERE tmdb_id=?', (tmdb_id,))
cursor.execute('DELETE FROM shows WHERE tmdb_id=?', (tmdb_id,))
self.conn.commit()
# seasons
def upsert_season(self, tmdb_id:int, season_number:int, episode_count:int, refresh_url:str, season_name:str=""):
cursor = self.conn.cursor()
# 迁移:如缺少 season_name 字段则补充
try:
cursor.execute("PRAGMA table_info(seasons)")
columns = [column[1] for column in cursor.fetchall()]
if 'season_name' not in columns:
cursor.execute('ALTER TABLE seasons ADD COLUMN season_name TEXT')
except Exception:
pass
cursor.execute('''
INSERT INTO seasons (tmdb_id, season_number, season_name, episode_count, refresh_url)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(tmdb_id, season_number) DO UPDATE SET
season_name=excluded.season_name,
episode_count=excluded.episode_count,
refresh_url=excluded.refresh_url
''', (tmdb_id, season_number, season_name, episode_count, refresh_url))
self.conn.commit()
def get_season(self, tmdb_id:int, season_number:int):
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM seasons WHERE tmdb_id=? AND season_number=?', (tmdb_id, season_number))
row = cursor.fetchone()
if not row:
return None
columns = [c[0] for c in cursor.description]
return dict(zip(columns, row))
# episodes
def upsert_episode(self, tmdb_id:int, season_number:int, episode_number:int, name:str, overview:str, air_date:str, runtime, ep_type:str, updated_at:int):
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO episodes (tmdb_id, season_number, episode_number, name, overview, air_date, runtime, type, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(tmdb_id, season_number, episode_number) DO UPDATE SET
name=excluded.name,
overview=excluded.overview,
air_date=excluded.air_date,
runtime=COALESCE(episodes.runtime, excluded.runtime),
type=COALESCE(excluded.type, episodes.type),
updated_at=excluded.updated_at
''', (tmdb_id, season_number, episode_number, name, overview, air_date, runtime, ep_type, updated_at))
self.conn.commit()
def list_latest_season_episodes(self, tmdb_id:int, latest_season:int):
cursor = self.conn.cursor()
cursor.execute('''
SELECT episode_number, name, overview, air_date, runtime, type
FROM episodes
WHERE tmdb_id=? AND season_number=?
ORDER BY episode_number ASC
''', (tmdb_id, latest_season))
rows = cursor.fetchall()
return [
{
'episode_number': r[0],
'name': r[1],
'overview': r[2],
'air_date': r[3],
'runtime': r[4],
'type': r[5],
} for r in rows
]
def list_all_latest_episodes(self):
"""返回所有已知剧目的最新季的所有集(扁平列表,供前端汇总显示)"""
cursor = self.conn.cursor()
cursor.execute('SELECT tmdb_id, name, year, status, poster_local_path, latest_season_number FROM shows')
shows = cursor.fetchall()
result = []
for tmdb_id, name, year, status, poster_local_path, latest_season in shows:
eps = self.list_latest_season_episodes(tmdb_id, latest_season)
for e in eps:
item = {
'tmdb_id': tmdb_id,
'show_name': name,
'year': year,
'status': status,
'poster_local_path': poster_local_path,
'season_number': latest_season,
**e,
}
result.append(item)
return result
# 内容类型管理方法
def update_show_content_type(self, tmdb_id:int, content_type:str):
"""更新节目的内容类型"""
cursor = self.conn.cursor()
cursor.execute('UPDATE shows SET content_type=? WHERE tmdb_id=?', (content_type, tmdb_id))
self.conn.commit()
return cursor.rowcount > 0
def get_show_content_type(self, tmdb_id:int):
"""获取节目的内容类型"""
cursor = self.conn.cursor()
cursor.execute('SELECT content_type FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
return row[0] if row else None
def get_shows_by_content_type(self, content_type:str):
"""根据内容类型获取节目列表"""
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows WHERE content_type=? ORDER BY name', (content_type,))
rows = cursor.fetchall()
if not rows:
return []
columns = [c[0] for c in cursor.description]
return [dict(zip(columns, row)) for row in rows]
def get_all_content_types(self):
"""获取所有已使用的内容类型"""
cursor = self.conn.cursor()
cursor.execute('SELECT DISTINCT content_type FROM shows WHERE content_type IS NOT NULL AND content_type != "" ORDER BY content_type')
rows = cursor.fetchall()
return [row[0] for row in rows]
def bind_task_and_content_type(self, tmdb_id:int, task_name:str, content_type:str):
"""绑定任务到节目并设置内容类型"""
# 先绑定任务
self.bind_task_to_show(tmdb_id, task_name)
# 再更新内容类型
self.update_show_content_type(tmdb_id, content_type)
# --------- 扩展:管理季与集清理/更新工具方法 ---------
def purge_other_seasons(self, tmdb_id: int, keep_season_number: int):
"""清除除指定季之外的所有季与对应集数据"""
cursor = self.conn.cursor()
# 删除其他季的 episodes
cursor.execute('DELETE FROM episodes WHERE tmdb_id=? AND season_number != ?', (tmdb_id, keep_season_number))
# 删除其他季的 seasons 行
cursor.execute('DELETE FROM seasons WHERE tmdb_id=? AND season_number != ?', (tmdb_id, keep_season_number))
self.conn.commit()
def delete_season(self, tmdb_id: int, season_number: int):
"""删除指定季及其所有集数据"""
cursor = self.conn.cursor()
cursor.execute('DELETE FROM episodes WHERE tmdb_id=? AND season_number=?', (tmdb_id, season_number))
cursor.execute('DELETE FROM seasons WHERE tmdb_id=? AND season_number=?', (tmdb_id, season_number))
self.conn.commit()
def update_show_latest_season_number(self, tmdb_id: int, latest_season_number: int):
"""更新 shows.latest_season_number"""
cursor = self.conn.cursor()
cursor.execute('UPDATE shows SET latest_season_number=? WHERE tmdb_id=?', (latest_season_number, tmdb_id))
self.conn.commit()
def update_show_poster(self, tmdb_id: int, poster_local_path: str):
"""更新节目的海报路径"""
cursor = self.conn.cursor()
cursor.execute('UPDATE shows SET poster_local_path=? WHERE tmdb_id=?', (poster_local_path, tmdb_id))
self.conn.commit()
def get_all_shows(self):
"""获取所有节目"""
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows')
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
return [dict(zip(columns, row)) for row in rows]

487
app/sdk/tmdb_service.py Normal file
View File

@ -0,0 +1,487 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TMDB服务模块
用于获取电视节目信息和播出时间表
"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class TMDBService:
def __init__(self, api_key: str = None, poster_language: str = "zh-CN"):
self.api_key = api_key
# 首选改为 api.tmdb.org备选为 api.themoviedb.org
self.primary_url = "https://api.tmdb.org/3"
self.backup_url = "https://api.themoviedb.org/3"
self.current_url = self.primary_url
self.language = "zh-CN" # 返回中文数据
self.poster_language = poster_language # 海报语言设置
# 复用会话,开启重试
self.session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET"]) # 简单退避
adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=50)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
# 简单内存缓存,避免短时间重复请求
self._cache = {}
self._cache_ttl_seconds = 600
def is_configured(self) -> bool:
"""检查TMDB API是否已配置"""
return bool(self.api_key and self.api_key.strip())
def reset_to_primary_url(self):
"""重置到主API地址"""
self.current_url = self.primary_url
logger.info("TMDB API地址已重置为主地址")
def get_current_api_url(self) -> str:
"""获取当前使用的API地址"""
return self.current_url
def is_using_backup_url(self) -> bool:
"""检查是否正在使用备用地址"""
return self.current_url == self.backup_url
def _make_request(self, endpoint: str, params: Dict = None) -> Optional[Dict]:
"""发送API请求支持自动切换备用地址"""
if not self.is_configured():
return None
if params is None:
params = {}
params.update({
'api_key': self.api_key,
'language': self.language,
'include_adult': False
})
# 简单缓存键
try:
from time import time as _now
cache_key = (endpoint, tuple(sorted((params or {}).items())))
cached = self._cache.get(cache_key)
if cached and (_now() - cached[0]) < self._cache_ttl_seconds:
return cached[1]
except Exception:
pass
# 尝试主地址
try:
url = f"{self.current_url}{endpoint}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
try:
self._cache[cache_key] = (_now(), data)
except Exception:
pass
return data
except Exception as e:
logger.warning(f"TMDB主地址请求失败: {e}")
# 如果当前使用的是主地址,尝试切换到备用地址
if self.current_url == self.primary_url:
logger.info("尝试切换到TMDB备用地址...")
self.current_url = self.backup_url
try:
url = f"{self.current_url}{endpoint}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
logger.info("TMDB备用地址连接成功")
data = response.json()
try:
self._cache[cache_key] = (_now(), data)
except Exception:
pass
return data
except Exception as backup_e:
logger.error(f"TMDB备用地址请求也失败: {backup_e}")
# 重置回主地址,下次请求时重新尝试
self.current_url = self.primary_url
return None
else:
# 如果备用地址也失败,重置回主地址
logger.error(f"TMDB备用地址请求失败: {e}")
self.current_url = self.primary_url
return None
def search_tv_show(self, query: str, year: str = None) -> Optional[Dict]:
"""搜索电视剧"""
params = {
'query': query,
'first_air_date_year': year
}
result = self._make_request('/search/tv', params)
if result and result.get('results'):
# 返回第一个匹配结果
return result['results'][0]
return None
def get_tv_show_details(self, tv_id: int) -> Optional[Dict]:
"""获取电视剧详细信息"""
return self._make_request(f'/tv/{tv_id}')
def get_tv_show_alternative_titles(self, tv_id: int) -> Optional[Dict]:
"""获取电视剧的别名信息"""
return self._make_request(f'/tv/{tv_id}/alternative_titles')
def _is_chinese_text(self, text: str) -> bool:
"""检查文本是否包含中文字符"""
if not text:
return False
for char in text:
if '\u4e00' <= char <= '\u9fff': # 中文字符范围
return True
return False
def get_chinese_title_with_fallback(self, tv_id: int, original_title: str = "") -> str:
"""
获取中文标题如果中文标题为空或不是中文则从别名中获取中国地区的别名
Args:
tv_id: TMDB ID
original_title: 原始标题作为最后的备用方案
Returns:
中文标题或备用标题
"""
try:
# 首先获取节目详情,检查是否有中文标题
details = self.get_tv_show_details(tv_id)
if details:
tmdb_name = details.get('name', '').strip()
# 检查TMDB返回的标题是否包含中文字符
if tmdb_name and self._is_chinese_text(tmdb_name):
logger.debug(f"直接获取到中文标题: {tmdb_name} (TMDB ID: {tv_id})")
return tmdb_name
# 如果TMDB返回的标题不是中文尝试从别名中获取中国地区的别名
alternative_titles = self.get_tv_show_alternative_titles(tv_id)
if alternative_titles and alternative_titles.get('results'):
# 查找中国地区的别名
for alt_title in alternative_titles['results']:
if alt_title.get('iso_3166_1') == 'CN':
chinese_alt_title = alt_title.get('title', '').strip()
if chinese_alt_title and self._is_chinese_text(chinese_alt_title):
logger.debug(f"从别名中获取到中文标题: {chinese_alt_title} (TMDB ID: {tv_id})")
return chinese_alt_title
# 如果都没有找到中文标题,返回原始标题
logger.debug(f"未找到中文标题,使用原始标题: {original_title} (TMDB ID: {tv_id})")
return original_title
else:
# 如果无法获取详情,返回原始标题
return original_title
except Exception as e:
logger.warning(f"获取中文标题失败: {e}, 使用原始标题: {original_title}")
return original_title
def get_tv_show_episodes(self, tv_id: int, season_number: int) -> Optional[Dict]:
"""获取指定季的剧集信息"""
return self._make_request(f'/tv/{tv_id}/season/{season_number}')
def get_tv_show_air_dates(self, tv_id: int) -> Optional[Dict]:
"""获取电视剧播出时间信息"""
return self._make_request(f'/tv/{tv_id}/air_dates')
def get_tv_show_episode_air_dates(self, tv_id: int, season_number: int) -> List[Dict]:
"""获取指定季所有剧集的播出时间"""
episodes = self.get_tv_show_episodes(tv_id, season_number)
if not episodes or 'episodes' not in episodes:
return []
episode_list = []
for episode in episodes['episodes']:
if episode.get('air_date'):
episode_list.append({
'episode_number': episode.get('episode_number'),
'air_date': episode.get('air_date'),
'name': episode.get('name'),
'overview': episode.get('overview')
})
return episode_list
# ===== 节目状态中文映射(不含 returning_series 场景判断;该判断在 run.py 本地完成) =====
def map_show_status_cn(self, status: str) -> str:
"""将 TMDB 节目状态映射为中文。未知状态保持原样。"""
try:
if not status:
return ''
key = str(status).strip().lower().replace(' ', '_')
mapping = {
'returning_series': '播出中',
'in_production': '制作中',
'planned': '计划中',
'ended': '已完结',
'canceled': '已取消',
'cancelled': '已取消',
'pilot': '试播集',
'rumored': '待确认',
}
return mapping.get(key, status)
except Exception:
return status
# 注意returning_series 的“播出中/本季终”判断在 run.py 使用本地 seasons/episodes 统计完成
def arabic_to_chinese_numeral(self, number: int) -> str:
"""将阿拉伯数字转换为中文数字用于季数支持到万0 < number < 100000
规则与约定
- 基本单位包含2的特殊口语用法
- 10-19 省略一十10=11=十一
- 的使用 // 位上优先使用如200=两百2000=两千20000=两万十位仍用二十
- 正确处理的读法 101=一百零一1001=一千零一10010=一万零一十
- 超出范围<=0 >=100000时返回原数字字符串
"""
try:
number = int(number)
except Exception:
return str(number)
if number <= 0 or number >= 100000:
return str(number)
digits = ["", "", "", "", "", "", "", "", "", ""]
def convert_0_9999(n: int) -> str:
if n == 0:
return ""
if n < 10:
return digits[n]
if n < 20:
# 十到十九
return "" + (digits[n - 10] if n > 10 else "")
parts = []
thousand = n // 1000
hundred = (n % 1000) // 100
ten = (n % 100) // 10
one = n % 10
# 千位
if thousand:
if thousand == 2:
parts.append("两千")
else:
parts.append(digits[thousand] + "")
# 百位
if hundred:
if thousand and hundred == 0:
# 不会发生hundred 有值才进来
pass
if hundred == 2:
parts.append("两百")
else:
parts.append(digits[hundred] + "")
else:
if thousand and (ten != 0 or one != 0):
parts.append("")
# 十位
if ten:
if ten == 1 and not thousand and not hundred:
# 10-19 形式(已在 n<20 处理)但也考虑 0xx 场景
parts.append("")
else:
parts.append(digits[ten] + "")
else:
if (hundred or thousand) and one != 0:
parts.append("")
# 个位
if one:
parts.append(digits[one])
# 合并并清理多余“零”
result = ''.join(parts)
# 去重连续零
while "零零" in result:
result = result.replace("零零", "")
# 尾部零去掉
if result.endswith(""):
result = result[:-1]
return result
if number < 10000:
return convert_0_9999(number)
# 处理万级number = a * 10000 + b, 1 <= a <= 9, 0 <= b < 10000
wan = number // 10000
rest = number % 10000
# 万位上的 2 使用“两万”,其他使用常规数字 + 万
if wan == 2:
prefix = "两万"
else:
prefix = digits[wan] + ""
if rest == 0:
return prefix
# rest 存在且不足四位时,需要根据是否存在中间的 0 添加“零”
rest_str = convert_0_9999(rest)
# 当 rest < 1000 时,且 rest_str 不以“零”开头,需要补一个“零”
if rest < 1000:
if not rest_str.startswith(""):
return prefix + "" + rest_str
return prefix + rest_str
def process_season_name(self, raw_name: str) -> str:
"""将 TMDB 返回的季名称进行本地化处理:
- 第1季 24 转换为第一季第二十四季
- 其他名称保持原样
"""
try:
if not raw_name:
return raw_name
# 匹配“第 1 季”“第1季”“第 24 季”等
m = re.search(r"\s*(\d+)\s*季", raw_name)
if m:
n = int(m.group(1))
cn = self.arabic_to_chinese_numeral(n)
return re.sub(r"\s*\d+\s*季", f"{cn}", raw_name)
return raw_name
except Exception:
return raw_name
def search_and_get_episodes(self, show_name: str, year: str = None) -> Optional[Dict]:
"""搜索电视剧并获取剧集信息"""
# 搜索电视剧
show = self.search_tv_show(show_name, year)
if not show:
return None
tv_id = show.get('id')
if not tv_id:
return None
# 获取详细信息
details = self.get_tv_show_details(tv_id)
if not details:
return None
# 获取所有季的剧集信息
seasons = details.get('seasons', [])
all_episodes = []
for season in seasons:
season_number = season.get('season_number', 0)
if season_number > 0: # 排除特殊季如第0季
episodes = self.get_tv_show_episode_air_dates(tv_id, season_number)
for episode in episodes:
episode['season_number'] = season_number
episode['show_name'] = show_name
episode['show_id'] = tv_id
all_episodes.append(episode)
return {
'show_info': {
'id': tv_id,
'name': show_name,
'original_name': details.get('original_name'),
'overview': details.get('overview'),
'poster_path': details.get('poster_path'),
'first_air_date': details.get('first_air_date'),
'media_type': details.get('media_type', 'tv')
},
'episodes': all_episodes
}
def get_episodes_by_date_range(self, start_date: str, end_date: str, show_name: str = None) -> List[Dict]:
"""获取指定日期范围内的剧集播出信息"""
if not self.is_configured():
return []
# 这里可以实现更复杂的日期范围查询
# 目前简化实现,返回空列表
# 实际项目中可以通过TMDB的discover API或其他方式实现
return []
def convert_to_beijing_time(self, utc_time_str: str) -> str:
"""将UTC时间转换为北京时间"""
try:
# 解析UTC时间
utc_time = datetime.fromisoformat(utc_time_str.replace('Z', '+00:00'))
# 转换为北京时间UTC+8
beijing_time = utc_time + timedelta(hours=8)
return beijing_time.strftime('%Y-%m-%d')
except Exception as e:
logger.error(f"时间转换失败: {e}")
return utc_time_str
def get_poster_path_with_language(self, tv_id: int) -> str:
"""根据海报语言设置获取海报路径"""
try:
if not self.is_configured():
return ''
# 获取节目详情
details = self.get_tv_show_details(tv_id)
if not details:
return ''
# 如果设置为原始语言,需要获取原始语言代码
if self.poster_language == "original":
original_language = details.get('original_language', 'en')
# 如果原始语言是zh使用zh-CN
if original_language == 'zh':
original_language = 'zh-CN'
# 使用原始语言请求海报
params = {
'api_key': self.api_key,
'language': original_language,
'include_adult': False
}
# 尝试获取原始语言海报
try:
url = f"{self.current_url}/tv/{tv_id}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
poster_path = data.get('poster_path', '')
if poster_path:
return poster_path
except Exception as e:
logger.warning(f"获取原始语言海报失败: {e}")
# 如果设置为中文或原始语言获取失败,尝试中文海报
if self.poster_language == "zh-CN" or self.poster_language == "original":
params = {
'api_key': self.api_key,
'language': 'zh-CN',
'include_adult': False
}
try:
url = f"{self.current_url}/tv/{tv_id}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
poster_path = data.get('poster_path', '')
if poster_path:
return poster_path
except Exception as e:
logger.warning(f"获取中文海报失败: {e}")
# 如果都失败了,返回默认海报路径
return details.get('poster_path', '')
except Exception as e:
logger.error(f"获取海报路径失败: {e}")
return ''

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

333
app/utils/task_extractor.py Normal file
View File

@ -0,0 +1,333 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
任务信息提取模块
用于从任务中提取剧名年份类型和进度信息
"""
import re
import os
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class TaskExtractor:
def __init__(self):
# 剧集编号提取模式
self.episode_patterns = [
r'S(\d{1,2})E(\d{1,3})', # S01E01
r'E(\d{1,3})', # E01
r'第(\d{1,3})集', # 第1集
r'第(\d{1,3})期', # 第1期
r'(\d{1,3})集', # 1集
r'(\d{1,3})期', # 1期
]
# 日期提取模式
self.date_patterns = [
r'(\d{4})-(\d{1,2})-(\d{1,2})', # 2025-01-01
r'(\d{4})/(\d{1,2})/(\d{1,2})', # 2025/01/01
r'(\d{4})\.(\d{1,2})\.(\d{1,2})', # 2025.01.01
]
def extract_show_info_from_path(self, save_path: str) -> Dict:
"""
从保存路径中提取剧名和年份信息
Args:
save_path: 任务的保存路径
Returns:
包含剧名和年份的字典
"""
if not save_path:
return {'show_name': '', 'year': '', 'type': 'other'}
# 分割路径
path_parts = save_path.split('/')
show_name = ''
year = ''
content_type = 'other'
# 查找包含年份信息的路径部分
for part in path_parts:
# 匹配年份格式:剧名 (年份)、剧名(年份)、剧名(年份)、剧名 年份
year_patterns = [
r'^(.+?)\s*[\(](\d{4})[\)]$', # 剧名 (2025) 或 剧名2025
r'^(.+?)\s*(\d{4})$', # 剧名 2025
r'^(.+?)\s*\((\d{4})\)$', # 剧名(2025)
]
for pattern in year_patterns:
match = re.match(pattern, part.strip())
if match:
show_name = match.group(1).strip()
year = match.group(2)
break
if show_name and year:
break
# 如果没有找到年份信息,尝试从任务名称中提取
if not show_name:
show_name = self.extract_show_name_from_taskname(path_parts[-1] if path_parts else '')
# 判断内容类型
content_type = self.determine_content_type(save_path)
return {
'show_name': show_name,
'year': year,
'type': content_type
}
def extract_show_name_from_taskname(self, task_name: str) -> str:
"""
从任务名称中提取纯剧名去除季信息等
Args:
task_name: 任务名称
Returns:
提取的剧名
"""
if not task_name:
return ''
# 移除季信息
season_patterns = [
r'^(.+?)\s*[Ss](\d{1,2})', # 剧名 S01
r'^(.+?)\s*第(\d{1,2})季', # 剧名 第1季
r'^(.+?)\s*Season\s*(\d{1,2})', # 剧名 Season 1
r'^(.+?)\s*第([一二三四五六七八九十]+)季', # 剧名 第一季
]
for pattern in season_patterns:
match = re.match(pattern, task_name)
if match:
# 提取到季信息前的名称后,进一步清理尾部多余分隔符/空白
name = match.group(1).strip()
# 去除名称结尾处常见分隔符(空格、横杠、下划线、点、破折号、间隔点、顿号、冒号等)
name = re.sub(r"[\s\-_.—·、:]+$", "", name)
return name
# 如果没有季信息,直接返回任务名称
# 未匹配到季信息时,直接清理尾部多余分隔符/空白后返回
cleaned = task_name.strip()
cleaned = re.sub(r"[\s\-_.—·、:]+$", "", cleaned)
return cleaned
def determine_content_type(self, save_path: str) -> str:
"""
根据保存路径判断内容类型
Args:
save_path: 保存路径
Returns:
内容类型tv剧集anime动画variety综艺documentary纪录片other其他
"""
if not save_path:
return 'other'
path_lower = save_path.lower()
# 根据路径关键词判断类型
if any(keyword in path_lower for keyword in ['剧集', '电视剧', '电视', '电视节目', '连续剧', '影集', 'tv', 'drama']):
return 'tv'
elif any(keyword in path_lower for keyword in ['动画', '动漫', '动画片', '卡通片', '卡通', 'anime', 'cartoon']):
return 'anime'
elif any(keyword in path_lower for keyword in ['综艺', '真人秀', '综艺节目', '娱乐节目', 'variety', 'show']):
return 'variety'
elif any(keyword in path_lower for keyword in ['纪录片', '记录片', 'documentary', 'doc']):
return 'documentary'
else:
return 'other'
def extract_progress_from_latest_file(self, latest_file: str) -> Dict:
"""
从最近转存文件中提取进度信息
Args:
latest_file: 最近转存文件信息 S02E24 2025-08-30
Returns:
包含进度信息的字典
"""
if not latest_file:
return {'episode_number': None, 'air_date': None, 'progress_type': 'unknown'}
# 特殊处理:检测"日期 连接符 第x期"格式,优先使用日期
# 支持各种连接符号:空格、-、_、.、/等
date_episode_patterns = [
r'(\d{4})-(\d{1,2})-(\d{1,2})\s*[-\s_\.\/]\s*第(\d{1,3})期', # 2025-09-08 - 第128期, 2025-09-08 第128期, 2025-09-08_第128期, 2025-09-08.第128期
r'(\d{4})/(\d{1,2})/(\d{1,2})\s*[-\s_\.\/]\s*第(\d{1,3})期', # 2025/09/08 - 第128期, 2025/09/08 第128期, 2025/09/08_第128期, 2025/09/08.第128期
r'(\d{4})\.(\d{1,2})\.(\d{1,2})\s*[-\s_\.\/]\s*第(\d{1,3})期', # 2025.09.08 - 第128期, 2025.09.08 第128期, 2025.09.08_第128期, 2025.09.08.第128期
]
for pattern in date_episode_patterns:
match = re.search(pattern, latest_file)
if match:
year, month, day = match.groups()[:3] # 只取前3个组年月日
date_str = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
return {
'episode_number': None,
'season_number': None,
'air_date': date_str,
'progress_type': 'date'
}
# 尝试提取集数信息
for pattern in self.episode_patterns:
match = re.search(pattern, latest_file)
if match:
if 'S' in pattern and 'E' in pattern:
# S01E01 格式
season = match.group(1)
episode = match.group(2)
return {
'episode_number': int(episode),
'season_number': int(season),
'progress_type': 'episode'
}
else:
# E01 或其他格式
episode = match.group(1)
return {
'episode_number': int(episode),
'season_number': None,
'progress_type': 'episode'
}
# 尝试提取日期信息
for pattern in self.date_patterns:
match = re.search(pattern, latest_file)
if match:
year, month, day = match.groups()
date_str = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
return {
'episode_number': None,
'season_number': None,
'air_date': date_str,
'progress_type': 'date'
}
return {
'episode_number': None,
'season_number': None,
'air_date': None,
'progress_type': 'unknown'
}
def extract_all_tasks_info(self, tasks: List[Dict], task_latest_files: Dict) -> List[Dict]:
"""
提取所有任务的信息
Args:
tasks: 任务列表
task_latest_files: 任务最近转存文件信息
Returns:
包含所有任务信息的列表
"""
logging.debug("TaskExtractor.extract_all_tasks_info 开始")
logging.debug(f"tasks数量: {len(tasks)}")
logging.debug(f"task_latest_files数量: {len(task_latest_files)}")
tasks_info = []
for i, task in enumerate(tasks):
try:
logging.debug(f"处理第{i+1}个任务: {task.get('taskname', '')}")
task_name = task.get('taskname', '')
save_path = task.get('savepath', '')
latest_file = task_latest_files.get(task_name, '')
logging.debug(f"task_name: {task_name}")
logging.debug(f"save_path: {save_path}")
logging.debug(f"latest_file: {latest_file}")
# 提取基本信息
show_info = self.extract_show_info_from_path(save_path)
logging.debug(f"show_info: {show_info}")
# 提取进度信息
progress_info = self.extract_progress_from_latest_file(latest_file)
logging.debug(f"progress_info: {progress_info}")
# 优先使用任务显式类型(配置或提取出的),否则回退到路径判断
explicit_type = None
try:
explicit_type = (task.get('calendar_info') or {}).get('extracted', {}).get('content_type')
except Exception:
explicit_type = None
if not explicit_type:
explicit_type = task.get('content_type')
final_type = (explicit_type or show_info['type'] or 'other')
# 合并信息
task_info = {
'task_name': task_name,
'save_path': save_path,
'show_name': show_info['show_name'],
'year': show_info['year'],
'content_type': final_type,
'latest_file': latest_file,
'episode_number': progress_info.get('episode_number'),
'season_number': progress_info.get('season_number'),
'air_date': progress_info.get('air_date'),
'progress_type': progress_info.get('progress_type')
}
logging.debug(f"task_info: {task_info}")
tasks_info.append(task_info)
except Exception as e:
logging.debug(f"处理任务 {i+1} 时出错: {e}")
import traceback
traceback.print_exc()
continue
logging.debug(f"TaskExtractor.extract_all_tasks_info 完成,返回任务数量: {len(tasks_info)}")
return tasks_info
def get_content_type_display_name(self, content_type: str) -> str:
"""
获取内容类型的显示名称
Args:
content_type: 内容类型代码
Returns:
显示名称
"""
type_names = {
'tv': '剧集',
'anime': '动画',
'variety': '综艺',
'documentary': '纪录片',
'other': '其他'
}
return type_names.get(content_type, '其他')
def get_content_types_with_content(self, tasks_info: List[Dict]) -> List[str]:
"""
获取有内容的任务类型列表
Args:
tasks_info: 任务信息列表
Returns:
有内容的类型列表
"""
types = set()
for task_info in tasks_info:
if task_info['show_name']: # 只统计有剧名的任务
types.add(task_info['content_type'])
return sorted(list(types))

View File

@ -1124,6 +1124,19 @@ class Quark:
response = requests.request(method, url, headers=headers, **kwargs)
# print(f"{response.text}")
# response.raise_for_status() # 检查请求是否成功但返回非200也会抛出异常
# 检查响应内容中是否包含inner error将其转换为request error
try:
response_json = response.json()
if isinstance(response_json, dict) and response_json.get("message"):
error_message = response_json.get("message", "")
if "inner error" in error_message.lower():
# 将inner error转换为request error避免误判为资源失效
response_json["message"] = "request error"
response._content = json.dumps(response_json).encode('utf-8')
except:
pass # 如果JSON解析失败保持原响应不变
return response
except Exception as e:
print(f"_send_request error:\n{e}")
@ -1206,9 +1219,38 @@ class Quark:
else:
return False, response["message"]
def get_detail(self, pwd_id, stoken, pdir_fid, _fetch_share=0):
def is_recoverable_error(self, error_message):
"""
检查错误是否为可恢复的错误网络错误服务端临时错误等
Args:
error_message: 错误消息
Returns:
bool: 是否为可恢复错误
"""
if not error_message:
return False
error_message = error_message.lower()
recoverable_errors = [
"inner error",
"request error",
"网络错误",
"服务端错误",
"临时错误",
"timeout",
"connection error",
"server error"
]
return any(error in error_message for error in recoverable_errors)
def get_detail(self, pwd_id, stoken, pdir_fid, _fetch_share=0, max_retries=3):
list_merge = []
page = 1
retry_count = 0
while True:
url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/detail"
querystring = {
@ -1231,6 +1273,26 @@ class Quark:
except Exception:
return {"error": "request error"}
# 检查响应中是否包含inner error或其他可恢复错误
if isinstance(response, dict) and response.get("message"):
error_message = response.get("message", "")
if "inner error" in error_message.lower():
# 对于inner error尝试重试
if retry_count < max_retries:
retry_count += 1
# 静默重试,不输出重试信息,避免日志污染
# 只有在调试模式下才记录详细重试信息
try:
import os
if os.environ.get("DEBUG", "false").lower() == "true":
print(f"[DEBUG] 遇到inner error进行第{retry_count}次重试...")
except:
pass # 如果无法获取DEBUG环境变量静默处理
time.sleep(1) # 等待1秒后重试
continue
else:
return {"error": "request error"} # 重试次数用尽返回request error
# 统一判错:某些情况下返回没有 code 字段
code = response.get("code")
status = response.get("status")
@ -4811,7 +4873,8 @@ def do_save(account, tasklist=[]):
else:
# 添加基本通知
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 修正首次运行时对子目录的处理 - 只有在首次运行且有新增的子目录时才显示子目录内容
if has_update_in_root and has_update_in_subdir and is_first_run and len(new_added_dirs) == 0:
@ -5142,7 +5205,8 @@ def do_save(account, tasklist=[]):
# 添加成功通知 - 修复问题:确保在有文件时添加通知
if display_files:
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 创建episode_pattern函数用于排序
@ -5232,7 +5296,8 @@ def do_save(account, tasklist=[]):
# 添加成功通知
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 打印文件列表
for idx, file_name in enumerate(display_files):