新增追剧日历功能

This commit is contained in:
x1ao4 2025-09-09 16:24:58 +08:00
parent 5170760331
commit 7f42f694ce
7 changed files with 7658 additions and 149 deletions

2262
app/run.py

File diff suppressed because it is too large Load Diff

View File

@ -268,4 +268,313 @@ class RecordDB:
if records:
columns = [col[0] for col in cursor.description]
return [dict(zip(columns, row)) for row in records]
return []
return []
class CalendarDB:
    """Local SQLite cache for the watch calendar.

    Show, season and episode metadata live in three tables (``shows``,
    ``seasons`` and ``episodes``). The tasks bound to a show are stored in
    ``shows.bound_task_names`` as a comma-separated string, so a task name
    that itself contains a comma cannot be represented faithfully.
    """

    def __init__(self, db_path="config/data.db"):
        """Remember *db_path* and open/initialise the database."""
        self.db_path = db_path
        self.conn = None
        self.init_db()

    # ------------------------------------------------------------------
    # internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _ensure_column(cursor, table, column, decl):
        """Add *column* to *table* when an older schema lacks it.

        *table*, *column* and *decl* are interpolated into the SQL text, so
        they must only ever be trusted, hard-coded identifiers.
        """
        cursor.execute(f"PRAGMA table_info({table})")
        existing = {info[1] for info in cursor.fetchall()}
        if column not in existing:
            cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")

    @staticmethod
    def _row_to_dict(cursor, row):
        """Return *row* as a ``{column: value}`` dict, or None for no row."""
        if not row:
            return None
        return dict(zip((col[0] for col in cursor.description), row))

    @staticmethod
    def _split_task_names(raw):
        """Split the stored comma-separated task string, dropping empties."""
        return [name for name in (raw or "").split(',') if name]

    def _fetch_bound_task_names(self, tmdb_id):
        """Return the bound task list for a show, or None if the show is unknown."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT bound_task_names FROM shows WHERE tmdb_id=?', (tmdb_id,))
        row = cursor.fetchone()
        if not row:
            return None
        return self._split_task_names(row[0])

    def _store_bound_task_names(self, tmdb_id, task_names):
        """Persist *task_names* as the comma-separated binding of a show."""
        with self.conn:
            self.conn.execute(
                'UPDATE shows SET bound_task_names=? WHERE tmdb_id=?',
                (','.join(task_names), tmdb_id),
            )

    # ------------------------------------------------------------------
    # lifecycle
    # ------------------------------------------------------------------
    def init_db(self):
        """Open the connection and create or migrate the three tables."""
        # dirname is '' for bare file names and ':memory:'; makedirs('') raises.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
        self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
        cursor = self.conn.cursor()
        # shows
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS shows (
                tmdb_id INTEGER PRIMARY KEY,
                name TEXT,
                year TEXT,
                status TEXT,
                poster_local_path TEXT,
                latest_season_number INTEGER,
                last_refreshed_at INTEGER,
                bound_task_names TEXT,
                content_type TEXT
            )
        ''')
        # Databases created before content_type existed need the column added.
        self._ensure_column(cursor, 'shows', 'content_type', 'TEXT')
        # seasons
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS seasons (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tmdb_id INTEGER,
                season_number INTEGER,
                season_name TEXT,
                episode_count INTEGER,
                refresh_url TEXT,
                UNIQUE (tmdb_id, season_number)
            )
        ''')
        # Same migration for season_name; done once here instead of on
        # every upsert_season call.
        self._ensure_column(cursor, 'seasons', 'season_name', 'TEXT')
        # episodes
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS episodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                tmdb_id INTEGER,
                season_number INTEGER,
                episode_number INTEGER,
                name TEXT,
                overview TEXT,
                air_date TEXT,
                runtime INTEGER,
                type TEXT,
                updated_at INTEGER,
                UNIQUE (tmdb_id, season_number, episode_number)
            )
        ''')
        self.conn.commit()

    def close(self):
        """Close the connection if it is open; safe to call repeatedly."""
        if self.conn:
            self.conn.close()
            self.conn = None

    # ------------------------------------------------------------------
    # shows
    # ------------------------------------------------------------------
    def upsert_show(self, tmdb_id: int, name: str, year: str, status: str,
                    poster_local_path: str, latest_season_number: int,
                    last_refreshed_at: int = 0, bound_task_names: str = "",
                    content_type: str = ""):
        """Insert a show, or overwrite every column of the existing row.

        Note that the defaults also overwrite: omitting ``bound_task_names``
        or ``content_type`` resets them to the empty string on update.
        """
        with self.conn:
            self.conn.execute('''
                INSERT INTO shows (tmdb_id, name, year, status, poster_local_path, latest_season_number, last_refreshed_at, bound_task_names, content_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(tmdb_id) DO UPDATE SET
                    name=excluded.name,
                    year=excluded.year,
                    status=excluded.status,
                    poster_local_path=excluded.poster_local_path,
                    latest_season_number=excluded.latest_season_number,
                    last_refreshed_at=excluded.last_refreshed_at,
                    bound_task_names=excluded.bound_task_names,
                    content_type=excluded.content_type
            ''', (tmdb_id, name, year, status, poster_local_path,
                  latest_season_number, last_refreshed_at, bound_task_names,
                  content_type))

    def bind_task_to_show(self, tmdb_id: int, task_name: str):
        """Bind *task_name* to a show.

        Returns True when the binding was added, False when the show does
        not exist, the name is empty, or the task was already bound.
        """
        bound = self._fetch_bound_task_names(tmdb_id)
        if bound is None or not task_name or task_name in bound:
            return False
        bound.append(task_name)
        self._store_bound_task_names(tmdb_id, bound)
        return True

    def unbind_task_from_show(self, tmdb_id: int, task_name: str):
        """Remove *task_name* from a show's bindings.

        Returns True when a binding was removed, False otherwise.
        """
        bound = self._fetch_bound_task_names(tmdb_id)
        if bound is None or task_name not in bound:
            return False
        bound.remove(task_name)
        self._store_bound_task_names(tmdb_id, bound)
        return True

    def get_bound_tasks_for_show(self, tmdb_id: int):
        """Return the list of task names bound to a show (empty if none)."""
        return self._fetch_bound_task_names(tmdb_id) or []

    def get_show_by_task_name(self, task_name: str):
        """Return the show (as a dict) that *task_name* is bound to, or None.

        The name must equal one whole entry of the comma-separated list.
        instr() on the comma-wrapped value is used instead of LIKE so that a
        name which is a substring of another task, or which contains the
        LIKE wildcards ``%``/``_``, cannot produce a false match.
        """
        if not task_name:
            return None
        cursor = self.conn.cursor()
        cursor.execute(
            "SELECT * FROM shows "
            "WHERE instr(',' || bound_task_names || ',', ',' || ? || ',') > 0",
            (task_name,),
        )
        return self._row_to_dict(cursor, cursor.fetchone())

    def get_show(self, tmdb_id: int):
        """Return the show row as a dict, or None when it does not exist."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM shows WHERE tmdb_id=?', (tmdb_id,))
        return self._row_to_dict(cursor, cursor.fetchone())

    def delete_show(self, tmdb_id: int):
        """Delete a show together with all of its seasons and episodes."""
        # One transaction: either all three deletes apply or none does.
        with self.conn:
            self.conn.execute('DELETE FROM episodes WHERE tmdb_id=?', (tmdb_id,))
            self.conn.execute('DELETE FROM seasons WHERE tmdb_id=?', (tmdb_id,))
            self.conn.execute('DELETE FROM shows WHERE tmdb_id=?', (tmdb_id,))

    # ------------------------------------------------------------------
    # seasons
    # ------------------------------------------------------------------
    def upsert_season(self, tmdb_id: int, season_number: int, episode_count: int,
                      refresh_url: str, season_name: str = ""):
        """Insert a season, or update name, episode count and refresh URL."""
        with self.conn:
            self.conn.execute('''
                INSERT INTO seasons (tmdb_id, season_number, season_name, episode_count, refresh_url)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(tmdb_id, season_number) DO UPDATE SET
                    season_name=excluded.season_name,
                    episode_count=excluded.episode_count,
                    refresh_url=excluded.refresh_url
            ''', (tmdb_id, season_number, season_name, episode_count, refresh_url))

    def get_season(self, tmdb_id: int, season_number: int):
        """Return one season row as a dict, or None when it does not exist."""
        cursor = self.conn.cursor()
        cursor.execute(
            'SELECT * FROM seasons WHERE tmdb_id=? AND season_number=?',
            (tmdb_id, season_number),
        )
        return self._row_to_dict(cursor, cursor.fetchone())

    # ------------------------------------------------------------------
    # episodes
    # ------------------------------------------------------------------
    def upsert_episode(self, tmdb_id: int, season_number: int, episode_number: int,
                       name: str, overview: str, air_date: str, runtime,
                       ep_type: str, updated_at: int):
        """Insert an episode or refresh an existing one.

        On update, ``runtime`` keeps the stored value when it is non-NULL,
        whereas ``type`` takes the incoming value when that is non-NULL.
        All other columns are overwritten.
        """
        with self.conn:
            self.conn.execute('''
                INSERT INTO episodes (tmdb_id, season_number, episode_number, name, overview, air_date, runtime, type, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(tmdb_id, season_number, episode_number) DO UPDATE SET
                    name=excluded.name,
                    overview=excluded.overview,
                    air_date=excluded.air_date,
                    runtime=COALESCE(episodes.runtime, excluded.runtime),
                    type=COALESCE(excluded.type, episodes.type),
                    updated_at=excluded.updated_at
            ''', (tmdb_id, season_number, episode_number, name, overview,
                  air_date, runtime, ep_type, updated_at))

    def list_latest_season_episodes(self, tmdb_id: int, latest_season: int):
        """Return the episodes of one season as dicts, ordered by episode number."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT episode_number, name, overview, air_date, runtime, type
            FROM episodes
            WHERE tmdb_id=? AND season_number=?
            ORDER BY episode_number ASC
        ''', (tmdb_id, latest_season))
        keys = ('episode_number', 'name', 'overview', 'air_date', 'runtime', 'type')
        return [dict(zip(keys, row)) for row in cursor.fetchall()]

    def list_all_latest_episodes(self):
        """Return a flat list with every episode of each show's latest season.

        Each item carries the show fields plus the episode fields.
        """
        cursor = self.conn.cursor()
        cursor.execute('SELECT tmdb_id, name, year, status, poster_local_path, latest_season_number FROM shows')
        flattened = []
        for tmdb_id, name, year, status, poster_local_path, latest_season in cursor.fetchall():
            show_fields = {
                'tmdb_id': tmdb_id,
                'show_name': name,
                'year': year,
                'status': status,
                'poster_local_path': poster_local_path,
                'season_number': latest_season,
            }
            flattened.extend(
                {**show_fields, **episode}
                for episode in self.list_latest_season_episodes(tmdb_id, latest_season)
            )
        return flattened

    # ------------------------------------------------------------------
    # content type management
    # ------------------------------------------------------------------
    def update_show_content_type(self, tmdb_id: int, content_type: str):
        """Set a show's content type; return True when a row was updated."""
        with self.conn:
            cursor = self.conn.execute(
                'UPDATE shows SET content_type=? WHERE tmdb_id=?',
                (content_type, tmdb_id),
            )
        return cursor.rowcount > 0

    def get_show_content_type(self, tmdb_id: int):
        """Return a show's content type, or None when the show is unknown."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT content_type FROM shows WHERE tmdb_id=?', (tmdb_id,))
        row = cursor.fetchone()
        return row[0] if row else None

    def get_shows_by_content_type(self, content_type: str):
        """Return all shows of *content_type* as dicts, ordered by name."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM shows WHERE content_type=? ORDER BY name', (content_type,))
        return [self._row_to_dict(cursor, row) for row in cursor.fetchall()]

    def get_all_content_types(self):
        """Return the distinct non-empty content types in use, sorted."""
        cursor = self.conn.cursor()
        # '' (single quotes) is the SQL string literal; "" would be parsed as
        # an identifier and only works through SQLite's legacy fallback.
        cursor.execute(
            "SELECT DISTINCT content_type FROM shows "
            "WHERE content_type IS NOT NULL AND content_type != '' "
            "ORDER BY content_type"
        )
        return [row[0] for row in cursor.fetchall()]

    def bind_task_and_content_type(self, tmdb_id: int, task_name: str, content_type: str):
        """Bind a task to a show and then set the show's content type."""
        self.bind_task_to_show(tmdb_id, task_name)
        self.update_show_content_type(tmdb_id, content_type)

    # ------------------------------------------------------------------
    # season / episode cleanup and update utilities
    # ------------------------------------------------------------------
    def purge_other_seasons(self, tmdb_id: int, keep_season_number: int):
        """Delete every season (and its episodes) except *keep_season_number*."""
        with self.conn:
            self.conn.execute(
                'DELETE FROM episodes WHERE tmdb_id=? AND season_number != ?',
                (tmdb_id, keep_season_number),
            )
            self.conn.execute(
                'DELETE FROM seasons WHERE tmdb_id=? AND season_number != ?',
                (tmdb_id, keep_season_number),
            )

    def delete_season(self, tmdb_id: int, season_number: int):
        """Delete one season and all of its episodes."""
        with self.conn:
            self.conn.execute(
                'DELETE FROM episodes WHERE tmdb_id=? AND season_number=?',
                (tmdb_id, season_number),
            )
            self.conn.execute(
                'DELETE FROM seasons WHERE tmdb_id=? AND season_number=?',
                (tmdb_id, season_number),
            )

    def update_show_latest_season_number(self, tmdb_id: int, latest_season_number: int):
        """Update ``shows.latest_season_number`` for one show."""
        with self.conn:
            self.conn.execute(
                'UPDATE shows SET latest_season_number=? WHERE tmdb_id=?',
                (latest_season_number, tmdb_id),
            )

