diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1d67844 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +.history/ +neteasecloudmusicapi/ +*.txt +*.json +test.py \ No newline at end of file diff --git a/get_cloud_info.py b/get_cloud_info.py index ffbcc4f..54e4df4 100644 --- a/get_cloud_info.py +++ b/get_cloud_info.py @@ -1,3 +1,4 @@ +import json import requests # 获取云盘信息 @@ -17,10 +18,13 @@ def get_cloud_info(cookie): print(f"总大小: {convert_bytes(total_size)}") print(f"最大容量: {convert_bytes(max_size)}") print(f"文件数量: {file_count}") + return True else: print(f"获取云盘信息失败: {response_data.get('message')}") + return False except json.JSONDecodeError: print("响应内容无法解析为JSON:", response.text) + return False # 字节数转换为可读格式(GB, MB, TB等) def convert_bytes(size_in_bytes): diff --git a/main.py b/main.py index 74de983..d30e0da 100644 --- a/main.py +++ b/main.py @@ -1,174 +1,211 @@ +import asyncio +from functools import partial import os import json +from typing import Union +import aiohttp import requests import time -# 导入 login 函数 -from login import login # 直接从 login.py 导入 login 函数 -from get_cloud_info import get_cloud_info # 从 get_cloud_info.py 导入 get_cloud_info 函数 +from login import login +from get_cloud_info import get_cloud_info -# 获取当前时间戳(秒) -def get_current_timestamp(): - return int(time.time()) -# 读取 cookies.txt 文件 -def read_cookie(): - if os.path.exists("cookies.txt"): - with open("cookies.txt", "r") as f: - cookie = f.read().strip() - if cookie: - return cookie - return None +class NcmUploader: -# 读取歌曲.json 文件并返回数据 -def read_songs_data(): - if os.path.exists("歌曲.json"): - with open("歌曲.json", "r", encoding="utf-8") as f: + def __init__(self, cookie: str): + self._cookie = cookie + self._show_cloud_music_detail() + + def _show_cloud_music_detail(self): + if not get_cloud_info(self._cookie): + raise ValueError("检查 cookie 有效性") + + async def upload(self, music_json_path: str, concurrency: int = 10): + songs_data = self._read_songs_data(music_json_path) + song_info_list = self.get_all_song_info(songs_data) + song_info_list = self.get_resume_song_info_list(song_info_list) + await self._async_upload_songs(song_info_list, concurrency) + + async def _async_upload_songs(self, song_info_list: list, concurrency: int): + sem = asyncio.Semaphore(concurrency) # 创建信号量 + async def _upload_with_semaphore(song_info): + async with sem: # 使用信号量控制并发 + return await self._upload_one_song(song_info) + + tasks = [] + for song_info in song_info_list: + tasks.append(_upload_with_semaphore(song_info)) # 直接创建协程任务 + await asyncio.gather(*tasks) + + async def _upload_one_song(self, song_info: dict): + try: + await self._try_to_upload_one_song(song_info) + except Exception as e: + print(f"上传过程异常,跳过该歌曲:{e}") + self._save_failed_id(song_info["id"]) + + async def _try_to_upload_one_song(self, song_info: dict): + song_id = int(song_info["id"]) + print(f"正在导入歌曲ID: {song_id}") + + # 已上传则跳过 + if await self._has_uploaded(song_id): + print("该歌曲已上传,跳过!") + return + + # 查询歌曲的详细信息 + song_details = self.get_song_details([song_id]) + if song_details: + song_name = song_details[0]["name"] + song_artist = song_details[0]["ar"][0]["name"] + song_album = song_details[0]["al"]["name"] + print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}") + # 更新 song_info 添加 artist 和 album 信息 + song_info["artist"] = song_artist + song_info["album"] = song_album + await self._send_upload_request(song_info) + + # 获取当前时间戳(秒) + @staticmethod + def get_current_timestamp(): + return int(time.time()) + + # 读取 cookies.txt 文件 + @staticmethod + def read_cookie(): + if os.path.exists("cookies.txt"): + with open("cookies.txt", "r") as f: + cookie = f.read().strip() + if cookie: + return cookie + return None + + # 读取歌曲.json 文件并返回数据 + @staticmethod + def _read_songs_data(music_json_path: str) -> list: + with open(music_json_path, "r", encoding="utf-8") as f: try: data = json.load(f) - return data.get('data', []) + return data.get("data", []) except json.JSONDecodeError: print("歌曲.json 格式错误") return [] - else: - print("歌曲.json 文件不存在") - return [] -# 提取所有歌曲的 id 和其他信息 -def get_all_song_info(songs_data): - song_info_list = [] - for song in songs_data: - song_info = { - 'id': song.get("id"), - 'size': song.get("size"), - 'ext': song.get("ext"), - 'bitrate': song.get("bitrate"), - 'md5': song.get("md5") - } - song_info_list.append(song_info) - return song_info_list + # 提取所有歌曲的 id 和其他信息 + @staticmethod + def get_all_song_info(songs_data): + song_info_list = [] + for song in songs_data: + song_info = { + "id": song.get("id"), + "size": song.get("size"), + "ext": song.get("ext"), + "bitrate": song.get("bitrate"), + "md5": song.get("md5"), + } + song_info_list.append(song_info) + return song_info_list -# 查询歌曲详情 -def get_song_details(song_ids): - ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串 - timestamp = get_current_timestamp() # 获取当前时间戳 - url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}" - print(f"查询歌曲详情 URL: {url}") - response = requests.get(url) - try: - response_data = response.json() - if response_data.get('code') == 200: - return response_data.get('songs', []) - else: - print("获取歌曲详情失败:", response_data.get("message")) + # 查询歌曲详情 + @staticmethod + def get_song_details(song_ids: list): + ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串 + timestamp = NcmUploader.get_current_timestamp() # 获取当前时间戳 + url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}" + print(f"查询歌曲详情 URL: {url}") + response = requests.get(url) + try: + response_data = response.json() + if response_data.get("code") == 200: + return response_data.get("songs", []) + else: + print("获取歌曲详情失败:", response_data.get("message")) + return [] + except json.JSONDecodeError: + print("响应内容无法解析为JSON:", response.text) return [] - except json.JSONDecodeError: - print("响应内容无法解析为JSON:", response.text) - return [] -# 执行 import 请求 -def import_song(song_info, cookie): - song_id = song_info['id'] - artist = song_info['artist'] - album = song_info['album'] - file_size = song_info['size'] - bitrate = song_info['bitrate'] - md5 = song_info['md5'] - file_type = song_info['ext'] - - # 构造完整的请求URL和参数 - timestamp = get_current_timestamp() # 获取当前时间戳 - url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}" - #print(f"执行导入请求 URL: {url}") - - response = requests.get(url) - try: - response_data = response.json() - return response_data - except json.JSONDecodeError: - print("响应内容无法解析为JSON:", response.text) - return None + # 判断歌曲是否已上传云盘 + async def _has_uploaded(self, song_id: int) -> bool: + url = ( + f"http://localhost:3000/user/cloud/detail?id={song_id}&cookie={self._cookie}" + ) + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + try: + response_data = await response.json() + if response_data.get("code") == 200 and len(response_data["data"]) != 0: + return True + else: + print("获取云盘歌曲详情失败:", response_data.get("message")) + return False + except json.JSONDecodeError: + print("获取云盘歌曲信息失败,响应内容无法解析为JSON:", response.text) + return False -# 保存失败的 id 到文件 -def save_failed_id(song_id): - with open("failed_ids.txt", "a") as f: - f.write(f"{song_id}\n") + async def _send_upload_request(self, song_info: dict): + song_id = song_info["id"] + artist = song_info["artist"] + album = song_info["album"] + file_size = song_info["size"] + bitrate = song_info["bitrate"] + md5 = song_info["md5"] + file_type = song_info["ext"] -# 处理歌曲导入请求 -def process_songs(song_info_list, cookie): - failed_attempts = {} # 记录每个 ID 失败的次数 - for song_info in song_info_list: - song_id = song_info['id'] - print(f"正在导入歌曲ID: {song_id}") - - # 查询歌曲的详细信息 - song_details = get_song_details([song_id]) - if song_details: - song_name = song_details[0]['name'] - song_artist = song_details[0]['ar'][0]['name'] - song_album = song_details[0]['al']['name'] - print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}") - # 更新 song_info 添加 artist 和 album 信息 - song_info['artist'] = song_artist - song_info['album'] = song_album - - attempts = 0 - while attempts < 3: - result = import_song(song_info, cookie) - - if result: - success_songs = result.get('data', {}).get('successSongs', []) - failed = result.get('data', {}).get('failed', []) - - if success_songs: - print(f"歌曲 {song_id} 导入成功!") - break # 成功则跳出循环 - else: - print(f"歌曲 {song_id} 导入失败,失败原因:{failed}") - if all(f['code'] == -100 for f in failed): # 文件已存在的错误码 - print(f"歌曲 {song_id} 文件已存在,跳过") - save_failed_id(song_id) # 保存失败的 ID - break - time.sleep(5) # 请求失败后等待 5 秒重新请求 - attempts += 1 - - if attempts == 3: # 如果失败三次,则跳过此 ID - print(f"歌曲 {song_id} 失败三次,跳过该歌曲。") - save_failed_id(song_id) # 保存失败的 ID + # 构造完整的请求URL和参数 + timestamp = NcmUploader.get_current_timestamp() # 获取当前时间戳 + url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={self._cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}" + # print(f"执行导入请求 URL: {url}") + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + response_data = await response.json() -# 主函数 -def main(): - # 尝试读取已保存的 cookie - cookie = read_cookie() - - if cookie: - #print(f"读取到已保存的 cookie: {cookie}") - # 获取并显示云盘信息 - get_cloud_info(cookie) - else: - print("没有找到 cookie,正在执行登录...") - # 执行 login 函数获取新 cookie - cookie = login() - if cookie: - print(f"登录成功,cookie: {cookie}") - # 获取并显示云盘信息 - get_cloud_info(cookie) + success_songs = response_data.get("data", {}).get("successSongs", []) + failed = response_data.get("data", {}).get("failed", []) + + if success_songs: + print(f"歌曲 {song_id} 导入成功!") else: - print("登录失败") - return + print(f"歌曲 {song_id} 导入失败,失败原因:{failed}") + if all(f["code"] == -100 for f in failed): # 文件已存在的错误码 + print(f"歌曲 {song_id} 文件已存在,跳过") + self._save_failed_id(song_id) # 保存失败的 ID + + # 保存失败的 id 到文件 + @staticmethod + def _save_failed_id(song_id): + with open("failed_ids.txt", "a") as f: + f.write(f"{song_id}\n") + + # 获取最后一个上传的异常 id + @staticmethod + def _get_last_failed_id() -> Union[int, None]: + if not os.path.exists("failed_ids.txt"): + return None + with open("failed_ids.txt", "r") as f: + ids = [line.strip() for line in f] + return int(ids[-1]) + + @staticmethod + def get_resume_song_info_list(song_info_list) -> list: + last_failed_id = NcmUploader._get_last_failed_id() + if last_failed_id is None: + print("暂无上传失败记录,从头开始上传") + return song_info_list + for index, song_info in enumerate(song_info_list): + if int(song_info["id"]) == last_failed_id: + print(f"当前已上传: {index + 1},最后上传失败的 id: {song_info['id']}") + return song_info_list[index + 1 :] + print("暂未匹配到最后一次失败的 song_id, 将从头开始上传") + return song_info_list - # 读取歌曲数据 - songs_data = read_songs_data() - - if songs_data: - #print(f"共找到 {len(songs_data)} 首歌曲,提取歌曲 ID 和其他信息...") - song_info_list = get_all_song_info(songs_data) - #print(f"所有歌曲信息: {song_info_list}") - - # 执行歌曲导入请求 - process_songs(song_info_list, cookie) - else: - print("没有找到任何歌曲数据") if __name__ == "__main__": - main() + if not os.path.exists("cookies.txt"): + print('未发现 cookies.txt 文件') + exit(0) + with open("cookies.txt", "r") as f: + cookie = f.read().strip() + ncmUploader = NcmUploader(cookie) + asyncio.run(ncmUploader.upload('歌曲.json', 10))