quark-auto-save/app/sdk/cloudsaver.py
x1ao4 3ccaeeae15 优化资源搜索结果的去重逻辑,改为去重时保留最新记录
- PanSou:按 shareurl 归并,保留 publish_date 最新记录
- CloudSaver:按 shareurl 归并,保留 datetime 最新记录
- 聚合层(/task_suggestions):
  - 首轮:按 shareurl 归并取最新;仅在无链接时用指纹兜底
  - 二次:按“标准化标题+时间戳”归并(兼容多种时间格式)
    - 同秒平手:优先 CloudSaver,其次内容更丰富
  - 最终仍按 publish_date 倒序展示
2025-08-27 22:23:01 +08:00

223 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import requests
class CloudSaver:
    """Client for a CloudSaver server: login, search and result clean-up.

    Network methods never raise; they return a dict with a boolean
    ``"success"`` key and either ``"data"``/``"token"`` or ``"message"``.
    """

    # Label patterns: a Chinese field label, an optional colon (ASCII ":" or
    # full-width U+FF1A, the form normally used in Chinese text), then the
    # payload. DOTALL lets the payload span several lines.
    _TITLE_PATTERN = re.compile(r"(名称|标题)[:\uff1a]?(.*)", re.DOTALL)
    _CONTENT_PATTERN = re.compile(
        r"(描述|简介)[:\uff1a]?(.*)(链接|标签)", re.DOTALL
    )
    # Links that must not appear inside a cleaned title.
    _URL_PATTERN = re.compile(r"https?://\S+")
    _BARE_QUARK_PATTERN = re.compile(r"\bpan\.quark\.cn/\S+")
    _WHITESPACE_PATTERN = re.compile(r"\s+")
    # Separators trimmed from both ends of a title once links are removed:
    # ASCII ones plus the full-width comma, colon and semicolon.
    _TITLE_TRIM_CHARS = " -|·,:;\uff0c\uff1a\uff1b"
    # Server messages that mean "log in again and retry".
    _TOKEN_ERRORS = ("无效的 token", "未提供 token")
    # Non-ISO date layouts tried, in order, before falling back to ISO 8601.
    _DATE_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d")

    def __init__(self, server):
        """Create a client for the CloudSaver instance at *server*.

        Args:
            server (str): Base URL; API paths such as ``/api/search`` are
                appended to it verbatim.
        """
        self.server = server
        self.username = None
        self.password = None
        self.token = None
        self.session = requests.Session()
        self.session.headers.update({"Content-Type": "application/json"})

    def set_auth(self, username, password, token=""):
        """Store credentials and install *token* as the session's Bearer token.

        Args:
            username (str): Account name used by :meth:`login`.
            password (str): Account password used by :meth:`login`.
            token (str): Existing token; the Authorization header is set
                even when this is empty.
        """
        self.username = username
        self.password = password
        self.token = token
        self.session.headers.update({"Authorization": f"Bearer {self.token}"})

    def login(self):
        """Log in with the stored credentials and remember the returned token.

        Returns:
            dict: ``{"success": True, "token": ...}`` on success, otherwise
            ``{"success": False, "message": ...}``.
        """
        if not self.username or not self.password:
            return {"success": False, "message": "CloudSaver未设置用户名或密码"}
        try:
            response = self.session.post(
                f"{self.server}/api/user/login",
                json={"username": self.username, "password": self.password},
            )
            result = response.json()
            if not result.get("success"):
                return {
                    "success": False,
                    "message": f"CloudSaver登录{result.get('message', '未知错误')}",
                }
            self.token = result.get("data", {}).get("token")
            self.session.headers.update(
                {"Authorization": f"Bearer {self.token}"}
            )
            return {"success": True, "token": self.token}
        except Exception as e:
            # Deliberate boundary: any transport or decoding failure is
            # reported through the result dict instead of propagating.
            return {"success": False, "message": str(e)}

    def search(self, keyword, last_message_id=""):
        """Search resources on the server.

        Args:
            keyword (str): Search keyword.
            last_message_id (str): ID of the previous message, used for
                pagination.

        Returns:
            dict: ``{"success": True, "data": ...}`` on success, otherwise
            ``{"success": False, "message": ...}``.
        """
        try:
            response = self.session.get(
                f"{self.server}/api/search",
                params={"keyword": keyword, "lastMessageId": last_message_id},
            )
            result = response.json()
            if result.get("success"):
                return {"success": True, "data": result.get("data", [])}
            return {"success": False, "message": result.get("message", "未知错误")}
        except Exception as e:
            # Deliberate boundary: failures are returned, not raised.
            return {"success": False, "message": str(e)}

    def auto_login_search(self, keyword, last_message_id=""):
        """Search, logging in and retrying once if the token is rejected.

        Args:
            keyword (str): Search keyword.
            last_message_id (str): ID of the previous message, used for
                pagination.

        Returns:
            dict: The search result. When a re-login happened, the retried
            result also carries ``"new_token"`` with the fresh token.
        """
        result = self.search(keyword, last_message_id)
        if result.get("success"):
            return result
        if result.get("message") not in self._TOKEN_ERRORS:
            # Not a token problem, so logging in again would not help.
            return {"success": False, "message": result.get("message", "未知错误")}
        login_result = self.login()
        if not login_result.get("success"):
            return {
                "success": False,
                "message": login_result.get("message", "未知错误"),
            }
        result = self.search(keyword, last_message_id)
        result["new_token"] = login_result.get("token")
        return result

    def clean_search_results(self, search_results):
        """Extract Quark share links from raw search data and de-duplicate them.

        Args:
            search_results (list): Channel dicts; each may hold a ``"list"``
                of items with ``title``, ``content``, ``pubDate``, ``tags``,
                ``channelId`` and ``cloudLinks``.

        Returns:
            list: One dict per distinct share URL with the keys
            ``shareurl``, ``taskname``, ``content``, ``datetime``, ``tags``,
            ``channel`` and ``source``. Among duplicates of a URL the record
            with the latest ``datetime`` is kept. No sorting is done here.
        """
        records = []
        for channel in search_results:
            # "or []" also covers an explicit JSON null.
            for item in channel.get("list") or []:
                quark_links = [
                    link
                    for link in item.get("cloudLinks") or []
                    if link.get("cloudType") == "quark"
                ]
                if not quark_links:
                    continue
                # Cleaned once per item; every link of the item shares them.
                title = self._clean_title(item.get("title") or "")
                content = self._clean_content(item.get("content") or "")
                # The source time string is kept as-is (no timezone conversion).
                pubdate = item.get("pubDate", "")
                for link in quark_links:
                    records.append(
                        {
                            "shareurl": link.get("link"),
                            "taskname": title,
                            "content": content,
                            "datetime": pubdate,
                            "tags": item.get("tags", []),
                            "channel": item.get("channelId", ""),
                            "source": "CloudSaver",
                        }
                    )
        return self._dedupe_latest(records)

    @classmethod
    def _clean_title(cls, title):
        """Return *title* without its label prefix, HTML ``&amp;`` and links."""
        if match := cls._TITLE_PATTERN.search(title):
            title = match.group(2)
        title = title.replace("&amp;", "&").strip()
        return cls._strip_links(title)

    @classmethod
    def _clean_content(cls, content):
        """Return the description part of *content* without highlight markup."""
        if match := cls._CONTENT_PATTERN.search(content):
            content = match.group(2)
        content = content.replace('<mark class="highlight">', "")
        content = content.replace("</mark>", "")
        return content.strip()

    @classmethod
    def _strip_links(cls, text):
        """Remove http(s) and bare ``pan.quark.cn`` links, then tidy the ends."""
        text = cls._URL_PATTERN.sub("", text)
        text = cls._BARE_QUARK_PATTERN.sub("", text)
        text = cls._WHITESPACE_PATTERN.sub(" ", text)
        return text.strip(cls._TITLE_TRIM_CHARS).strip()

    @classmethod
    def _to_timestamp(cls, date_str):
        """Parse *date_str* into a POSIX timestamp; return 0.0 if unparseable.

        Strings without an offset are interpreted by ``datetime.timestamp()``
        as local time; a trailing ``Z`` is read as UTC.
        """
        from datetime import datetime

        if not date_str:
            return 0.0
        text = str(date_str).strip()
        for fmt in cls._DATE_FORMATS:
            try:
                return datetime.strptime(text, fmt).timestamp()
            except (ValueError, OverflowError, OSError):
                continue
        try:
            # fromisoformat() only accepts "Z" from Python 3.11 on.
            return datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp()
        except (ValueError, OverflowError, OSError):
            return 0.0

    @classmethod
    def _dedupe_latest(cls, records):
        """Keep, per ``shareurl``, the record with the latest ``datetime``.

        Records without a share URL are dropped. On a tie the record seen
        first wins, and the first-seen order of URLs is preserved.
        """
        latest_by_url = {}
        for record in records:
            url = record.get("shareurl", "")
            if not url:
                continue
            try:
                current = latest_by_url.get(url)
            except TypeError:
                # Unhashable share URL: it cannot be used as a key, skip it.
                continue
            if current is None or cls._to_timestamp(
                record.get("datetime")
            ) > cls._to_timestamp(current.get("datetime")):
                latest_by_url[url] = record
        return list(latest_by_url.values())
# Usage example: run this module directly to try a search.
if __name__ == "__main__":
    # Fill in the CloudSaver endpoint and credentials before running.
    server, username, password, token = "", "", "", ""

    # Build the client and install the credentials.
    cloud_saver = CloudSaver(server)
    cloud_saver.set_auth(username, password, token)

    # Search, logging in automatically if the token is rejected.
    results = cloud_saver.auto_login_search("黑镜")

    # Reduce the raw response to de-duplicated Quark share links.
    clean_results = cloud_saver.clean_search_results(results.get("data", []))

    # Print every entry followed by a separator line.
    separator = "-" * 50
    for entry in clean_results:
        print(f"标题: {entry['taskname']}")
        print(f"描述: {entry['content']}")
        print(f"链接: {entry['shareurl']}")
        print(f"标签: {' '.join(entry['tags'])}")
        print(separator)