423
app/sdk/tmdb_service.py Normal file
View File

@ -0,0 +1,423 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
TMDB服务模块
用于获取电视节目信息和播出时间表
"""
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class TMDBService:
    """Thin client for the TMDB v3 API plus a few localisation helpers.

    Requests go to a primary host and fall back to a backup host on failure.
    Successful responses are kept in a small in-memory cache for
    ``_cache_ttl_seconds`` seconds.
    """

    # Upper bound on cached responses; expired entries are purged when reached.
    _CACHE_MAX_ENTRIES = 512

    def __init__(self, api_key: str = None):
        """Store the API key and prepare a retrying HTTP session."""
        self.api_key = api_key
        # api.tmdb.org is tried first, api.themoviedb.org is the fallback.
        self.primary_url = "https://api.tmdb.org/3"
        self.backup_url = "https://api.themoviedb.org/3"
        self.current_url = self.primary_url
        self.language = "zh-CN"  # ask TMDB for Chinese data
        # One shared session with simple back-off retries for GET requests.
        self.session = requests.Session()
        retries = Retry(
            total=3,
            backoff_factor=0.5,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET"],
        )
        adapter = HTTPAdapter(max_retries=retries, pool_connections=20, pool_maxsize=50)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)
        # cache_key -> (timestamp, response data)
        self._cache = {}
        self._cache_ttl_seconds = 600

    def is_configured(self) -> bool:
        """Return True when a non-blank API key is set."""
        return bool(self.api_key and self.api_key.strip())

    def reset_to_primary_url(self):
        """Switch back to the primary API host."""
        self.current_url = self.primary_url
        logger.info("TMDB API地址已重置为主地址")

    def get_current_api_url(self) -> str:
        """Return the API base URL currently in use."""
        return self.current_url

    def is_using_backup_url(self) -> bool:
        """Return True when the backup host is the active one."""
        return self.current_url == self.backup_url

    def _get_json(self, base_url: str, endpoint: str, query: Dict) -> Dict:
        """GET ``base_url + endpoint`` and return the decoded JSON body.

        Raises whatever the session, ``raise_for_status`` or JSON decoding
        raises; the caller decides how to recover.
        """
        response = self.session.get(f"{base_url}{endpoint}", params=query, timeout=10)
        response.raise_for_status()
        return response.json()

    def _remember(self, cache_key, data, now: float) -> None:
        """Store *data* in the cache, purging stale entries when it is full."""
        if cache_key is None:
            return
        cache = self._cache
        if len(cache) >= self._CACHE_MAX_ENTRIES:
            ttl = self._cache_ttl_seconds
            for stale in [k for k, (stamp, _) in cache.items() if now - stamp >= ttl]:
                del cache[stale]
            if len(cache) >= self._CACHE_MAX_ENTRIES:
                cache.clear()
        cache[cache_key] = (now, data)

    def _make_request(self, endpoint: str, params: Dict = None) -> Optional[Dict]:
        """Send a GET request, falling back to the backup host on failure.

        Args:
            endpoint: API path starting with ``/``.
            params: extra query parameters; the dict is not modified.
        Returns:
            The decoded JSON response, or None when the service is not
            configured or the request failed on every attempted host.
        """
        if not self.is_configured():
            return None
        import time

        # Work on a copy so the caller's dict is never mutated.
        query = dict(params or {})
        query.update({
            'api_key': self.api_key,
            'language': self.language,
            'include_adult': False,
        })
        # Unhashable parameter values simply disable caching for this call.
        try:
            cache_key = (endpoint, tuple(sorted(query.items())))
            hash(cache_key)
        except TypeError:
            cache_key = None
        if cache_key is not None:
            cached = self._cache.get(cache_key)
            if cached and (time.time() - cached[0]) < self._cache_ttl_seconds:
                return cached[1]

        try:
            data = self._get_json(self.current_url, endpoint, query)
        except Exception as e:
            logger.warning(f"TMDB主地址请求失败: {e}")
            if self.current_url != self.primary_url:
                # Already on the backup host: give up and start from the
                # primary host again next time.
                logger.error(f"TMDB备用地址请求失败: {e}")
                self.current_url = self.primary_url
                return None
            logger.info("尝试切换到TMDB备用地址...")
            self.current_url = self.backup_url
            try:
                data = self._get_json(self.current_url, endpoint, query)
            except Exception as backup_e:
                logger.error(f"TMDB备用地址请求也失败: {backup_e}")
                self.current_url = self.primary_url
                return None
            logger.info("TMDB备用地址连接成功")
        self._remember(cache_key, data, time.time())
        return data

    def search_tv_show(self, query: str, year: str = None) -> Optional[Dict]:
        """Search TV shows and return the first result, or None."""
        params = {'query': query}
        if year:
            params['first_air_date_year'] = year
        result = self._make_request('/search/tv', params)
        if result and result.get('results'):
            return result['results'][0]
        return None

    def get_tv_show_details(self, tv_id: int) -> Optional[Dict]:
        """Fetch the details of a TV show."""
        return self._make_request(f'/tv/{tv_id}')

    def get_tv_show_alternative_titles(self, tv_id: int) -> Optional[Dict]:
        """Fetch the alternative titles of a TV show."""
        return self._make_request(f'/tv/{tv_id}/alternative_titles')

    def _is_chinese_text(self, text: str) -> bool:
        """Return True when *text* contains a character in U+4E00..U+9FFF."""
        if not text:
            return False
        return any('\u4e00' <= char <= '\u9fff' for char in text)

    def get_chinese_title_with_fallback(self, tv_id: int, original_title: str = "") -> str:
        """Return a Chinese title for a show, falling back to *original_title*.

        The show's own name is used when it contains Chinese characters;
        otherwise the first Chinese alternative title for region ``CN`` is
        used. If neither exists, or any lookup fails, *original_title* is
        returned.

        Args:
            tv_id: TMDB ID of the show.
            original_title: title returned as the last resort.
        """
        try:
            details = self.get_tv_show_details(tv_id)
            if not details:
                return original_title
            # `or ''` guards against an explicit null name in the response.
            tmdb_name = (details.get('name') or '').strip()
            if tmdb_name and self._is_chinese_text(tmdb_name):
                logger.info(f"直接获取到中文标题: {tmdb_name} (TMDB ID: {tv_id})")
                return tmdb_name
            alternative_titles = self.get_tv_show_alternative_titles(tv_id)
            for alt_title in (alternative_titles or {}).get('results') or []:
                if alt_title.get('iso_3166_1') != 'CN':
                    continue
                chinese_alt_title = (alt_title.get('title') or '').strip()
                if chinese_alt_title and self._is_chinese_text(chinese_alt_title):
                    logger.info(f"从别名中获取到中文标题: {chinese_alt_title} (TMDB ID: {tv_id})")
                    return chinese_alt_title
            logger.info(f"未找到中文标题,使用原始标题: {original_title} (TMDB ID: {tv_id})")
            return original_title
        except Exception as e:
            logger.warning(f"获取中文标题失败: {e}, 使用原始标题: {original_title}")
            return original_title

    def get_tv_show_episodes(self, tv_id: int, season_number: int) -> Optional[Dict]:
        """Fetch the episode list of one season."""
        return self._make_request(f'/tv/{tv_id}/season/{season_number}')

    def get_tv_show_air_dates(self, tv_id: int) -> Optional[Dict]:
        """Request ``/tv/{id}/air_dates`` for a show.

        NOTE(review): confirm this endpoint exists in the TMDB v3 API; if it
        does not, this always returns None after both hosts fail.
        """
        return self._make_request(f'/tv/{tv_id}/air_dates')

    def get_tv_show_episode_air_dates(self, tv_id: int, season_number: int) -> List[Dict]:
        """Return number, air date, name and overview of each dated episode.

        Episodes without an ``air_date`` are skipped.
        """
        season = self.get_tv_show_episodes(tv_id, season_number)
        if not season or 'episodes' not in season:
            return []
        return [
            {
                'episode_number': episode.get('episode_number'),
                'air_date': episode.get('air_date'),
                'name': episode.get('name'),
                'overview': episode.get('overview'),
            }
            for episode in season['episodes']
            if episode.get('air_date')
        ]

    # ===== Show status -> Chinese label =====
    def map_show_status_cn(self, status: str) -> str:
        """Map a TMDB show status to its Chinese label.

        Matching ignores case and treats spaces as underscores. Unknown
        statuses are returned unchanged and empty input yields ''. Per the
        original comments, telling "airing" from "season finale" for
        returning series is done by the caller, not here.
        """
        if not status:
            return ''
        mapping = {
            'returning_series': '播出中',
            'in_production': '制作中',
            'planned': '计划中',
            'ended': '已完结',
            'canceled': '已取消',
            'cancelled': '已取消',
            'pilot': '试播集',
            'rumored': '待确认',
        }
        key = str(status).strip().lower().replace(' ', '_')
        return mapping.get(key, status)

    def arabic_to_chinese_numeral(self, number: int) -> str:
        """Convert an integer in 1..99999 to Chinese numerals (for season names).

        Conventions:
        - 10-19 omit the leading "一": 10=十, 11=十一.
        - 2 is written "两" in the hundreds, thousands and ten-thousands
          places (200=两百, 2000=两千, 20000=两万) but "二" in the tens and
          ones places (20=二十).
        - Any run of inner zeros is read as a single "零": 101=一百零一,
          1001=一千零一, 10010=一万零一十. Trailing zeros are dropped.
        - Values that are not integers, or are <=0 or >=100000, are
          returned as ``str(number)``.
        """
        try:
            number = int(number)
        except (TypeError, ValueError, OverflowError):
            return str(number)
        if number <= 0 or number >= 100000:
            return str(number)
        digits = "零一二三四五六七八九"
        units = ("", "十", "百", "千", "万")
        if 10 <= number < 20:
            return "十" + (digits[number - 10] if number > 10 else "")
        text = str(number)
        pieces = []
        zero_pending = False
        for offset, char in enumerate(text):
            value = int(char)
            place = len(text) - 1 - offset
            if value == 0:
                # Remember the gap; it is only spoken if a digit follows.
                zero_pending = True
                continue
            if zero_pending:
                pieces.append("零")
                zero_pending = False
            numeral = "两" if (value == 2 and place >= 2) else digits[value]
            pieces.append(numeral + units[place])
        return "".join(pieces)

    def process_season_name(self, raw_name: str) -> str:
        """Localise TMDB season names such as "第 1 季" to "第一季".

        Every "第<digits>季" occurrence (optional whitespace around the
        digits) is rewritten with Chinese numerals; any other name, empty
        input or non-string input is returned unchanged.
        """
        if not raw_name:
            return raw_name

        def _localise(match):
            return f"第{self.arabic_to_chinese_numeral(int(match.group(1)))}季"

        try:
            return re.sub(r"第\s*(\d+)\s*季", _localise, raw_name)
        except TypeError:
            return raw_name

    def search_and_get_episodes(self, show_name: str, year: str = None) -> Optional[Dict]:
        """Search a show and collect dated episodes of all regular seasons.

        Returns a dict with ``show_info`` and ``episodes``, or None when the
        search or the details lookup yields nothing. Season 0 (specials) is
        excluded.
        """
        show = self.search_tv_show(show_name, year)
        if not show:
            return None
        tv_id = show.get('id')
        if not tv_id:
            return None
        details = self.get_tv_show_details(tv_id)
        if not details:
            return None
        all_episodes = []
        for season in details.get('seasons') or []:
            # `or 0` also covers an explicit null season_number.
            season_number = season.get('season_number') or 0
            if season_number <= 0:
                continue
            for episode in self.get_tv_show_episode_air_dates(tv_id, season_number):
                episode['season_number'] = season_number
                episode['show_name'] = show_name
                episode['show_id'] = tv_id
                all_episodes.append(episode)
        return {
            'show_info': {
                'id': tv_id,
                'name': show_name,
                'original_name': details.get('original_name'),
                'overview': details.get('overview'),
                'poster_path': details.get('poster_path'),
                'first_air_date': details.get('first_air_date'),
                'media_type': details.get('media_type', 'tv'),
            },
            'episodes': all_episodes,
        }

    def get_episodes_by_date_range(self, start_date: str, end_date: str, show_name: str = None) -> List[Dict]:
        """Placeholder for date-range queries; currently always returns []."""
        if not self.is_configured():
            return []
        # Not implemented yet; the original notes suggest TMDB's discover API.
        return []

    def convert_to_beijing_time(self, utc_time_str: str) -> str:
        """Convert an ISO-8601 timestamp to the Beijing (UTC+8) calendar date.

        A trailing ``Z`` is accepted. Timestamps without an offset are taken
        to be UTC; timestamps with an offset are converted from that offset.
        Returns ``YYYY-MM-DD``, or the input unchanged when it cannot be
        parsed.
        """
        from datetime import timezone

        try:
            moment = datetime.fromisoformat(utc_time_str.replace('Z', '+00:00'))
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            beijing_time = moment.astimezone(timezone(timedelta(hours=8)))
            return beijing_time.strftime('%Y-%m-%d')
        except (AttributeError, TypeError, ValueError, OverflowError) as e:
            logger.error(f"时间转换失败: {e}")
            return utc_time_str

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

313
app/utils/task_extractor.py Normal file
View File

@ -0,0 +1,313 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
任务信息提取模块
用于从任务中提取剧名年份类型和进度信息
"""
import re
import os
from typing import Dict, List, Optional, Tuple
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class TaskExtractor:
    """Derive show name, year, content type and progress from task data."""

    def __init__(self):
        # Episode-number patterns, tried in order; only the first has two groups.
        self.episode_patterns = [
            r'S(\d{1,2})E(\d{1,3})',  # S01E01
            r'E(\d{1,3})',  # E01
            r'第(\d{1,3})集',  # 第1集
            r'第(\d{1,3})期',  # 第1期
            r'(\d{1,3})集',  # 1集
            r'(\d{1,3})期',  # 1期
        ]
        # Date patterns, tried in order when no episode number is found.
        self.date_patterns = [
            r'(\d{4})-(\d{1,2})-(\d{1,2})',  # 2025-01-01
            r'(\d{4})/(\d{1,2})/(\d{1,2})',  # 2025/01/01
            r'(\d{4})\.(\d{1,2})\.(\d{1,2})',  # 2025.01.01
        ]

    def extract_show_info_from_path(self, save_path: str) -> Dict:
        """Extract show name, year and content type from a save path.

        The first ``/``-separated segment shaped like "Name (2025)",
        "Name(2025)" (half- or full-width parentheses) or "Name 2025"
        supplies name and year. When no segment matches, the name is derived
        from the last non-empty segment and the year stays empty.

        Args:
            save_path: the task's save path.
        Returns:
            Dict with keys ``show_name``, ``year`` and ``type``.
        """
        if not save_path:
            return {'show_name': '', 'year': '', 'type': 'other'}
        year_patterns = (
            r'^(.+?)\s*[\((](\d{4})[\))]$',  # Name (2025) / Name(2025)
            r'^(.+?)\s*(\d{4})$',  # Name 2025
        )
        segments = [segment.strip() for segment in save_path.split('/')]
        show_name = ''
        year = ''
        for segment in segments:
            match = next(
                (m for m in (re.match(p, segment) for p in year_patterns) if m),
                None,
            )
            if match:
                show_name = match.group(1).strip()
                year = match.group(2)
                if show_name and year:
                    break
        if not show_name:
            # Skip empty segments so a trailing slash does not blank the name.
            last_segment = next((s for s in reversed(segments) if s), '')
            show_name = self.extract_show_name_from_taskname(last_segment)
        return {
            'show_name': show_name,
            'year': year,
            'type': self.determine_content_type(save_path),
        }

    def extract_show_name_from_taskname(self, task_name: str) -> str:
        """Return the bare show name from a task name, dropping season info.

        Text from the first season marker (S01, 第1季, Season 1, 第一季)
        onward is cut off, then trailing separators and whitespace are
        removed. Without a season marker only the trailing cleanup applies.

        Args:
            task_name: the task name.
        """
        if not task_name:
            return ''
        # Whitespace, hyphen, underscore, dot, dashes, middle dot, enumeration
        # comma and half/full-width colon at the end of the name.
        trailing_separators = r"[\s\-_.—·、::]+$"
        season_patterns = (
            r'^(.+?)\s*[Ss](\d{1,2})',  # Name S01
            r'^(.+?)\s*第(\d{1,2})季',  # Name 第1季
            r'^(.+?)\s*Season\s*(\d{1,2})',  # Name Season 1
            r'^(.+?)\s*第([一二三四五六七八九十]+)季',  # Name 第一季
        )
        for pattern in season_patterns:
            match = re.match(pattern, task_name)
            if match:
                return re.sub(trailing_separators, "", match.group(1).strip())
        return re.sub(trailing_separators, "", task_name.strip())

    def determine_content_type(self, save_path: str) -> str:
        """Classify a save path by keyword.

        Categories are checked in the order tv, anime, variety, documentary;
        the first one with a keyword contained in the lower-cased path wins.

        Returns:
            One of ``tv``, ``anime``, ``variety``, ``documentary`` or ``other``.
        """
        if not save_path:
            return 'other'
        keyword_table = (
            ('tv', ('剧集', '电视剧', '电视', '电视节目', '连续剧', '影集', 'tv', 'drama')),
            ('anime', ('动画', '动漫', '动画片', '卡通片', '卡通', 'anime', 'cartoon')),
            ('variety', ('综艺', '真人秀', '综艺节目', '娱乐节目', 'variety', 'show')),
            ('documentary', ('纪录片', '记录片', 'documentary', 'doc')),
        )
        path_lower = save_path.lower()
        for content_type, keywords in keyword_table:
            if any(keyword in path_lower for keyword in keywords):
                return content_type
        return 'other'

    @staticmethod
    def _progress(progress_type, episode_number=None, season_number=None, air_date=None):
        """Build a progress dict that always carries the same four keys."""
        return {
            'episode_number': episode_number,
            'season_number': season_number,
            'air_date': air_date,
            'progress_type': progress_type,
        }

    def extract_progress_from_latest_file(self, latest_file: str) -> Dict:
        """Extract progress from the latest saved file name.

        Episode patterns are tried first, then date patterns.

        Args:
            latest_file: e.g. ``S02E24`` or ``2025-08-30``.
        Returns:
            Dict with ``episode_number``, ``season_number``, ``air_date`` and
            ``progress_type`` (``episode``, ``date`` or ``unknown``).
        """
        if not latest_file:
            return self._progress('unknown')
        for pattern in self.episode_patterns:
            match = re.search(pattern, latest_file)
            if not match:
                continue
            if match.lastindex == 2:
                # SxxEyy: both season and episode were captured.
                return self._progress(
                    'episode',
                    episode_number=int(match.group(2)),
                    season_number=int(match.group(1)),
                )
            return self._progress('episode', episode_number=int(match.group(1)))
        for pattern in self.date_patterns:
            match = re.search(pattern, latest_file)
            if match:
                year, month, day = match.groups()
                return self._progress(
                    'date', air_date=f"{year}-{month.zfill(2)}-{day.zfill(2)}"
                )
        return self._progress('unknown')

    def extract_all_tasks_info(self, tasks: List[Dict], task_latest_files: Dict) -> List[Dict]:
        """Build one info dict per task.

        The content type prefers ``task['calendar_info']['extracted']
        ['content_type']``, then ``task['content_type']``, then the type
        derived from the save path. A task that raises while being processed
        is logged and skipped.

        Args:
            tasks: task dicts (reads ``taskname``, ``savepath``,
                ``calendar_info`` and ``content_type``).
            task_latest_files: task name -> latest saved file name.
        Returns:
            List of task info dicts.
        """
        # Same logger object as the module-level one, obtained by name.
        log = logging.getLogger(__name__)
        log.debug("TaskExtractor.extract_all_tasks_info 开始")
        log.debug(f"tasks数量: {len(tasks)}")
        log.debug(f"task_latest_files数量: {len(task_latest_files)}")
        tasks_info = []
        for position, task in enumerate(tasks, start=1):
            try:
                task_name = task.get('taskname', '')
                save_path = task.get('savepath', '')
                latest_file = task_latest_files.get(task_name, '')
                log.debug(f"处理第{position}个任务: {task_name}")
                show_info = self.extract_show_info_from_path(save_path)
                progress_info = self.extract_progress_from_latest_file(latest_file)
                extracted = (task.get('calendar_info') or {}).get('extracted')
                explicit_type = extracted.get('content_type') if isinstance(extracted, dict) else None
                if not explicit_type:
                    explicit_type = task.get('content_type')
                task_info = {
                    'task_name': task_name,
                    'save_path': save_path,
                    'show_name': show_info['show_name'],
                    'year': show_info['year'],
                    'content_type': explicit_type or show_info['type'] or 'other',
                    'latest_file': latest_file,
                    'episode_number': progress_info.get('episode_number'),
                    'season_number': progress_info.get('season_number'),
                    'air_date': progress_info.get('air_date'),
                    'progress_type': progress_info.get('progress_type'),
                }
                log.debug(f"task_info: {task_info}")
                tasks_info.append(task_info)
            except Exception as e:
                # Best effort: one malformed task must not abort the others.
                log.exception(f"处理任务 {position} 时出错: {e}")
                continue
        log.debug(f"TaskExtractor.extract_all_tasks_info 完成,返回任务数量: {len(tasks_info)}")
        return tasks_info

    def get_content_type_display_name(self, content_type: str) -> str:
        """Return the Chinese display name of a content type code.

        Unknown codes map to the display name of ``other``.
        """
        type_names = {
            'tv': '剧集',
            'anime': '动画',
            'variety': '综艺',
            'documentary': '纪录片',
            'other': '其他',
        }
        return type_names.get(content_type, '其他')

    def get_content_types_with_content(self, tasks_info: List[Dict]) -> List[str]:
        """Return the sorted content types of tasks that have a show name."""
        return sorted({
            task_info.get('content_type') or 'other'
            for task_info in tasks_info
            if task_info.get('show_name')
        })

View File

@ -4873,7 +4873,8 @@ def do_save(account, tasklist=[]):
else:
# 添加基本通知
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 修正首次运行时对子目录的处理 - 只有在首次运行且有新增的子目录时才显示子目录内容
if has_update_in_root and has_update_in_subdir and is_first_run and len(new_added_dirs) == 0:
@ -5204,7 +5205,8 @@ def do_save(account, tasklist=[]):
# 添加成功通知 - 修复问题:确保在有文件时添加通知
if display_files:
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 创建episode_pattern函数用于排序
@ -5294,7 +5296,8 @@ def do_save(account, tasklist=[]):
# 添加成功通知
add_notify(f"✅《{task['taskname']}》新增文件:")
add_notify(f"{re.sub(r'/{2,}', '/', f'/{task['savepath']}')}")
savepath = task['savepath']
add_notify(f"{re.sub(r'/{2,}', '/', f'/{savepath}')}")
# 打印文件列表
for idx, file_name in enumerate(display_files):