Add files via upload

This commit is contained in:
luqiao 2025-02-14 04:44:55 +08:00 committed by GitHub
commit 8543f84137
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 292 additions and 0 deletions

37
get_cloud_info.py Normal file
View File

@ -0,0 +1,37 @@
import requests
# 获取云盘信息
def get_cloud_info(cookie):
url = "http://localhost:3000/user/cloud"
params = {"cookie": cookie, "limit": 1}
response = requests.get(url, params=params)
try:
response_data = response.json()
if response_data.get('code') == 200:
total_size = response_data['size']
max_size = response_data['maxSize']
file_count = response_data['count']
# 输出云盘信息
print("云盘信息:")
print(f"总大小: {convert_bytes(total_size)}")
print(f"最大容量: {convert_bytes(max_size)}")
print(f"文件数量: {file_count}")
else:
print(f"获取云盘信息失败: {response_data.get('message')}")
except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text)
# 字节数转换为可读格式GB, MB, TB等
def convert_bytes(size_in_bytes):
try:
size_in_bytes = float(size_in_bytes) # 确保传入的值是一个数字
except ValueError:
return "无效的字节数" # 如果无法转换为数字,则返回错误信息
units = ['B', 'KB', 'MB', 'GB', 'TB']
for unit in units:
if size_in_bytes < 1024.0:
return f"{size_in_bytes:.2f} {unit}"
size_in_bytes /= 1024.0
return f"{size_in_bytes:.2f} {unit}"

81
login.py Normal file
View File

@ -0,0 +1,81 @@
import requests
from PIL import Image
from io import BytesIO
import base64
import time
# 获取当前时间戳
def get_current_timestamp():
return int(time.time())
# 获取unikey
def get_unikey():
url = f"http://localhost:3000/login/qr/key?time={get_current_timestamp()}"
response = requests.get(url)
data = response.json()
if data['code'] == 200:
return data['data']['unikey']
return None
# 创建二维码
def create_qr(unikey):
url = f"http://localhost:3000/login/qr/create?key={unikey}&qrimg=1&time={get_current_timestamp()}"
response = requests.get(url)
data = response.json()
if data['code'] == 200:
qrimg_base64 = data['data']['qrimg']
return qrimg_base64
return None
# 显示二维码图像
def display_qr_image(qrimg_base64):
img_data = base64.b64decode(qrimg_base64.split(",")[1])
img = Image.open(BytesIO(img_data))
img.show()
# 监控扫码状态
def check_scan_status(unikey):
url = f"http://localhost:3000/login/qr/check?key={unikey}&time={get_current_timestamp()}"
response = requests.get(url)
#print(f"响应内容: {response.text}") # 打印每次请求的响应内容
return response.json()
# 登录函数
def login():
unikey = get_unikey()
if not unikey:
print("无法获取unikey")
return None
qrimg_base64 = create_qr(unikey)
if not qrimg_base64:
print("无法创建二维码")
return None
# 显示二维码图像
display_qr_image(qrimg_base64)
print("请扫描二维码,等待扫码成功...")
# 监控扫码状态
while True:
status = check_scan_status(unikey)
if status['code'] == 803:
print("授权登录成功")
cookie = status['cookie']
print(f"保存的 Cookie: {cookie}")
# 保存 cookie 到文件
with open("cookies.txt", "w") as f:
f.write(cookie)
return cookie # 返回 cookie
elif status['code'] == 801:
print("等待扫码,继续请求中...")
elif status['code'] == 802:
print("授权中,继续请求中...")
else:
print(f"扫码失败: {status['message']}")
return None # 返回 None表示扫码失败
time.sleep(2) # 每隔2秒检查一次扫码状态

174
main.py Normal file
View File

@ -0,0 +1,174 @@
import os
import json
import requests
import time
# 导入 login 函数
from login import login # 直接从 login.py 导入 login 函数
from get_cloud_info import get_cloud_info # 从 get_cloud_info.py 导入 get_cloud_info 函数
# 获取当前时间戳(秒)
def get_current_timestamp():
return int(time.time())
# 读取 cookies.txt 文件
def read_cookie():
if os.path.exists("cookies.txt"):
with open("cookies.txt", "r") as f:
cookie = f.read().strip()
if cookie:
return cookie
return None
# 读取歌曲.json 文件并返回数据
def read_songs_data():
if os.path.exists("歌曲.json"):
with open("歌曲.json", "r", encoding="utf-8") as f:
try:
data = json.load(f)
return data.get('data', [])
except json.JSONDecodeError:
print("歌曲.json 格式错误")
return []
else:
print("歌曲.json 文件不存在")
return []
# 提取所有歌曲的 id 和其他信息
def get_all_song_info(songs_data):
song_info_list = []
for song in songs_data:
song_info = {
'id': song.get("id"),
'size': song.get("size"),
'ext': song.get("ext"),
'bitrate': song.get("bitrate"),
'md5': song.get("md5")
}
song_info_list.append(song_info)
return song_info_list
# 查询歌曲详情
def get_song_details(song_ids):
ids = ",".join(map(str, song_ids)) # 将多个 id 拼接成一个以逗号分隔的字符串
timestamp = get_current_timestamp() # 获取当前时间戳
url = f"http://localhost:3000/song/detail?ids={ids}&time={timestamp}"
print(f"查询歌曲详情 URL: {url}")
response = requests.get(url)
try:
response_data = response.json()
if response_data.get('code') == 200:
return response_data.get('songs', [])
else:
print("获取歌曲详情失败:", response_data.get("message"))
return []
except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text)
return []
# 执行 import 请求
def import_song(song_info, cookie):
song_id = song_info['id']
artist = song_info['artist']
album = song_info['album']
file_size = song_info['size']
bitrate = song_info['bitrate']
md5 = song_info['md5']
file_type = song_info['ext']
# 构造完整的请求URL和参数
timestamp = get_current_timestamp() # 获取当前时间戳
url = f"http://localhost:3000/cloud/import?id={song_id}&cookie={cookie}&artist={artist}&album={album}&fileSize={file_size}&bitrate={bitrate}&md5={md5}&fileType={file_type}&time={timestamp}"
#print(f"执行导入请求 URL: {url}")
response = requests.get(url)
try:
response_data = response.json()
return response_data
except json.JSONDecodeError:
print("响应内容无法解析为JSON:", response.text)
return None
# 保存失败的 id 到文件
def save_failed_id(song_id):
with open("failed_ids.txt", "a") as f:
f.write(f"{song_id}\n")
# 处理歌曲导入请求
def process_songs(song_info_list, cookie):
failed_attempts = {} # 记录每个 ID 失败的次数
for song_info in song_info_list:
song_id = song_info['id']
print(f"正在导入歌曲ID: {song_id}")
# 查询歌曲的详细信息
song_details = get_song_details([song_id])
if song_details:
song_name = song_details[0]['name']
song_artist = song_details[0]['ar'][0]['name']
song_album = song_details[0]['al']['name']
print(f"歌曲名: {song_name}, 演唱者: {song_artist}, 专辑: {song_album}")
# 更新 song_info 添加 artist 和 album 信息
song_info['artist'] = song_artist
song_info['album'] = song_album
attempts = 0
while attempts < 3:
result = import_song(song_info, cookie)
if result:
success_songs = result.get('data', {}).get('successSongs', [])
failed = result.get('data', {}).get('failed', [])
if success_songs:
print(f"歌曲 {song_id} 导入成功!")
break # 成功则跳出循环
else:
print(f"歌曲 {song_id} 导入失败,失败原因:{failed}")
if all(f['code'] == -100 for f in failed): # 文件已存在的错误码
print(f"歌曲 {song_id} 文件已存在,跳过")
save_failed_id(song_id) # 保存失败的 ID
break
time.sleep(5) # 请求失败后等待 5 秒重新请求
attempts += 1
if attempts == 3: # 如果失败三次,则跳过此 ID
print(f"歌曲 {song_id} 失败三次,跳过该歌曲。")
save_failed_id(song_id) # 保存失败的 ID
# 主函数
def main():
# 尝试读取已保存的 cookie
cookie = read_cookie()
if cookie:
#print(f"读取到已保存的 cookie: {cookie}")
# 获取并显示云盘信息
get_cloud_info(cookie)
else:
print("没有找到 cookie正在执行登录...")
# 执行 login 函数获取新 cookie
cookie = login()
if cookie:
print(f"登录成功cookie: {cookie}")
# 获取并显示云盘信息
get_cloud_info(cookie)
else:
print("登录失败")
return
# 读取歌曲数据
songs_data = read_songs_data()
if songs_data:
#print(f"共找到 {len(songs_data)} 首歌曲,提取歌曲 ID 和其他信息...")
song_info_list = get_all_song_info(songs_data)
#print(f"所有歌曲信息: {song_info_list}")
# 执行歌曲导入请求
process_songs(song_info_list, cookie)
else:
print("没有找到任何歌曲数据")
if __name__ == "__main__":
main()