This commit is contained in:
holwell 2025-02-14 16:39:41 +08:00 committed by GitHub
commit 95c232d72e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 77 additions and 22 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
__pycache__/
.history/
neteasecloudmusicapi/
cookies.txt
failed_ids.txt
*.json

49
main.py
View File

@ -1,5 +1,6 @@
import os import os
import json import json
from typing import Union
import requests import requests
import time import time
@ -66,6 +67,21 @@ def get_song_details(song_ids):
print("响应内容无法解析为JSON:", response.text) print("响应内容无法解析为JSON:", response.text)
return [] return []
# 判断歌曲是否已上传云盘
def has_uploaded(song_id, cookie):
url = f"http://localhost:3000/user/cloud/detail?id={song_id}&cookie={cookie}"
response = requests.get(url)
try:
response_data = response.json()
if response_data.get('code') == 200 and len(response_data['data']) != 0:
return True
else:
print("获取云盘歌曲详情失败:", response_data.get("message"))
return False
except json.JSONDecodeError:
print("获取云盘歌曲信息失败响应内容无法解析为JSON:", response.text)
return False
# 执行 import 请求 # 执行 import 请求
def import_song(song_info, cookie): def import_song(song_info, cookie):
song_id = song_info['id'] song_id = song_info['id']
@ -94,6 +110,14 @@ def save_failed_id(song_id):
with open("failed_ids.txt", "a") as f: with open("failed_ids.txt", "a") as f:
f.write(f"{song_id}\n") f.write(f"{song_id}\n")
# 获取最后一个上传的异常 id
def get_last_failed_id() -> Union[int, None]:
if not os.path.exists("failed_ids.txt"):
return None
with open("failed_ids.txt", "r") as f:
ids = [line.strip() for line in f]
return int(ids[-1])
# 处理歌曲导入请求 # 处理歌曲导入请求
def process_songs(song_info_list, cookie): def process_songs(song_info_list, cookie):
failed_attempts = {} # 记录每个 ID 失败的次数 failed_attempts = {} # 记录每个 ID 失败的次数
@ -101,6 +125,11 @@ def process_songs(song_info_list, cookie):
song_id = song_info['id'] song_id = song_info['id']
print(f"正在导入歌曲ID: {song_id}") print(f"正在导入歌曲ID: {song_id}")
# 已上传则跳过
if has_uploaded(song_id, cookie):
print('该歌曲已上传,跳过!')
continue
# 查询歌曲的详细信息 # 查询歌曲的详细信息
song_details = get_song_details([song_id]) song_details = get_song_details([song_id])
if song_details: if song_details:
@ -111,7 +140,14 @@ def process_songs(song_info_list, cookie):
# 更新 song_info 添加 artist 和 album 信息 # 更新 song_info 添加 artist 和 album 信息
song_info['artist'] = song_artist song_info['artist'] = song_artist
song_info['album'] = song_album song_info['album'] = song_album
try:
try_to_upload_song(song_info, cookie)
except Exception as e:
print(f'上传过程异常,跳过该歌曲:{e}')
save_failed_id(song_id)
def try_to_upload_song(song_info, cookie):
song_id = song_info['id']
attempts = 0 attempts = 0
while attempts < 3: while attempts < 3:
result = import_song(song_info, cookie) result = import_song(song_info, cookie)
@ -136,6 +172,16 @@ def process_songs(song_info_list, cookie):
print(f"歌曲 {song_id} 失败三次,跳过该歌曲。") print(f"歌曲 {song_id} 失败三次,跳过该歌曲。")
save_failed_id(song_id) # 保存失败的 ID save_failed_id(song_id) # 保存失败的 ID
def get_resume_song_info_list(song_info_list) -> list:
last_failed_id = get_last_failed_id()
if last_failed_id is None:
print("暂无上传失败记录,从头开始上传")
return
for index, song_info in enumerate(song_info_list):
if int(song_info['id']) == last_failed_id:
print(f"当前已上传: {index + 1},最后上传失败的 id: {song_info['id']}")
return song_info_list[index + 1:]
# 主函数 # 主函数
def main(): def main():
# 尝试读取已保存的 cookie # 尝试读取已保存的 cookie
@ -165,6 +211,9 @@ def main():
song_info_list = get_all_song_info(songs_data) song_info_list = get_all_song_info(songs_data)
#print(f"所有歌曲信息: {song_info_list}") #print(f"所有歌曲信息: {song_info_list}")
# 从上次失败的歌曲开始上传
song_info_list = get_resume_song_info_list(song_info_list)
# 执行歌曲导入请求 # 执行歌曲导入请求
process_songs(song_info_list, cookie) process_songs(song_info_list, cookie)
else: else: