Compare commits

..

No commits in common. "720bdfcf6634fa4c497b5cec1c0c789ce0c40517" and "5a7bcc49f29234555fb0a762c68ef666c97818a0" have entirely different histories.

3 changed files with 152 additions and 199 deletions

6
.gitignore vendored
View File

@ -1,6 +0,0 @@
__pycache__/
.history/
neteasecloudmusicapi/
*.txt
*.json
test.py

View File

@ -1,4 +1,3 @@
import json
import requests import requests
# 获取云盘信息 # 获取云盘信息
@ -18,13 +17,10 @@ def get_cloud_info(cookie):
print(f"总大小: {convert_bytes(total_size)}") print(f"总大小: {convert_bytes(total_size)}")
print(f"最大容量: {convert_bytes(max_size)}") print(f"最大容量: {convert_bytes(max_size)}")
print(f"文件数量: {file_count}") print(f"文件数量: {file_count}")
return True
else: else:
print(f"获取云盘信息失败: {response_data.get('message')}") print(f"获取云盘信息失败: {response_data.get('message')}")
return False
except json.JSONDecodeError: except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text) print("响应内容无法解析为JSON:", response.text)
return False
# 字节数转换为可读格式GB, MB, TB等 # 字节数转换为可读格式GB, MB, TB等
def convert_bytes(size_in_bytes): def convert_bytes(size_in_bytes):

261
main.py
View File

