重构逻辑,实现异步请求

This commit is contained in:
holwell 2025-02-14 19:33:58 +08:00
parent 2518c94df9
commit 02e11dfcab
3 changed files with 196 additions and 205 deletions

3
.gitignore vendored
View File

@ -2,4 +2,5 @@ __pycache__/
.history/ .history/
neteasecloudmusicapi/ neteasecloudmusicapi/
*.txt *.txt
*.json *.json
test.py

View File

@ -1,3 +1,4 @@
import json
import requests import requests
# 获取云盘信息 # 获取云盘信息
@ -17,10 +18,13 @@ def get_cloud_info(cookie):
print(f"总大小: {convert_bytes(total_size)}") print(f"总大小: {convert_bytes(total_size)}")
print(f"最大容量: {convert_bytes(max_size)}") print(f"最大容量: {convert_bytes(max_size)}")
print(f"文件数量: {file_count}") print(f"文件数量: {file_count}")
return True
else: else:
print(f"获取云盘信息失败: {response_data.get('message')}") print(f"获取云盘信息失败: {response_data.get('message')}")
return False
except json.JSONDecodeError: except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text) print("响应内容无法解析为JSON:", response.text)
return False
# 字节数转换为可读格式GB, MB, TB等 # 字节数转换为可读格式GB, MB, TB等
def convert_bytes(size_in_bytes): def convert_bytes(size_in_bytes):

394
main.py
View File

@ -1,225 +1,211 @@
import asyncio
from functools import partial
import os import os
import json import json
from typing import Union from typing import Union
import aiohttp
import requests import requests
import time import time
# 导入 login 函数 from login import login
from login import login # 直接从 login.py 导入 login 函数 from get_cloud_info import get_cloud_info
from get_cloud_info import get_cloud_info # 从 get_cloud_info.py 导入 get_cloud_info 函数
# 获取当前时间戳(秒)
def get_current_timestamp():
return int(time.time())
# 读取 cookies.txt 文件 class NcmUploader:
def read_cookie():
if os.path.exists("cookies.txt"):
with open("cookies.txt", "r") as f:
cookie = f.read().strip()
if cookie:
return cookie
return None
# 读取歌曲.json 文件并返回数据 def __init__(self, cookie: str):
def read_songs_data(): self._cookie = cookie
if os.path.exists("歌曲.json"): self._show_cloud_music_detail()
with open("歌曲.json", "r", encoding="utf-8") as f:
try:
data = json.load(f)
return data.get('data', [])
except json.JSONDecodeError:
print("歌曲.json 格式错误")
return []
else:
print("歌曲.json 文件不存在")
return []
# 提取所有歌曲的 id 和其他信息 def _show_cloud_music_detail(self):
def get_all_song_info(songs_data): if not get_cloud_info(self._cookie):
song_info_list = [] raise ValueError("检查 cookie 有效性")
for song in songs_data:
song_info = {
'id': song.get("id"),
'size': song.get("size"),
'ext': song.get("ext"),
'bitrate': song.get("bitrate"),
'md5': song.get("md5")
}
song_info_list.append(song_info)
return song_info_list
# 查询歌曲详情 async def upload(self, music_json_path: str, concurrency: int = 10):
def get_song_details(song_ids): songs_data = self._read_songs_data(music_json_path)
ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串 song_info_list = self.get_all_song_info(songs_data)
timestamp = get_current_timestamp() # 获取当前时间戳 song_info_list = self.get_resume_song_info_list(song_info_list)
url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}" await self._async_upload_songs(song_info_list, concurrency)
print(f"查询歌曲详情 URL: {url}")
response = requests.get(url)
try:
response_data = response.json()
if response_data.get('code') == 200:
return response_data.get('songs', [])
else:
print("获取歌曲详情失败:", response_data.get("message"))
return []
except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text)
return []
# 判断歌曲是否已上传云盘 async def _async_upload_songs(self, song_info_list: list, concurrency: int):
def has_uploaded(song_id, cookie): sem = asyncio.Semaphore(concurrency) # 创建信号量
url = f"http://localhost:3000/user/cloud/detail?id={song_id}&cookie={cookie}" async def _upload_with_semaphore(song_info):
response = requests.get(url) async with sem: # 使用信号量控制并发
try: return await self._upload_one_song(song_info)
response_data = response.json()
if response_data.get('code') == 200 and len(response_data['data']) != 0:
return True
else:
print("获取云盘歌曲详情失败:", response_data.get("message"))
return False
except json.JSONDecodeError:
print("获取云盘歌曲信息失败响应内容无法解析为JSON:", response.text)
return False
# 执行 import 请求
def import_song(song_info, cookie):
song_id = song_info['id']
artist = song_info['artist']
album = song_info['album']
file_size = song_info['size']
bitrate = song_info['bitrate']
md5 = song_info['md5']
file_type = song_info['ext']
# 构造完整的请求URL和参数
timestamp = get_current_timestamp() # 获取当前时间戳
url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}"
#print(f"执行导入请求 URL: {url}")
response = requests.get(url)
try:
response_data = response.json()
return response_data
except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text)
return None
# 保存失败的 id 到文件
def save_failed_id(song_id):
with open("failed_ids.txt", "a") as f:
f.write(f"{song_id}\n")
# 获取最后一个上传的异常 id tasks = []
def get_last_failed_id() -> Union[int, None]: for song_info in song_info_list:
if not os.path.exists("failed_ids.txt"): tasks.append(_upload_with_semaphore(song_info)) # 直接创建协程任务
return None await asyncio.gather(*tasks)
with open("failed_ids.txt", "r") as f:
ids = [line.strip() for line in f]
return int(ids[-1])
# 处理歌曲导入请求 async def _upload_one_song(self, song_info: dict):
def process_songs(song_info_list, cookie): try:
failed_attempts = {} # 记录每个 ID 失败的次数 await self._try_to_upload_one_song(song_info)
for song_info in song_info_list: except Exception as e:
song_id = song_info['id'] print(f"上传过程异常,跳过该歌曲:{e}")
self._save_failed_id(song_info["id"])
async def _try_to_upload_one_song(self, song_info: dict):
song_id = int(song_info["id"])
print(f"正在导入歌曲ID: {song_id}") print(f"正在导入歌曲ID: {song_id}")
# 已上传则跳过 # 已上传则跳过
if has_uploaded(song_id, cookie): if await self._has_uploaded(song_id):
print('该歌曲已上传,跳过!') print("该歌曲已上传,跳过!")
continue
# 查询歌曲的详细信息
song_details = get_song_details([song_id])
if song_details:
song_name = song_details[0]['name']
song_artist = song_details[0]['ar'][0]['name']
song_album = song_details[0]['al']['name']
print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}")
# 更新 song_info 添加 artist 和 album 信息
song_info['artist'] = song_artist
song_info['album'] = song_album
try:
try_to_upload_song(song_info, cookie)
except Exception as e:
print(f'上传过程异常,跳过该歌曲:{e}')
save_failed_id(song_id)
def try_to_upload_song(song_info, cookie):
song_id = song_info['id']
attempts = 0
while attempts < 3:
result = import_song(song_info, cookie)
if result:
success_songs = result.get('data', {}).get('successSongs', [])
failed = result.get('data', {}).get('failed', [])
if success_songs:
print(f"歌曲 {song_id} 导入成功!")
break # 成功则跳出循环
else:
print(f"歌曲 {song_id} 导入失败,失败原因:{failed}")
if all(f['code'] == -100 for f in failed): # 文件已存在的错误码
print(f"歌曲 {song_id} 文件已存在,跳过")
save_failed_id(song_id) # 保存失败的 ID
break
time.sleep(5) # 请求失败后等待 5 秒重新请求
attempts += 1
if attempts == 3: # 如果失败三次,则跳过此 ID
print(f"歌曲 {song_id} 失败三次,跳过该歌曲。")
save_failed_id(song_id) # 保存失败的 ID
def get_resume_song_info_list(song_info_list) -> list:
last_failed_id = get_last_failed_id()
if last_failed_id is None:
print("暂无上传失败记录,从头开始上传")
return song_info_list
for index, song_info in enumerate(song_info_list):
if int(song_info['id']) == last_failed_id:
print(f"当前已上传: {index + 1},最后上传失败的 id: {song_info['id']}")
return song_info_list[index + 1:]
print('暂未匹配到最后一次失败的 song_id, 将从头开始上传')
return song_info_list
# 主函数
def main():
# 尝试读取已保存的 cookie
cookie = read_cookie()
if cookie:
#print(f"读取到已保存的 cookie: {cookie}")
# 获取并显示云盘信息
get_cloud_info(cookie)
else:
print("没有找到 cookie正在执行登录...")
# 执行 login 函数获取新 cookie
cookie = login()
if cookie:
print(f"登录成功cookie: {cookie}")
# 获取并显示云盘信息
get_cloud_info(cookie)
else:
print("登录失败")
return return
# 读取歌曲数据 # 查询歌曲的详细信息
songs_data = read_songs_data() song_details = self.get_song_details([song_id])
if song_details:
if songs_data: song_name = song_details[0]["name"]
#print(f"共找到 {len(songs_data)} 首歌曲,提取歌曲 ID 和其他信息...") song_artist = song_details[0]["ar"][0]["name"]
song_info_list = get_all_song_info(songs_data) song_album = song_details[0]["al"]["name"]
#print(f"所有歌曲信息: {song_info_list}") print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}")
# 更新 song_info 添加 artist 和 album 信息
# 从上次失败的歌曲开始上传 song_info["artist"] = song_artist
song_info_list = get_resume_song_info_list(song_info_list) song_info["album"] = song_album
await self._send_upload_request(song_info)
# 执行歌曲导入请求
process_songs(song_info_list, cookie) # 获取当前时间戳(秒)
else: @staticmethod
print("没有找到任何歌曲数据") def get_current_timestamp():
return int(time.time())
# 读取 cookies.txt 文件
@staticmethod
def read_cookie():
if os.path.exists("cookies.txt"):
with open("cookies.txt", "r") as f:
cookie = f.read().strip()
if cookie:
return cookie
return None
# 读取歌曲.json 文件并返回数据
@staticmethod
def _read_songs_data(music_json_path: str) -> list:
with open(music_json_path, "r", encoding="utf-8") as f:
try:
data = json.load(f)
return data.get("data", [])
except json.JSONDecodeError:
print("歌曲.json 格式错误")
return []
# 提取所有歌曲的 id 和其他信息
@staticmethod
def get_all_song_info(songs_data):
song_info_list = []
for song in songs_data:
song_info = {
"id": song.get("id"),
"size": song.get("size"),
"ext": song.get("ext"),
"bitrate": song.get("bitrate"),
"md5": song.get("md5"),
}
song_info_list.append(song_info)
return song_info_list
# 查询歌曲详情
@staticmethod
def get_song_details(song_ids: list):
ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串
timestamp = NcmUploader.get_current_timestamp() # 获取当前时间戳
url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}"
print(f"查询歌曲详情 URL: {url}")
response = requests.get(url)
try:
response_data = response.json()
if response_data.get("code") == 200:
return response_data.get("songs", [])
else:
print("获取歌曲详情失败:", response_data.get("message"))
return []
except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text)
return []
# 判断歌曲是否已上传云盘
async def _has_uploaded(self, song_id: int) -> bool:
url = (
f"http://localhost:3000/user/cloud/detail?id={song_id}&cookie={self._cookie}"
)
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
try:
response_data = await response.json()
if response_data.get("code") == 200 and len(response_data["data"]) != 0:
return True
else:
print("获取云盘歌曲详情失败:", response_data.get("message"))
return False
except json.JSONDecodeError:
print("获取云盘歌曲信息失败响应内容无法解析为JSON:", response.text)
return False
async def _send_upload_request(self, song_info: dict):
song_id = song_info["id"]
artist = song_info["artist"]
album = song_info["album"]
file_size = song_info["size"]
bitrate = song_info["bitrate"]
md5 = song_info["md5"]
file_type = song_info["ext"]
# 构造完整的请求URL和参数
timestamp = NcmUploader.get_current_timestamp() # 获取当前时间戳
url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={self._cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}"
# print(f"执行导入请求 URL: {url}")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response_data = await response.json()
success_songs = response_data.get("data", {}).get("successSongs", [])
failed = response_data.get("data", {}).get("failed", [])
if success_songs:
print(f"歌曲 {song_id} 导入成功!")
else:
print(f"歌曲 {song_id} 导入失败,失败原因:{failed}")
if all(f["code"] == -100 for f in failed): # 文件已存在的错误码
print(f"歌曲 {song_id} 文件已存在,跳过")
self._save_failed_id(song_id) # 保存失败的 ID
# 保存失败的 id 到文件
@staticmethod
def _save_failed_id(song_id):
with open("failed_ids.txt", "a") as f:
f.write(f"{song_id}\n")
# 获取最后一个上传的异常 id
@staticmethod
def _get_last_failed_id() -> Union[int, None]:
if not os.path.exists("failed_ids.txt"):
return None
with open("failed_ids.txt", "r") as f:
ids = [line.strip() for line in f]
return int(ids[-1])
@staticmethod
def get_resume_song_info_list(song_info_list) -> list:
last_failed_id = NcmUploader._get_last_failed_id()
if last_failed_id is None:
print("暂无上传失败记录,从头开始上传")
return song_info_list
for index, song_info in enumerate(song_info_list):
if int(song_info["id"]) == last_failed_id:
print(f"当前已上传: {index + 1},最后上传失败的 id: {song_info['id']}")
return song_info_list[index + 1 :]
print("暂未匹配到最后一次失败的 song_id, 将从头开始上传")
return song_info_list
if __name__ == "__main__": if __name__ == "__main__":
main() if not os.path.exists("cookies.txt"):
print('未发现 cookies.txt 文件')
exit(0)
with open("cookies.txt", "r") as f:
cookie = f.read().strip()
ncmUploader = NcmUploader(cookie)
asyncio.run(ncmUploader.upload('歌曲.json', 10))