Merge pull request #57 from x1ao4/dev

新增追剧日历、任务匹配、视图切换等功能及其他优化
This commit is contained in:
x1ao4 2025-09-15 16:06:39 +08:00 committed by GitHub
commit efc2be761f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 10252 additions and 258 deletions

View File

@ -14,6 +14,11 @@
- **文件整理**:支持浏览和管理多个夸克账号的网盘文件,支持单项/批量重命名(支持应用完整的命名、过滤规则和撤销重命名等操作)、移动文件、删除文件、新建文件夹等操作。
- **更新状态**:支持在任务列表页面显示任务的最近更新日期、最近转存文件,支持在任务列表、转存记录、文件整理页面显示当日更新标识(对于当日更新的内容)。
- **影视发现**:支持在影视发现页面浏览豆瓣热门影视榜单,一键快速创建任务,智能填充任务配置,实现便捷订阅。
- **任务筛选**:支持在任务列表页面按照任务类型(如剧集、动画、综艺等)筛选任务,在对应的筛选视图下创建任务将会自动继承同类任务的基础配置,便于自动填充任务配置。
- **任务排序**:支持在任务列表页面按照任务编号、任务名称、任务进度和更新时间进行排序,支持升序和降序排列。
- **任务视图**:支持在任务列表页面使用列表视图和海报视图两种视图浏览和管理任务,海报视图将通过 TMDB 与任务对应的电视节目进行匹配,并显示对应的节目海报。
- **任务匹配**:在配置了 TMDB API 密钥后,所有任务将自动通过 TMDB 进行元数据匹配,匹配后将获取电视节目的海报和元数据,丰富任务相关信息的展示,包括集数信息统计、当前任务进度和电视节目状态等。
- **追剧日历**:支持在追剧日历页面查看和浏览任务对应电视节目的信息和播出时间表,追踪电视节目的播出情况,了解任务的完成进度。追剧日历支持海报视图和日历视图两种视图,可方便快捷的了解订阅内容的实时状态。
本项目修改后的版本为个人需求定制版,目的是满足我自己的使用需求,某些(我不用的)功能可能会因为修改而出现 BUG不一定会被修复。若你要使用本项目请知晓本人不是程序员我无法保证本项目的稳定性如果你在使用过程中发现了 BUG可以在 Issues 中提交,但不保证每个 BUG 都能被修复,请谨慎使用,风险自担。
@ -48,7 +53,7 @@
- [x] 支持多组任务
- [x] 任务结束期限,期限后不执行此任务
- [x] 可单独指定子任务星期几执行
- [x] **支持通过任务名称跳转 TMDB、豆瓣相关(搜索)页面**
- [x] **支持通过任务名称跳转 TMDB、豆瓣相关页面**
- [x] **支持通过影视发现页面浏览豆瓣热门影视榜单、快速创建任务(支持智能填充任务配置)**
- 媒体库整合
@ -61,6 +66,7 @@
- [x] 支持多个通知推送渠道 <sup>[?](https://github.com/x1ao4/quark-auto-save-x/wiki/通知推送服务配置)</sup>
- [x] 支持多账号(多账号签到、**文件管理**,仅首账号转存)
- [x] 支持网盘文件下载、strm 文件生成等功能 <sup>[?](https://github.com/x1ao4/quark-auto-save-x/wiki/插件配置)</sup>
- [x] **支持通过追剧日历功能了解订阅内容的播出情况**
## 部署
### Docker 部署

2848
app/run.py

File diff suppressed because it is too large Load Diff

View File

@ -268,4 +268,327 @@ class RecordDB:
if records:
columns = [col[0] for col in cursor.description]
return [dict(zip(columns, row)) for row in records]
return []
return []
class CalendarDB:
"""追剧日历本地缓存数据库:剧、季、集信息本地化存储"""
def __init__(self, db_path="config/data.db"):
self.db_path = db_path
self.conn = None
self.init_db()
def init_db(self):
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
cursor = self.conn.cursor()
# shows
cursor.execute('''
CREATE TABLE IF NOT EXISTS shows (
tmdb_id INTEGER PRIMARY KEY,
name TEXT,
year TEXT,
status TEXT,
poster_local_path TEXT,
latest_season_number INTEGER,
last_refreshed_at INTEGER,
bound_task_names TEXT,
content_type TEXT
)
''')
# 检查 content_type 字段是否存在,如果不存在则添加
cursor.execute("PRAGMA table_info(shows)")
columns = [column[1] for column in cursor.fetchall()]
if 'content_type' not in columns:
cursor.execute('ALTER TABLE shows ADD COLUMN content_type TEXT')
# seasons
cursor.execute('''
CREATE TABLE IF NOT EXISTS seasons (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tmdb_id INTEGER,
season_number INTEGER,
season_name TEXT,
episode_count INTEGER,
refresh_url TEXT,
UNIQUE (tmdb_id, season_number)
)
''')
# episodes
cursor.execute('''
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tmdb_id INTEGER,
season_number INTEGER,
episode_number INTEGER,
name TEXT,
overview TEXT,
air_date TEXT,
runtime INTEGER,
type TEXT,
updated_at INTEGER,
UNIQUE (tmdb_id, season_number, episode_number)
)
''')
self.conn.commit()
def close(self):
if self.conn:
self.conn.close()
# shows
def upsert_show(self, tmdb_id:int, name:str, year:str, status:str, poster_local_path:str, latest_season_number:int, last_refreshed_at:int=0, bound_task_names:str="", content_type:str=""):
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO shows (tmdb_id, name, year, status, poster_local_path, latest_season_number, last_refreshed_at, bound_task_names, content_type)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(tmdb_id) DO UPDATE SET
name=excluded.name,
year=excluded.year,
status=excluded.status,
poster_local_path=excluded.poster_local_path,
latest_season_number=excluded.latest_season_number,
last_refreshed_at=excluded.last_refreshed_at,
bound_task_names=excluded.bound_task_names,
content_type=excluded.content_type
''', (tmdb_id, name, year, status, poster_local_path, latest_season_number, last_refreshed_at, bound_task_names, content_type))
self.conn.commit()
def bind_task_to_show(self, tmdb_id:int, task_name:str):
"""绑定任务到节目,在 bound_task_names 字段中记录任务名"""
cursor = self.conn.cursor()
cursor.execute('SELECT bound_task_names FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row:
return False
current_bound_tasks = row[0] or ""
task_list = current_bound_tasks.split(',') if current_bound_tasks else []
# 如果任务名不在绑定列表中,则添加
if task_name not in task_list:
task_list.append(task_name)
new_bound_tasks = ','.join(task_list)
cursor.execute('UPDATE shows SET bound_task_names=? WHERE tmdb_id=?', (new_bound_tasks, tmdb_id))
self.conn.commit()
return True
return False
def unbind_task_from_show(self, tmdb_id:int, task_name:str):
"""从节目解绑任务,更新 bound_task_names 列表"""
cursor = self.conn.cursor()
cursor.execute('SELECT bound_task_names FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row:
return False
current_bound_tasks = row[0] or ""
task_list = [t for t in (current_bound_tasks.split(',') if current_bound_tasks else []) if t]
if task_name in task_list:
task_list.remove(task_name)
new_bound_tasks = ','.join(task_list)
cursor.execute('UPDATE shows SET bound_task_names=? WHERE tmdb_id=?', (new_bound_tasks, tmdb_id))
self.conn.commit()
return True
return False
def get_bound_tasks_for_show(self, tmdb_id:int):
"""获取绑定到指定节目的任务列表"""
cursor = self.conn.cursor()
cursor.execute('SELECT bound_task_names FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row or not row[0]:
return []
return row[0].split(',')
def get_show_by_task_name(self, task_name:str):
"""根据任务名查找绑定的节目"""
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows WHERE bound_task_names LIKE ?', (f'%{task_name}%',))
row = cursor.fetchone()
if not row:
return None
columns = [c[0] for c in cursor.description]
return dict(zip(columns, row))
def get_show(self, tmdb_id:int):
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
if not row:
return None
columns = [c[0] for c in cursor.description]
return dict(zip(columns, row))
def delete_show(self, tmdb_id:int):
cursor = self.conn.cursor()
cursor.execute('DELETE FROM episodes WHERE tmdb_id=?', (tmdb_id,))
cursor.execute('DELETE FROM seasons WHERE tmdb_id=?', (tmdb_id,))
cursor.execute('DELETE FROM shows WHERE tmdb_id=?', (tmdb_id,))
self.conn.commit()
# seasons
def upsert_season(self, tmdb_id:int, season_number:int, episode_count:int, refresh_url:str, season_name:str=""):
cursor = self.conn.cursor()
# 迁移:如缺少 season_name 字段则补充
try:
cursor.execute("PRAGMA table_info(seasons)")
columns = [column[1] for column in cursor.fetchall()]
if 'season_name' not in columns:
cursor.execute('ALTER TABLE seasons ADD COLUMN season_name TEXT')
except Exception:
pass
cursor.execute('''
INSERT INTO seasons (tmdb_id, season_number, season_name, episode_count, refresh_url)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(tmdb_id, season_number) DO UPDATE SET
season_name=excluded.season_name,
episode_count=excluded.episode_count,
refresh_url=excluded.refresh_url
''', (tmdb_id, season_number, season_name, episode_count, refresh_url))
self.conn.commit()
def get_season(self, tmdb_id:int, season_number:int):
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM seasons WHERE tmdb_id=? AND season_number=?', (tmdb_id, season_number))
row = cursor.fetchone()
if not row:
return None
columns = [c[0] for c in cursor.description]
return dict(zip(columns, row))
# episodes
def upsert_episode(self, tmdb_id:int, season_number:int, episode_number:int, name:str, overview:str, air_date:str, runtime, ep_type:str, updated_at:int):
cursor = self.conn.cursor()
cursor.execute('''
INSERT INTO episodes (tmdb_id, season_number, episode_number, name, overview, air_date, runtime, type, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(tmdb_id, season_number, episode_number) DO UPDATE SET
name=excluded.name,
overview=excluded.overview,
air_date=excluded.air_date,
runtime=COALESCE(episodes.runtime, excluded.runtime),
type=COALESCE(excluded.type, episodes.type),
updated_at=excluded.updated_at
''', (tmdb_id, season_number, episode_number, name, overview, air_date, runtime, ep_type, updated_at))
self.conn.commit()
def list_latest_season_episodes(self, tmdb_id:int, latest_season:int):
cursor = self.conn.cursor()
cursor.execute('''
SELECT episode_number, name, overview, air_date, runtime, type
FROM episodes
WHERE tmdb_id=? AND season_number=?
ORDER BY episode_number ASC
''', (tmdb_id, latest_season))
rows = cursor.fetchall()
return [
{
'episode_number': r[0],
'name': r[1],
'overview': r[2],
'air_date': r[3],
'runtime': r[4],
'type': r[5],
} for r in rows
]
def list_all_latest_episodes(self):
"""返回所有已知剧目的最新季的所有集(扁平列表,供前端汇总显示)"""
cursor = self.conn.cursor()
cursor.execute('SELECT tmdb_id, name, year, status, poster_local_path, latest_season_number FROM shows')
shows = cursor.fetchall()
result = []
for tmdb_id, name, year, status, poster_local_path, latest_season in shows:
eps = self.list_latest_season_episodes(tmdb_id, latest_season)
for e in eps:
item = {
'tmdb_id': tmdb_id,
'show_name': name,
'year': year,
'status': status,
'poster_local_path': poster_local_path,
'season_number': latest_season,
**e,
}
result.append(item)
return result
# 内容类型管理方法
def update_show_content_type(self, tmdb_id:int, content_type:str):
"""更新节目的内容类型"""
cursor = self.conn.cursor()
cursor.execute('UPDATE shows SET content_type=? WHERE tmdb_id=?', (content_type, tmdb_id))
self.conn.commit()
return cursor.rowcount > 0
def get_show_content_type(self, tmdb_id:int):
"""获取节目的内容类型"""
cursor = self.conn.cursor()
cursor.execute('SELECT content_type FROM shows WHERE tmdb_id=?', (tmdb_id,))
row = cursor.fetchone()
return row[0] if row else None
def get_shows_by_content_type(self, content_type:str):
"""根据内容类型获取节目列表"""
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows WHERE content_type=? ORDER BY name', (content_type,))
rows = cursor.fetchall()
if not rows:
return []
columns = [c[0] for c in cursor.description]
return [dict(zip(columns, row)) for row in rows]
def get_all_content_types(self):
"""获取所有已使用的内容类型"""
cursor = self.conn.cursor()
cursor.execute('SELECT DISTINCT content_type FROM shows WHERE content_type IS NOT NULL AND content_type != "" ORDER BY content_type')
rows = cursor.fetchall()
return [row[0] for row in rows]
def bind_task_and_content_type(self, tmdb_id:int, task_name:str, content_type:str):
"""绑定任务到节目并设置内容类型"""
# 先绑定任务
self.bind_task_to_show(tmdb_id, task_name)
# 再更新内容类型
self.update_show_content_type(tmdb_id, content_type)
# --------- 扩展:管理季与集清理/更新工具方法 ---------
def purge_other_seasons(self, tmdb_id: int, keep_season_number: int):
"""清除除指定季之外的所有季与对应集数据"""
cursor = self.conn.cursor()
# 删除其他季的 episodes
cursor.execute('DELETE FROM episodes WHERE tmdb_id=? AND season_number != ?', (tmdb_id, keep_season_number))
# 删除其他季的 seasons 行
cursor.execute('DELETE FROM seasons WHERE tmdb_id=? AND season_number != ?', (tmdb_id, keep_season_number))
self.conn.commit()
def delete_season(self, tmdb_id: int, season_number: int):
"""删除指定季及其所有集数据"""
cursor = self.conn.cursor()
cursor.execute('DELETE FROM episodes WHERE tmdb_id=? AND season_number=?', (tmdb_id, season_number))
cursor.execute('DELETE FROM seasons WHERE tmdb_id=? AND season_number=?', (tmdb_id, season_number))
self.conn.commit()
def update_show_latest_season_number(self, tmdb_id: int, latest_season_number: int):
"""更新 shows.latest_season_number"""
cursor = self.conn.cursor()
cursor.execute('UPDATE shows SET latest_season_number=? WHERE tmdb_id=?', (latest_season_number, tmdb_id))
self.conn.commit()
def update_show_poster(self, tmdb_id: int, poster_local_path: str):
"""更新节目的海报路径"""
cursor = self.conn.cursor()
cursor.execute('UPDATE shows SET poster_local_path=? WHERE tmdb_id=?', (poster_local_path, tmdb_id))
self.conn.commit()
def get_all_shows(self):
"""获取所有节目"""
cursor = self.conn.cursor()
cursor.execute('SELECT * FROM shows')
columns = [description[0] for description in cursor.description]
rows = cursor.fetchall()
return [dict(zip(columns, row)) for row in rows]

487
app/sdk/tmdb_service.py Normal file
View File

@ -0,0 +1,487 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TMDB服务模块
用于获取电视节目信息和播出时间表
"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class TMDBService:
def __init__(self, api_key: str = None, poster_language: str = "zh-CN"):
self.api_key = api_key
# 首选改为 api.tmdb.org备选为 api.themoviedb.org
self.primary_url = "https://api.tmdb.org/3"
self.backup_url = "https://api.themoviedb.org/3"
self.current_url = self.primary_url
self.language = "zh-CN" # 返回中文数据
self.poster_language = poster_language # 海报语言设置
# 复用会话,开启重试
self.session = requests.Session()
retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET"]) # 简单退避
adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=50)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
# 简单内存缓存,避免短时间重复请求
self._cache = {}
self._cache_ttl_seconds = 600
def is_configured(self) -> bool:
"""检查TMDB API是否已配置"""
return bool(self.api_key and self.api_key.strip())
def reset_to_primary_url(self):
"""重置到主API地址"""
self.current_url = self.primary_url
logger.info("TMDB API地址已重置为主地址")
def get_current_api_url(self) -> str:
"""获取当前使用的API地址"""
return self.current_url
def is_using_backup_url(self) -> bool:
"""检查是否正在使用备用地址"""
return self.current_url == self.backup_url
def _make_request(self, endpoint: str, params: Dict = None) -> Optional[Dict]:
"""发送API请求支持自动切换备用地址"""
if not self.is_configured():
return None
if params is None:
params = {}
params.update({
'api_key': self.api_key,
'language': self.language,
'include_adult': False
})
# 简单缓存键
try:
from time import time as _now
cache_key = (endpoint, tuple(sorted((params or {}).items())))
cached = self._cache.get(cache_key)
if cached and (_now() - cached[0]) < self._cache_ttl_seconds:
return cached[1]
except Exception:
pass
# 尝试主地址
try:
url = f"{self.current_url}{endpoint}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
try:
self._cache[cache_key] = (_now(), data)
except Exception:
pass
return data
except Exception as e:
logger.warning(f"TMDB主地址请求失败: {e}")
# 如果当前使用的是主地址,尝试切换到备用地址
if self.current_url == self.primary_url:
logger.info("尝试切换到TMDB备用地址...")
self.current_url = self.backup_url
try:
url = f"{self.current_url}{endpoint}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
logger.info("TMDB备用地址连接成功")
data = response.json()
try:
self._cache[cache_key] = (_now(), data)
except Exception:
pass
return data
except Exception as backup_e:
logger.error(f"TMDB备用地址请求也失败: {backup_e}")
# 重置回主地址,下次请求时重新尝试
self.current_url = self.primary_url
return None
else:
# 如果备用地址也失败,重置回主地址
logger.error(f"TMDB备用地址请求失败: {e}")
self.current_url = self.primary_url
return None
def search_tv_show(self, query: str, year: str = None) -> Optional[Dict]:
"""搜索电视剧"""
params = {
'query': query,
'first_air_date_year': year
}
result = self._make_request('/search/tv', params)
if result and result.get('results'):
# 返回第一个匹配结果
return result['results'][0]
return None
def get_tv_show_details(self, tv_id: int) -> Optional[Dict]:
"""获取电视剧详细信息"""
return self._make_request(f'/tv/{tv_id}')
def get_tv_show_alternative_titles(self, tv_id: int) -> Optional[Dict]:
"""获取电视剧的别名信息"""
return self._make_request(f'/tv/{tv_id}/alternative_titles')
def _is_chinese_text(self, text: str) -> bool:
"""检查文本是否包含中文字符"""
if not text:
return False
for char in text:
if '\u4e00' <= char <= '\u9fff': # 中文字符范围
return True
return False
def get_chinese_title_with_fallback(self, tv_id: int, original_title: str = "") -> str:
"""
获取中文标题如果中文标题为空或不是中文则从别名中获取中国地区的别名
Args:
tv_id: TMDB ID
original_title: 原始标题作为最后的备用方案
Returns:
中文标题或备用标题
"""
try:
# 首先获取节目详情,检查是否有中文标题
details = self.get_tv_show_details(tv_id)
if details:
tmdb_name = details.get('name', '').strip()
# 检查TMDB返回的标题是否包含中文字符
if tmdb_name and self._is_chinese_text(tmdb_name):
logger.debug(f"直接获取到中文标题: {tmdb_name} (TMDB ID: {tv_id})")
return tmdb_name
# 如果TMDB返回的标题不是中文尝试从别名中获取中国地区的别名
alternative_titles = self.get_tv_show_alternative_titles(tv_id)
if alternative_titles and alternative_titles.get('results'):
# 查找中国地区的别名
for alt_title in alternative_titles['results']:
if alt_title.get('iso_3166_1') == 'CN':
chinese_alt_title = alt_title.get('title', '').strip()
if chinese_alt_title and self._is_chinese_text(chinese_alt_title):
logger.debug(f"从别名中获取到中文标题: {chinese_alt_title} (TMDB ID: {tv_id})")
return chinese_alt_title
# 如果都没有找到中文标题,返回原始标题
logger.debug(f"未找到中文标题,使用原始标题: {original_title} (TMDB ID: {tv_id})")
return original_title
else:
# 如果无法获取详情,返回原始标题
return original_title
except Exception as e:
logger.warning(f"获取中文标题失败: {e}, 使用原始标题: {original_title}")
return original_title
def get_tv_show_episodes(self, tv_id: int, season_number: int) -> Optional[Dict]:
"""获取指定季的剧集信息"""
return self._make_request(f'/tv/{tv_id}/season/{season_number}')
def get_tv_show_air_dates(self, tv_id: int) -> Optional[Dict]:
"""获取电视剧播出时间信息"""
return self._make_request(f'/tv/{tv_id}/air_dates')
def get_tv_show_episode_air_dates(self, tv_id: int, season_number: int) -> List[Dict]:
"""获取指定季所有剧集的播出时间"""
episodes = self.get_tv_show_episodes(tv_id, season_number)
if not episodes or 'episodes' not in episodes:
return []
episode_list = []
for episode in episodes['episodes']:
if episode.get('air_date'):
episode_list.append({
'episode_number': episode.get('episode_number'),
'air_date': episode.get('air_date'),
'name': episode.get('name'),
'overview': episode.get('overview')
})
return episode_list
# ===== 节目状态中文映射(不含 returning_series 场景判断;该判断在 run.py 本地完成) =====
def map_show_status_cn(self, status: str) -> str:
"""将 TMDB 节目状态映射为中文。未知状态保持原样。"""
try:
if not status:
return ''
key = str(status).strip().lower().replace(' ', '_')
mapping = {
'returning_series': '播出中',
'in_production': '制作中',
'planned': '计划中',
'ended': '已完结',
'canceled': '已取消',
'cancelled': '已取消',
'pilot': '试播集',
'rumored': '待确认',
}
return mapping.get(key, status)
except Exception:
return status
# 注意returning_series 的“播出中/本季终”判断在 run.py 使用本地 seasons/episodes 统计完成
def arabic_to_chinese_numeral(self, number: int) -> str:
"""将阿拉伯数字转换为中文数字用于季数支持到万0 < number < 100000
规则与约定
- 基本单位包含2的特殊口语用法
- 10-19 省略一十10=11=十一
- 的使用 // 位上优先使用如200=两百2000=两千20000=两万十位仍用二十
- 正确处理的读法 101=一百零一1001=一千零一10010=一万零一十
- 超出范围<=0 >=100000时返回原数字字符串
"""
try:
number = int(number)
except Exception:
return str(number)
if number <= 0 or number >= 100000:
return str(number)
digits = ["", "", "", "", "", "", "", "", "", ""]
def convert_0_9999(n: int) -> str:
if n == 0:
return ""
if n < 10:
return digits[n]
if n < 20:
# 十到十九
return "" + (digits[n - 10] if n > 10 else "")
parts = []
thousand = n // 1000
hundred = (n % 1000) // 100
ten = (n % 100) // 10
one = n % 10
# 千位
if thousand:
if thousand == 2:
parts.append("两千")
else:
parts.append(digits[thousand] + "")
# 百位
if hundred:
if thousand and hundred == 0:
# 不会发生hundred 有值才进来
pass
if hundred == 2:
parts.append("两百")
else:
parts.append(digits[hundred] + "")
else:
if thousand and (ten != 0 or one != 0):
parts.append("")
# 十位
if ten:
if ten == 1 and not thousand and not hundred:
# 10-19 形式(已在 n<20 处理)但也考虑 0xx 场景
parts.append("")
else:
parts.append(digits[ten] + "")
else:
if (hundred or thousand) and one != 0:
parts.append("")
# 个位
if one:
parts.append(digits[one])
# 合并并清理多余“零”
result = ''.join(parts)
# 去重连续零
while "零零" in result:
result = result.replace("零零", "")
# 尾部零去掉
if result.endswith(""):
result = result[:-1]
return result
if number < 10000:
return convert_0_9999(number)
# 处理万级number = a * 10000 + b, 1 <= a <= 9, 0 <= b < 10000
wan = number // 10000
rest = number % 10000
# 万位上的 2 使用“两万”,其他使用常规数字 + 万
if wan == 2:
prefix = "两万"
else:
prefix = digits[wan] + ""
if rest == 0:
return prefix
# rest 存在且不足四位时,需要根据是否存在中间的 0 添加“零”
rest_str = convert_0_9999(rest)
# 当 rest < 1000 时,且 rest_str 不以“零”开头,需要补一个“零”
if rest < 1000:
if not rest_str.startswith(""):
return prefix + "" + rest_str
return prefix + rest_str
def process_season_name(self, raw_name: str) -> str:
"""将 TMDB 返回的季名称进行本地化处理:
- 第1季 24 转换为第一季第二十四季
- 其他名称保持原样
"""
try:
if not raw_name:
return raw_name
# 匹配“第 1 季”“第1季”“第 24 季”等
m = re.search(r"\s*(\d+)\s*季", raw_name)
if m:
n = int(m.group(1))
cn = self.arabic_to_chinese_numeral(n)
return re.sub(r"\s*\d+\s*季", f"{cn}", raw_name)
return raw_name
except Exception:
return raw_name
def search_and_get_episodes(self, show_name: str, year: str = None) -> Optional[Dict]:
"""搜索电视剧并获取剧集信息"""
# 搜索电视剧
show = self.search_tv_show(show_name, year)
if not show:
return None
tv_id = show.get('id')
if not tv_id:
return None
# 获取详细信息
details = self.get_tv_show_details(tv_id)
if not details:
return None
# 获取所有季的剧集信息
seasons = details.get('seasons', [])
all_episodes = []
for season in seasons:
season_number = season.get('season_number', 0)
if season_number > 0: # 排除特殊季如第0季
episodes = self.get_tv_show_episode_air_dates(tv_id, season_number)
for episode in episodes:
episode['season_number'] = season_number
episode['show_name'] = show_name
episode['show_id'] = tv_id
all_episodes.append(episode)
return {
'show_info': {
'id': tv_id,
'name': show_name,
'original_name': details.get('original_name'),
'overview': details.get('overview'),
'poster_path': details.get('poster_path'),
'first_air_date': details.get('first_air_date'),
'media_type': details.get('media_type', 'tv')
},
'episodes': all_episodes
}
def get_episodes_by_date_range(self, start_date: str, end_date: str, show_name: str = None) -> List[Dict]:
"""获取指定日期范围内的剧集播出信息"""
if not self.is_configured():
return []
# 这里可以实现更复杂的日期范围查询
# 目前简化实现,返回空列表
# 实际项目中可以通过TMDB的discover API或其他方式实现
return []
def convert_to_beijing_time(self, utc_time_str: str) -> str:
"""将UTC时间转换为北京时间"""
try:
# 解析UTC时间
utc_time = datetime.fromisoformat(utc_time_str.replace('Z', '+00:00'))
# 转换为北京时间UTC+8
beijing_time = utc_time + timedelta(hours=8)
return beijing_time.strftime('%Y-%m-%d')
except Exception as e:
logger.error(f"时间转换失败: {e}")
return utc_time_str
def get_poster_path_with_language(self, tv_id: int) -> str:
"""根据海报语言设置获取海报路径"""
try:
if not self.is_configured():
return ''
# 获取节目详情
details = self.get_tv_show_details(tv_id)
if not details:
return ''
# 如果设置为原始语言,需要获取原始语言代码
if self.poster_language == "original":
original_language = details.get('original_language', 'en')
# 如果原始语言是zh使用zh-CN
if original_language == 'zh':
original_language = 'zh-CN'
# 使用原始语言请求海报
params = {
'api_key': self.api_key,
'language': original_language,
'include_adult': False
}
# 尝试获取原始语言海报
try:
url = f"{self.current_url}/tv/{tv_id}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
poster_path = data.get('poster_path', '')
if poster_path:
return poster_path
except Exception as e:
logger.warning(f"获取原始语言海报失败: {e}")
# 如果设置为中文或原始语言获取失败,尝试中文海报
if self.poster_language == "zh-CN" or self.poster_language == "original":
params = {
'api_key': self.api_key,
'language': 'zh-CN',
'include_adult': False
}
try:
url = f"{self.current_url}/tv/{tv_id}"
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
poster_path = data.get('poster_path', '')
if poster_path:
return poster_path
except Exception as e:
logger.warning(f"获取中文海报失败: {e}")
# 如果都失败了,返回默认海报路径
return details.get('poster_path', '')
except Exception as e:
logger.error(f"获取海报路径失败: {e}")
return ''

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

333
app/utils/task_extractor.py Normal file
View File

@ -0,0 +1,333 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
任务信息提取模块
用于从任务中提取剧名年份类型和进度信息
"""
import re
import os
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class TaskExtractor:
def __init__(self):
# 剧集编号提取模式
self.episode_patterns = [
r'S(\d{1,2})E(\d{1,3})', # S01E01
r'E(\d{1,3})', # E01
r'第(\d{1,3})集', # 第1集
r'第(\d{1,3})期', # 第1期
r'(\d{1,3})集', # 1集
r'(\d{1,3})期', # 1期
]
# 日期提取模式
self.date_patterns = [
r'(\d{4})-(\d{1,2})-(\d{1,2})', # 2025-01-01
r'(\d{4})/(\d{1,2})/(\d{1,2})', # 2025/01/01
r'(\d{4})\.(\d{1,2})\.(\d{1,2})', # 2025.01.01
]
def extract_show_info_from_path(self, save_path: str) -> Dict:
"""
从保存路径中提取剧名和年份信息
Args:
save_path: 任务的保存路径
Returns:
包含剧名和年份的字典
"""
if not save_path:
return {'show_name': '', 'year': '', 'type': 'other'}
# 分割路径
path_parts = save_path.split('/')
show_name = ''
year = ''
content_type = 'other'
# 查找包含年份信息的路径部分
for part in path_parts:
# 匹配年份格式:剧名 (年份)、剧名(年份)、剧名(年份)、剧名 年份
year_patterns = [
r'^(.+?)\s*[\(](\d{4})[\)]$', # 剧名 (2025) 或 剧名2025
r'^(.+?)\s*(\d{4})$', # 剧名 2025
r'^(.+?)\s*\((\d{4})\)$', # 剧名(2025)
]
for pattern in year_patterns:
match = re.match(pattern, part.strip())
if match:
show_name = match.group(1).strip()
year = match.group(2)
break
if show_name and year:
break
# 如果没有找到年份信息,尝试从任务名称中提取
if not show_name:
show_name = self.extract_show_name_from_taskname(path_parts[-1] if path_parts else '')
# 判断内容类型
content_type = self.determine_content_type(save_path)
return {
'show_name': show_name,
'year': year,
'type': content_type
}
def extract_show_name_from_taskname(self, task_name: str) -> str:
"""
从任务名称中提取纯剧名去除季信息等
Args:
task_name: 任务名称
Returns:
提取的剧名
"""
if not task_name:
return ''
# 移除季信息
season_patterns = [
r'^(.+?)\s*[Ss](\d{1,2})', # 剧名 S01
r'^(.+?)\s*第(\d{1,2})季', # 剧名 第1季
r'^(.+?)\s*Season\s*(\d{1,2})', # 剧名 Season 1
r'^(.+?)\s*第([一二三四五六七八九十]+)季', # 剧名 第一季
]
for pattern in season_patterns:
match = re.match(pattern, task_name)
if match:
# 提取到季信息前的名称后,进一步清理尾部多余分隔符/空白
name = match.group(1).strip()
# 去除名称结尾处常见分隔符(空格、横杠、下划线、点、破折号、间隔点、顿号、冒号等)
name = re.sub(r"[\s\-_.—·、:]+$", "", name)
return name
# 如果没有季信息,直接返回任务名称
# 未匹配到季信息时,直接清理尾部多余分隔符/空白后返回
cleaned = task_name.strip()
cleaned = re.sub(r"[\s\-_.—·、:]+$", "", cleaned)
return cleaned
def determine_content_type(self, save_path: str) -> str:
"""
根据保存路径判断内容类型
Args:
save_path: 保存路径
Returns:
内容类型tv剧集anime动画variety综艺documentary纪录片other其他
"""
if not save_path:
return 'other'
path_lower = save_path.lower()
# 根据路径关键词判断类型
if any(keyword in path_lower for keyword in ['剧集', '电视剧', '电视', '电视节目', '连续剧', '影集', 'tv', 'drama']):
return 'tv'
elif any(keyword in path_lower for keyword in ['动画', '动漫', '动画片', '卡通片', '卡通', 'anime', 'cartoon']):
return 'anime'
elif any(keyword in path_lower for keyword in ['综艺', '真人秀', '综艺节目', '娱乐节目', 'variety', 'show']):
return 'variety'
elif any(keyword in path_lower for keyword in ['纪录片', '记录片', 'documentary', 'doc']):
return 'documentary'
else:
return 'other'
def extract_progress_from_latest_file(self, latest_file: str) -> Dict:
"""
从最近转存文件中提取进度信息
Args:
latest_file: 最近转存文件信息 S02E24 2025-08-30
Returns:
包含进度信息的字典
"""
if not latest_file:
return {'episode_number': None, 'air_date': None, 'progress_type': 'unknown'}
# 特殊处理:检测"日期 连接符 第x期"格式,优先使用日期
# 支持各种连接符号:空格、-、_、.、/等
date_episode_patterns = [
r'(\d{4})-(\d{1,2})-(\d{1,2})\s*[-\s_\.\/]\s*第(\d{1,3})期', # 2025-09-08 - 第128期, 2025-09-08 第128期, 2025-09-08_第128期, 2025-09-08.第128期
r'(\d{4})/(\d{1,2})/(\d{1,2})\s*[-\s_\.\/]\s*第(\d{1,3})期', # 2025/09/08 - 第128期, 2025/09/08 第128期, 2025/09/08_第128期, 2025/09/08.第128期
r'(\d{4})\.(\d{1,2})\.(\d{1,2})\s*[-\s_\.\/]\s*第(\d{1,3})期', # 2025.09.08 - 第128期, 2025.09.08 第128期, 2025.09.08_第128期, 2025.09.08.第128期
]
for pattern in date_episode_patterns:
match = re.search(pattern, latest_file)
if match:
year, month, day = match.groups()[:3] # 只取前3个组年月日
date_str = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
return {
'episode_number': None,
'season_number': None,
'air_date': date_str,
'progress_type': 'date'
}
# 尝试提取集数信息
for pattern in self.episode_patterns:
match = re.search(pattern, latest_file)
if match:
if 'S' in pattern and 'E' in pattern:
# S01E01 格式
season = match.group(1)
episode = match.group(2)
return {
'episode_number': int(episode),
'season_number': int(season),
'progress_type': 'episode'
}
else:
# E01 或其他格式
episode = match.group(1)
return {
'episode_number': int(episode),
'season_number': None,
'progress_type': 'episode'
}
# 尝试提取日期信息
for pattern in self.date_patterns:
match = re.search(pattern, latest_file)
if match:
year, month, day = match.groups()
date_str = f"{year}-{month.zfill(2)}-{day.zfill(2)}"
return {
'episode_number': None,
'season_number': None,
'air_date': date_str,
'progress_type': 'date'
}
return {
'episode_number': None,
'season_number': None,
'air_date': None,
'progress_type': 'unknown'
}
def extract_all_tasks_info(self, tasks: List[Dict], task_latest_files: Dict) -> List[Dict]:
"""
提取所有任务的信息
Args:
tasks: 任务列表
task_latest_files: 任务最近转存文件信息
Returns:
包含所有任务信息的列表
"""
logging.debug("TaskExtractor.extract_all_tasks_info 开始")
logging.debug(f"tasks数量: {len(tasks)}")
logging.debug(f"task_latest_files数量: {len(task_latest_files)}")
tasks_info = []
for i, task in enumerate(tasks):
try:
logging.debug(f"处理第{i+1}个任务: {task.get('taskname', '')}")
task_name = task.get('taskname', '')
save_path = task.get('savepath', '')
latest_file = task_latest_files.get(task_name, '')
logging.debug(f"task_name: {task_name}")
logging.debug(f"save_path: {save_path}")
logging.debug(f"latest_file: {latest_file}")
# 提取基本信息
show_info = self.extract_show_info_from_path(save_path)
logging.debug(f"show_info: {show_info}")
# 提取进度信息
progress_info = self.extract_progress_from_latest_file(latest_file)
logging.debug(f"progress_info: {progress_info}")
# 优先使用任务显式类型(配置或提取出的),否则回退到路径判断
explicit_type = None
try:
explicit_type = (task.get('calendar_info') or {}).get('extracted', {}).get('content_type')
except Exception:
explicit_type = None
if not explicit_type:
explicit_type = task.get('content_type')
final_type = (explicit_type or show_info['type'] or 'other')
# 合并信息
task_info = {
'task_name': task_name,
'save_path': save_path,
'show_name': show_info['show_name'],
'year': show_info['year'],
'content_type': final_type,
'latest_file': latest_file,
'episode_number': progress_info.get('episode_number'),
'season_number': progress_info.get('season_number'),
'air_date': progress_info.get('air_date'),
'progress_type': progress_info.get('progress_type')
}
logging.debug(f"task_info: {task_info}")
tasks_info.append(task_info)
except Exception as e:
logging.debug(f"处理任务 {i+1} 时出错: {e}")
import traceback
traceback.print_exc()
continue
logging.debug(f"TaskExtractor.extract_all_tasks_info 完成,返回任务数量: {len(tasks_info)}")
return tasks_info
def get_content_type_display_name(self, content_type: str) -> str:
"""
获取内容类型的显示名称
Args:
content_type: 内容类型代码
Returns:
显示名称
"""
type_names = {
'tv': '剧集',
'anime': '动画',
'variety': '综艺',
'documentary': '纪录片',
'other': '其他'
}
return type_names.get(content_type, '其他')
def get_content_types_with_content(self, tasks_info: List[Dict]) -> List[str]:
"""
获取有内容的任务类型列表
Args:
tasks_info: 任务信息列表
Returns:
有内容的类型列表
"""
types = set()
for task_info in tasks_info:
if task_info['show_name']: # 只统计有剧名的任务
types.add(task_info['content_type'])
return sorted(list(types))

View File

@ -1124,6 +1124,19 @@ class Quark:
response = requests.request(method, url, headers=headers, **kwargs)
# print(f"{response.text}")
# response.raise_for_status() # 检查请求是否成功但返回非200也会抛出异常
# 检查响应内容中是否包含inner error将其转换为request error
try:
response_json = response.json()
if isinstance(response_json, dict) and response_json.get("message"):
error_message = response_json.get("message", "")
if "inner error" in error_message.lower():
# 将inner error转换为request error避免误判为资源失效
response_json["message"] = "request error"
response._content = json.dumps(response_json).encode('utf-8')
except:
pass # 如果JSON解析失败保持原响应不变
return response
except Exception as e:
print(f"_send_request error:\n{e}")
@ -1206,9 +1219,38 @@ class Quark:
else:
return False, response["message"]
def get_detail(self, pwd_id, stoken, pdir_fid, _fetch_share=0):
def is_recoverable_error(self, error_message):
"""
检查错误是否为可恢复的错误网络错误服务端临时错误等
Args:
error_message: 错误消息
Returns:
bool: 是否为可恢复错误
"""
if not error_message:
return False
error_message = error_message.lower()
recoverable_errors = [
"inner error",
"request error",
"网络错误",
"服务端错误",
"临时错误",
"timeout",
"connection error",
"server error"
]
return any(error in error_message for error in recoverable_errors)
def get_detail(self, pwd_id, stoken, pdir_fid, _fetch_share=0, max_retries=3):
list_merge = []
page = 1
retry_count = 0
while True:
url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/detail"
querystring = {
@ -1231,6 +1273,26 @@ class Quark:
except Exception:
return {"error": "request error"}
# 检查响应中是否包含inner error或其他可恢复错误
if isinstance(response, dict) and response.get("message"):
error_message = response.get("message", "")
if "inner error" in error_message.lower():
# 对于inner error尝试重试
if retry_count < max_retries:
retry_count += 1
# 静默重试,不输出重试信息,避免日志污染
# 只有在调试模式下才记录详细重试信息
try:
import os
if os.environ.get("DEBUG", "false").lower() == "true":
print(f"[DEBUG] 遇到inner error进行第{retry_count}次重试...")
except:
pass # 如果无法获取DEBUG环境变量静默处理
time.sleep(1) # 等待1秒后重试
continue
else:
return {"error": "request error"} # 重试次数用尽返回request error
# 统一判错:某些情况下返回没有 code 字段
code = response.get("code")
status = response.get("status")
@ -4811,7 +4873,8 @@ def do_save(account, tasklist=[]):
else:
# 添加基本通知
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 修正首次运行时对子目录的处理 - 只有在首次运行且有新增的子目录时才显示子目录内容
if has_update_in_root and has_update_in_subdir and is_first_run and len(new_added_dirs) == 0:
@ -5142,7 +5205,8 @@ def do_save(account, tasklist=[]):
# 添加成功通知 - 修复问题:确保在有文件时添加通知
if display_files:
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 创建episode_pattern函数用于排序
@ -5232,7 +5296,8 @@ def do_save(account, tasklist=[]):
# 添加成功通知
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 打印文件列表
for idx, file_name in enumerate(display_files):