@ -1,78 +1,17 @@
import asyncio
from functools import partial
import os import os
import json import json
from typing import Union
import aiohttp
import requests import requests
import time import time
from login import login # 导入 login 函数
from get_cloud_info import get_cloud_info from login import login # 直接从 login.py 导入 login 函数
from get_cloud_info import get_cloud_info # 从 get_cloud_info.py 导入 get_cloud_info 函数
class NcmUploader:
def __init__(self, cookie: str):
self._cookie = cookie
self._show_cloud_music_detail()
def _show_cloud_music_detail(self):
if not get_cloud_info(self._cookie):
raise ValueError("检查 cookie 有效性")
async def upload(self, music_json_path: str, concurrency: int = 10):
songs_data = self._read_songs_data(music_json_path)
song_info_list = self.get_all_song_info(songs_data)
song_info_list = self.get_resume_song_info_list(song_info_list)
await self._async_upload_songs(song_info_list, concurrency)
async def _async_upload_songs(self, song_info_list: list, concurrency: int):
sem = asyncio.Semaphore(concurrency) # 创建信号量
async def _upload_with_semaphore(song_info):
async with sem: # 使用信号量控制并发
return await self._upload_one_song(song_info)
tasks = []
for song_info in song_info_list:
tasks.append(_upload_with_semaphore(song_info)) # 直接创建协程任务
await asyncio.gather(*tasks)
async def _upload_one_song(self, song_info: dict):
try:
await self._try_to_upload_one_song(song_info)
except Exception as e:
print(f"上传过程异常,跳过该歌曲:{e}")
self._save_failed_id(song_info["id"])
async def _try_to_upload_one_song(self, song_info: dict):
song_id = int(song_info["id"])
print(f"正在导入歌曲ID: {song_id}")
# 已上传则跳过
if await self._has_uploaded(song_id):
print("该歌曲已上传,跳过!")
return
# 查询歌曲的详细信息
song_details = self.get_song_details([song_id])
if song_details:
song_name = song_details[0]["name"]
song_artist = song_details[0]["ar"][0]["name"]
song_album = song_details[0]["al"]["name"]
print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}")
# 更新 song_info 添加 artist 和 album 信息
song_info["artist"] = song_artist
song_info["album"] = song_album
await self._send_upload_request(song_info)
# 获取当前时间戳(秒) # 获取当前时间戳(秒)
@staticmethod
def get_current_timestamp(): def get_current_timestamp():
return int(time.time()) return int(time.time())
# 读取 cookies.txt 文件 # 读取 cookies.txt 文件
@staticmethod
def read_cookie(): def read_cookie():
if os.path.exists("cookies.txt"): if os.path.exists("cookies.txt"):
with open("cookies.txt", "r") as f: with open("cookies.txt", "r") as f:
@ -82,43 +21,44 @@ class NcmUploader:
return None return None
# 读取歌曲.json 文件并返回数据 # 读取歌曲.json 文件并返回数据
@staticmethod def read_songs_data():
def _read_songs_data(music_json_path: str) -> list: if os.path.exists("歌曲.json"):
with open(music_json_path, "r", encoding="utf-8") as f: with open("歌曲.json", "r", encoding="utf-8") as f:
try: try:
data = json.load(f) data = json.load(f)
return data.get("data", []) return data.get('data', [])
except json.JSONDecodeError: except json.JSONDecodeError:
print("歌曲.json 格式错误") print("歌曲.json 格式错误")
return [] return []
else:
print("歌曲.json 文件不存在")
return []
# 提取所有歌曲的 id 和其他信息 # 提取所有歌曲的 id 和其他信息
@staticmethod
def get_all_song_info(songs_data): def get_all_song_info(songs_data):
song_info_list = [] song_info_list = []
for song in songs_data: for song in songs_data:
song_info = { song_info = {
"id": song.get("id"), 'id': song.get("id"),
"size": song.get("size"), 'size': song.get("size"),
"ext": song.get("ext"), 'ext': song.get("ext"),
"bitrate": song.get("bitrate"), 'bitrate': song.get("bitrate"),
"md5": song.get("md5"), 'md5': song.get("md5")
} }
song_info_list.append(song_info) song_info_list.append(song_info)
return song_info_list return song_info_list
# 查询歌曲详情 # 查询歌曲详情
@staticmethod def get_song_details(song_ids):
def get_song_details(song_ids: list):
ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串 ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串
timestamp = NcmUploader.get_current_timestamp() # 获取当前时间戳 timestamp = get_current_timestamp() # 获取当前时间戳
url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}" url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}"
print(f"查询歌曲详情 URL: {url}") print(f"查询歌曲详情 URL: {url}")
response = requests.get(url) response = requests.get(url)
try: try:
response_data = response.json() response_data = response.json()
if response_data.get("code") == 200: if response_data.get('code') == 200:
return response_data.get("songs", []) return response_data.get('songs', [])
else: else:
print("获取歌曲详情失败:", response_data.get("message")) print("获取歌曲详情失败:", response_data.get("message"))
return [] return []
@ -126,86 +66,109 @@ class NcmUploader:
print("响应内容无法解析为JSON:", response.text) print("响应内容无法解析为JSON:", response.text)
return [] return []
# 判断歌曲是否已上传云盘 # 执行 import 请求
async def _has_uploaded(self, song_id: int) -> bool: def import_song(song_info, cookie):
url = ( song_id = song_info['id']
f"http://localhost:3000/user/cloud/detail?id={song_id}&cookie={self._cookie}" artist = song_info['artist']
) album = song_info['album']
async with aiohttp.ClientSession() as session: file_size = song_info['size']
async with session.get(url) as response: bitrate = song_info['bitrate']
try: md5 = song_info['md5']
response_data = await response.json() file_type = song_info['ext']
if response_data.get("code") == 200 and len(response_data["data"]) != 0:
return True
else:
print("获取云盘歌曲详情失败:", response_data.get("message"))
return False
except json.JSONDecodeError:
print("获取云盘歌曲信息失败响应内容无法解析为JSON:", response.text)
return False
async def _send_upload_request(self, song_info: dict):
song_id = song_info["id"]
artist = song_info["artist"]
album = song_info["album"]
file_size = song_info["size"]
bitrate = song_info["bitrate"]
md5 = song_info["md5"]
file_type = song_info["ext"]
# 构造完整的请求URL和参数 # 构造完整的请求URL和参数
timestamp = NcmUploader.get_current_timestamp() # 获取当前时间戳 timestamp = get_current_timestamp() # 获取当前时间戳
url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={self._cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}" url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}"
#print(f"执行导入请求 URL: {url}") #print(f"执行导入请求 URL: {url}")
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response_data = await response.json()
success_songs = response_data.get("data", {}).get("successSongs", []) response = requests.get(url)
failed = response_data.get("data", {}).get("failed", []) try:
response_data = response.json()
if success_songs: return response_data
print(f"歌曲 {song_id} 导入成功!") except json.JSONDecodeError:
else: print("响应内容无法解析为JSON:", response.text)
print(f"歌曲 {song_id} 导入失败,失败原因:{failed}") return None
if all(f["code"] == -100 for f in failed): # 文件已存在的错误码
print(f"歌曲 {song_id} 文件已存在,跳过")
self._save_failed_id(song_id) # 保存失败的 ID
# 保存失败的 id 到文件 # 保存失败的 id 到文件
@staticmethod def save_failed_id(song_id):
def _save_failed_id(song_id):
with open("failed_ids.txt", "a") as f: with open("failed_ids.txt", "a") as f:
f.write(f"{song_id}\n") f.write(f"{song_id}\n")
# 获取最后一个上传的异常 id # 处理歌曲导入请求
@staticmethod def process_songs(song_info_list, cookie):
def _get_last_failed_id() -> Union[int, None]: failed_attempts = {} # 记录每个 ID 失败的次数
if not os.path.exists("failed_ids.txt"): for song_info in song_info_list:
return None song_id = song_info['id']
with open("failed_ids.txt", "r") as f: print(f"正在导入歌曲ID: {song_id}")
ids = [line.strip() for line in f]
return int(ids[-1])
@staticmethod # 查询歌曲的详细信息
def get_resume_song_info_list(song_info_list) -> list: song_details = get_song_details([song_id])
last_failed_id = NcmUploader._get_last_failed_id() if song_details:
if last_failed_id is None: song_name = song_details[0]['name']
print("暂无上传失败记录,从头开始上传") song_artist = song_details[0]['ar'][0]['name']
return song_info_list song_album = song_details[0]['al']['name']
for index, song_info in enumerate(song_info_list): print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}")
if int(song_info["id"]) == last_failed_id: # 更新 song_info 添加 artist 和 album 信息
print(f"当前已上传: {index + 1},最后上传失败的 id: {song_info['id']}") song_info['artist'] = song_artist
return song_info_list[index + 1 :] song_info['album'] = song_album
print("暂未匹配到最后一次失败的 song_id, 将从头开始上传")
return song_info_list
attempts = 0
while attempts < 3:
result = import_song(song_info, cookie)
if result:
success_songs = result.get('data', {}).get('successSongs', [])
failed = result.get('data', {}).get('failed', [])
if success_songs:
print(f"歌曲 {song_id} 导入成功!")
break # 成功则跳出循环
else:
print(f"歌曲 {song_id} 导入失败,失败原因:{failed}")
if all(f['code'] == -100 for f in failed): # 文件已存在的错误码
print(f"歌曲 {song_id} 文件已存在,跳过")
save_failed_id(song_id) # 保存失败的 ID
break
time.sleep(5) # 请求失败后等待 5 秒重新请求
attempts += 1
if attempts == 3: # 如果失败三次,则跳过此 ID
print(f"歌曲 {song_id} 失败三次,跳过该歌曲。")
save_failed_id(song_id) # 保存失败的 ID
# 主函数
def main():
# 尝试读取已保存的 cookie
cookie = read_cookie()
if cookie:
#print(f"读取到已保存的 cookie: {cookie}")
# 获取并显示云盘信息
get_cloud_info(cookie)
else:
print("没有找到 cookie正在执行登录...")
# 执行 login 函数获取新 cookie
cookie = login()
if cookie:
print(f"登录成功cookie: {cookie}")
# 获取并显示云盘信息
get_cloud_info(cookie)
else:
print("登录失败")
return
# 读取歌曲数据
songs_data = read_songs_data()
if songs_data:
#print(f"共找到 {len(songs_data)} 首歌曲,提取歌曲 ID 和其他信息...")
song_info_list = get_all_song_info(songs_data)
#print(f"所有歌曲信息: {song_info_list}")
# 执行歌曲导入请求
process_songs(song_info_list, cookie)
else:
print("没有找到任何歌曲数据")
if __name__ == "__main__": if __name__ == "__main__":
if not os.path.exists("cookies.txt"): main()
print('未发现 cookies.txt 文件')
exit(0)
with open("cookies.txt", "r") as f:
cookie = f.read().strip()
ncmUploader = NcmUploader(cookie)
asyncio.run(ncmUploader.upload('歌曲.json', 10))