From 8543f8413723c804dcffb4d71c7563f493223dea Mon Sep 17 00:00:00 2001 From: luqiao <50013416+BsaLee@users.noreply.github.com> Date: Fri, 14 Feb 2025 04:44:55 +0800 Subject: [PATCH] Add files via upload --- get_cloud_info.py | 37 ++++++++++ login.py | 81 +++++++++++++++++++++ main.py | 174 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 292 insertions(+) create mode 100644 get_cloud_info.py create mode 100644 login.py create mode 100644 main.py diff --git a/get_cloud_info.py b/get_cloud_info.py new file mode 100644 index 0000000..ffbcc4f --- /dev/null +++ b/get_cloud_info.py @@ -0,0 +1,37 @@ +import requests + +# 获取云盘信息 +def get_cloud_info(cookie): + url = "http://localhost:3000/user/cloud" + params = {"cookie": cookie, "limit": 1} + + response = requests.get(url, params=params) + try: + response_data = response.json() + if response_data.get('code') == 200: + total_size = response_data['size'] + max_size = response_data['maxSize'] + file_count = response_data['count'] + # 输出云盘信息 + print("云盘信息:") + print(f"总大小: {convert_bytes(total_size)}") + print(f"最大容量: {convert_bytes(max_size)}") + print(f"文件数量: {file_count}") + else: + print(f"获取云盘信息失败: {response_data.get('message')}") + except json.JSONDecodeError: + print("响应内容无法解析为JSON:", response.text) + +# 字节数转换为可读格式(GB, MB, TB等) +def convert_bytes(size_in_bytes): + try: + size_in_bytes = float(size_in_bytes) # 确保传入的值是一个数字 + except ValueError: + return "无效的字节数" # 如果无法转换为数字,则返回错误信息 + + units = ['B', 'KB', 'MB', 'GB', 'TB'] + for unit in units: + if size_in_bytes < 1024.0: + return f"{size_in_bytes:.2f} {unit}" + size_in_bytes /= 1024.0 + return f"{size_in_bytes:.2f} {unit}" diff --git a/login.py b/login.py new file mode 100644 index 0000000..526d1de --- /dev/null +++ b/login.py @@ -0,0 +1,81 @@ +import requests +from PIL import Image +from io import BytesIO +import base64 +import time + +# 获取当前时间戳 +def get_current_timestamp(): + return int(time.time()) + +# 获取unikey +def get_unikey(): + url = f"http://localhost:3000/login/qr/key?time={get_current_timestamp()}" + response = requests.get(url) + data = response.json() + if data['code'] == 200: + return data['data']['unikey'] + return None + +# 创建二维码 +def create_qr(unikey): + url = f"http://localhost:3000/login/qr/create?key={unikey}&qrimg=1&time={get_current_timestamp()}" + response = requests.get(url) + data = response.json() + if data['code'] == 200: + qrimg_base64 = data['data']['qrimg'] + return qrimg_base64 + return None + +# 显示二维码图像 +def display_qr_image(qrimg_base64): + img_data = base64.b64decode(qrimg_base64.split(",")[1]) + img = Image.open(BytesIO(img_data)) + img.show() + +# 监控扫码状态 +def check_scan_status(unikey): + url = f"http://localhost:3000/login/qr/check?key={unikey}&time={get_current_timestamp()}" + response = requests.get(url) + #print(f"响应内容: {response.text}") # 打印每次请求的响应内容 + return response.json() + +# 登录函数 +def login(): + unikey = get_unikey() + if not unikey: + print("无法获取unikey") + return None + + qrimg_base64 = create_qr(unikey) + if not qrimg_base64: + print("无法创建二维码") + return None + + # 显示二维码图像 + display_qr_image(qrimg_base64) + + print("请扫描二维码,等待扫码成功...") + + # 监控扫码状态 + while True: + status = check_scan_status(unikey) + + if status['code'] == 803: + print("授权登录成功") + cookie = status['cookie'] + print(f"保存的 Cookie: {cookie}") + # 保存 cookie 到文件 + with open("cookies.txt", "w") as f: + f.write(cookie) + return cookie # 返回 cookie + elif status['code'] == 801: + print("等待扫码,继续请求中...") + elif status['code'] == 802: + print("授权中,继续请求中...") + else: + print(f"扫码失败: {status['message']}") + return None # 返回 None,表示扫码失败 + + time.sleep(2) # 每隔2秒检查一次扫码状态 + diff --git a/main.py b/main.py new file mode 100644 index 0000000..74de983 --- /dev/null +++ b/main.py @@ -0,0 +1,174 @@ +import os +import json +import requests +import time + +# 导入 login 函数 +from login import login # 直接从 login.py 导入 login 函数 +from get_cloud_info import get_cloud_info # 从 get_cloud_info.py 导入 get_cloud_info 函数 + +# 获取当前时间戳(秒) +def get_current_timestamp(): + return int(time.time()) + +# 读取 cookies.txt 文件 +def read_cookie(): + if os.path.exists("cookies.txt"): + with open("cookies.txt", "r") as f: + cookie = f.read().strip() + if cookie: + return cookie + return None + +# 读取歌曲.json 文件并返回数据 +def read_songs_data(): + if os.path.exists("歌曲.json"): + with open("歌曲.json", "r", encoding="utf-8") as f: + try: + data = json.load(f) + return data.get('data', []) + except json.JSONDecodeError: + print("歌曲.json 格式错误") + return [] + else: + print("歌曲.json 文件不存在") + return [] + +# 提取所有歌曲的 id 和其他信息 +def get_all_song_info(songs_data): + song_info_list = [] + for song in songs_data: + song_info = { + 'id': song.get("id"), + 'size': song.get("size"), + 'ext': song.get("ext"), + 'bitrate': song.get("bitrate"), + 'md5': song.get("md5") + } + song_info_list.append(song_info) + return song_info_list + +# 查询歌曲详情 +def get_song_details(song_ids): + ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串 + timestamp = get_current_timestamp() # 获取当前时间戳 + url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}" + print(f"查询歌曲详情 URL: {url}") + response = requests.get(url) + try: + response_data = response.json() + if response_data.get('code') == 200: + return response_data.get('songs', []) + else: + print("获取歌曲详情失败:", response_data.get("message")) + return [] + except json.JSONDecodeError: + print("响应内容无法解析为JSON:", response.text) + return [] + +# 执行 import 请求 +def import_song(song_info, cookie): + song_id = song_info['id'] + artist = song_info['artist'] + album = song_info['album'] + file_size = song_info['size'] + bitrate = song_info['bitrate'] + md5 = song_info['md5'] + file_type = song_info['ext'] + + # 构造完整的请求URL和参数 + timestamp = get_current_timestamp() # 获取当前时间戳 + url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}" + #print(f"执行导入请求 URL: {url}") + + response = requests.get(url) + try: + response_data = response.json() + return response_data + except json.JSONDecodeError: + print("响应内容无法解析为JSON:", response.text) + return None + +# 保存失败的 id 到文件 +def save_failed_id(song_id): + with open("failed_ids.txt", "a") as f: + f.write(f"{song_id}\n") + +# 处理歌曲导入请求 +def process_songs(song_info_list, cookie): + failed_attempts = {} # 记录每个 ID 失败的次数 + for song_info in song_info_list: + song_id = song_info['id'] + print(f"正在导入歌曲ID: {song_id}") + + # 查询歌曲的详细信息 + song_details = get_song_details([song_id]) + if song_details: + song_name = song_details[0]['name'] + song_artist = song_details[0]['ar'][0]['name'] + song_album = song_details[0]['al']['name'] + print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}") + # 更新 song_info 添加 artist 和 album 信息 + song_info['artist'] = song_artist + song_info['album'] = song_album + + attempts = 0 + while attempts < 3: + result = import_song(song_info, cookie) + + if result: + success_songs = result.get('data', {}).get('successSongs', []) + failed = result.get('data', {}).get('failed', []) + + if success_songs: + print(f"歌曲 {song_id} 导入成功!") + break # 成功则跳出循环 + else: + print(f"歌曲 {song_id} 导入失败,失败原因:{failed}") + if all(f['code'] == -100 for f in failed): # 文件已存在的错误码 + print(f"歌曲 {song_id} 文件已存在,跳过") + save_failed_id(song_id) # 保存失败的 ID + break + time.sleep(5) # 请求失败后等待 5 秒重新请求 + attempts += 1 + + if attempts == 3: # 如果失败三次,则跳过此 ID + print(f"歌曲 {song_id} 失败三次,跳过该歌曲。") + save_failed_id(song_id) # 保存失败的 ID + +# 主函数 +def main(): + # 尝试读取已保存的 cookie + cookie = read_cookie() + + if cookie: + #print(f"读取到已保存的 cookie: {cookie}") + # 获取并显示云盘信息 + get_cloud_info(cookie) + else: + print("没有找到 cookie,正在执行登录...") + # 执行 login 函数获取新 cookie + cookie = login() + if cookie: + print(f"登录成功,cookie: {cookie}") + # 获取并显示云盘信息 + get_cloud_info(cookie) + else: + print("登录失败") + return + + # 读取歌曲数据 + songs_data = read_songs_data() + + if songs_data: + #print(f"共找到 {len(songs_data)} 首歌曲,提取歌曲 ID 和其他信息...") + song_info_list = get_all_song_info(songs_data) + #print(f"所有歌曲信息: {song_info_list}") + + # 执行歌曲导入请求 + process_songs(song_info_list, cookie) + else: + print("没有找到任何歌曲数据") + +if __name__ == "__main__": + main()