面向对象

This commit is contained in:
Cp0204 2024-04-03 00:53:05 +08:00
parent d485ef796c
commit 166e18fcfa

View File

@ -1,6 +1,6 @@
# !/usr/bin/env python3
# -*- coding: utf-8 -*-
# Modify: 2024-02-29
# Modify: 2024-04-03
# Repo: https://github.com/Cp0204/quark_auto_save
# ConfigFile: quark_config.json
"""
@ -16,8 +16,9 @@ import requests
from datetime import datetime
config_data = {}
check_data = {}
notifys = []
first_account = {}
magic_regex = {
"$TV": {
@ -61,35 +62,90 @@ def add_notify(text):
return text
def common_headers():
# 下载配置
def download_file(url, save_path):
response = requests.get(url)
if response.status_code == 200:
with open(save_path, "wb") as file:
file.write(response.content)
return True
else:
return False
# 读取CK
def get_cookies(cookie_val):
if isinstance(cookie_val, list):
return cookie_val
elif cookie_val:
if "\n" in cookie_val:
return cookie_val.split("\n")
else:
return [cookie_val]
else:
return False
class Quark:
def __init__(self, cookie, index=None):
self.cookie = cookie
self.index = index + 1
self.is_active = False
self.nickname = ""
def common_headers(self):
return {
"cookie": first_account["cookie"],
"cookie": self.cookie,
"content-type": "application/json",
}
def init(self):
account_info = self.get_account_info()
if account_info:
self.is_active = True
self.nickname = account_info["nickname"]
return account_info
else:
return False
def get_growth_info(cookie):
url = "https://drive-m.quark.cn/1/clouddrive/capacity/growth/info"
querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}
def get_account_info(self):
url = "https://pan.quark.cn/account/info"
querystring = {"fr": "pc", "platform": "pc"}
headers = {
"cookie": cookie,
"cookie": self.cookie,
"content-type": "application/json",
}
response = requests.request("GET", url, headers=headers, params=querystring).json()
response = requests.request(
"GET", url, headers=headers, params=querystring
).json()
if response.get("data"):
return response["data"]
else:
return False
def get_growth_info(self):
url = "https://drive-m.quark.cn/1/clouddrive/capacity/growth/info"
querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}
headers = {
"cookie": self.cookie,
"content-type": "application/json",
}
response = requests.request(
"GET", url, headers=headers, params=querystring
).json()
if response.get("data"):
return response["data"]
else:
return False
def get_growth_sign(cookie):
def get_growth_sign(self):
url = "https://drive-m.quark.cn/1/clouddrive/capacity/growth/sign"
querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}
payload = {
"sign_cyclic": True,
}
headers = {
"cookie": cookie,
"cookie": self.cookie,
"content-type": "application/json",
}
response = requests.request(
@ -100,9 +156,9 @@ def get_growth_sign(cookie):
else:
return False, response["message"]
def get_id_from_url(url):
pattern = r"/s/(\w+)(#/list/share.*/(\w+))?"
def get_id_from_url(self, url):
url = url.replace("https://pan.quark.cn/s/", "")
pattern = r"(\w+)(#/list/share.*/(\w+))?"
match = re.search(pattern, url)
if match:
pwd_id = match.group(1)
@ -114,27 +170,12 @@ def get_id_from_url(url):
else:
return None
def get_account_info(cookie):
url = "https://pan.quark.cn/account/info"
querystring = {"fr": "pc", "platform": "pc"}
headers = {
"cookie": cookie,
"content-type": "application/json",
}
response = requests.request("GET", url, headers=headers, params=querystring).json()
if response.get("data"):
return response["data"]
else:
return False
# 可验证资源是否失效
def get_stoken(pwd_id):
def get_stoken(self, pwd_id):
url = "https://pan.quark.cn/1/clouddrive/share/sharepage/token"
querystring = {"pr": "ucpro", "fr": "h5"}
payload = {"pwd_id": pwd_id, "passcode": ""}
headers = common_headers()
headers = self.common_headers()
response = requests.request(
"POST", url, json=payload, headers=headers, params=querystring
).json()
@ -143,8 +184,7 @@ def get_stoken(pwd_id):
else:
return False, response["message"]
def get_detail(pwd_id, stoken, pdir_fid):
def get_detail(self, pwd_id, stoken, pdir_fid):
file_list = []
page = 1
while True:
@ -163,7 +203,7 @@ def get_detail(pwd_id, stoken, pdir_fid):
"_fetch_total": "1",
"_sort": "file_type:asc,updated_at:desc",
}
headers = common_headers()
headers = self.common_headers()
response = requests.request(
"GET", url, headers=headers, params=querystring
).json()
@ -174,20 +214,15 @@ def get_detail(pwd_id, stoken, pdir_fid):
break
if len(file_list) >= response["metadata"]["_total"]:
break
# 仅有一个文件夹
if len(file_list) == 1 and file_list[0]["file_type"] == 0:
print("🧠 该分享是一个文件夹,读取文件夹内列表")
file_list = get_detail(pwd_id, stoken, file_list[0]["fid"])
return file_list
def get_fids(file_paths):
def get_fids(self, file_paths):
fids = []
while True:
url = "https://drive.quark.cn/1/clouddrive/file/info/path_list"
querystring = {"pr": "ucpro", "fr": "pc"}
payload = {"file_path": file_paths[:50], "namespace": "0"}
headers = common_headers()
headers = self.common_headers()
response = requests.request(
"POST", url, json=payload, headers=headers, params=querystring
).json()
@ -201,8 +236,7 @@ def get_fids(file_paths):
break
return fids
def ls_dir(pdir_fid):
def ls_dir(self, pdir_fid):
file_list = []
page = 1
while True:
@ -218,7 +252,7 @@ def ls_dir(pdir_fid):
"_fetch_sub_dirs": "0",
"_sort": "file_type:asc,updated_at:desc",
}
headers = common_headers()
headers = self.common_headers()
response = requests.request(
"GET", url, headers=headers, params=querystring
).json()
@ -231,8 +265,7 @@ def ls_dir(pdir_fid):
break
return file_list
def save_file(fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken):
def save_file(self, fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken):
url = "https://drive.quark.cn/1/clouddrive/share/sharepage/save"
querystring = {
"pr": "ucpro",
@ -250,14 +283,13 @@ def save_file(fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken):
"pdir_fid": "0",
"scene": "link",
}
headers = common_headers()
headers = self.common_headers()
response = requests.request(
"POST", url, json=payload, headers=headers, params=querystring
).json()
return response
def mkdir(dir_path):
def mkdir(self, dir_path):
url = "https://drive-pc.quark.cn/1/clouddrive/file"
querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}
payload = {
@ -266,25 +298,61 @@ def mkdir(dir_path):
"dir_path": dir_path,
"dir_init_lock": False,
}
headers = common_headers()
headers = self.common_headers()
response = requests.request(
"POST", url, json=payload, headers=headers, params=querystring
).json()
return response
def rename(fid, file_name):
def rename(self, fid, file_name):
url = "https://drive-pc.quark.cn/1/clouddrive/file/rename"
querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}
payload = {"fid": fid, "file_name": file_name}
headers = common_headers()
headers = self.common_headers()
response = requests.request(
"POST", url, json=payload, headers=headers, params=querystring
).json()
return response
def delete(self, filelist):
url = "https://drive-pc.quark.cn/1/clouddrive/file/delete"
querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""}
payload = {"action_type": 2, "filelist": filelist, "exclude_fids": []}
headers = self.common_headers()
response = requests.request(
"POST", url, json=payload, headers=headers, params=querystring
).json()
return response
def update_savepath_fid(tasklist):
def recycle_list(self, page=1, size=30):
url = "https://drive-m.quark.cn/1/clouddrive/file/recycle/list"
querystring = {
"_page": page,
"_size": size,
"pr": "ucpro",
"fr": "pc",
"uc_param_str": "",
}
headers = self.common_headers()
response = requests.request(
"GET", url, headers=headers, params=querystring
).json()
return response["data"]["list"]
def recycle_remove(self, record_list):
url = "https://drive-m.quark.cn/1/clouddrive/file/recycle/remove"
querystring = {"uc_param_str": "", "fr": "pc", "pr": "ucpro"}
payload = {
"select_mode": 2,
"record_list": record_list,
}
headers = self.common_headers()
response = requests.request(
"POST", url, json=payload, headers=headers, params=querystring
).json()
return response
def update_savepath_fid(self, tasklist):
dir_paths = [
item["savepath"]
for item in tasklist
@ -296,15 +364,17 @@ def update_savepath_fid(tasklist):
]
if not dir_paths:
return False
dir_paths_exist_arr = get_fids(dir_paths)
dir_paths_exist_arr = self.get_fids(dir_paths)
dir_paths_exist = [item["file_path"] for item in dir_paths_exist_arr]
# 比较创建不存在的
dir_paths_unexist = list(set(dir_paths) - set(dir_paths_exist))
for dir_path in dir_paths_unexist:
mkdir_return = mkdir(dir_path)
mkdir_return = self.mkdir(dir_path)
if mkdir_return["code"] == 0:
new_dir = mkdir_return["data"]
dir_paths_exist_arr.append({"file_path": dir_path, "fid": new_dir["fid"]})
dir_paths_exist_arr.append(
{"file_path": dir_path, "fid": new_dir["fid"]}
)
print(f"创建文件夹:{dir_path}")
else:
print(f"创建文件夹:{dir_path} 失败, {mkdir_return['message']}")
@ -315,19 +385,61 @@ def update_savepath_fid(tasklist):
task["savepath_fid"] = dir_path["fid"]
# print(dir_paths_exist_arr)
def do_save_check(self, shareurl, savepath):
try:
pwd_id, pdir_fid = self.get_id_from_url(shareurl)
is_sharing, stoken = self.get_stoken(pwd_id)
share_file_list = self.get_detail(pwd_id, stoken, pdir_fid)
fid_list = [item["fid"] for item in share_file_list]
fid_token_list = [item["share_fid_token"] for item in share_file_list]
file_name_list = [item["file_name"] for item in share_file_list]
if not fid_list:
return
self.mkdir(savepath)
to_pdir_fid = (
"0" if savepath == "/" else self.get_fids([savepath])[0]["fid"]
)
save_file = self.save_file(
fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken
)
if save_file["code"] == 41017:
return
elif save_file["code"] == 0:
dir_file_list = self.ls_dir(to_pdir_fid)
del_list = [
item["fid"]
for item in dir_file_list
if (item["file_name"] in file_name_list)
and ((datetime.now().timestamp() - item["created_at"]) < 60)
]
if del_list:
self.delete(del_list)
recycle_list = self.recycle_list()
record_id_list = [
item["record_id"]
for item in recycle_list
if item["fid"] in del_list
]
self.recycle_remove(record_id_list)
return save_file
else:
return False
except Exception as e:
if os.environ.get("DEBUG") == True:
print(f"转存测试失败: {str(e)}")
def do_save_task(task):
def do_save_task(self, task):
# 判断资源失效记录
if task.get("shareurl_ban"):
print(f"{task['taskname']}》:{task['shareurl_ban']}")
return
# 链接转换所需参数
pwd_id, pdir_fid = get_id_from_url(task["shareurl"])
pwd_id, pdir_fid = self.get_id_from_url(task["shareurl"])
# print("match: ", pwd_id, pdir_fid)
# 获取stoken同时可验证资源是否失效
is_sharing, stoken = get_stoken(pwd_id)
is_sharing, stoken = self.get_stoken(pwd_id)
if not is_sharing:
add_notify(f"{task['taskname']}》:{stoken}")
task["shareurl_ban"] = stoken
@ -335,7 +447,11 @@ def do_save_task(task):
# print("stoken: ", stoken)
# 获取分享文件列表
share_file_list = get_detail(pwd_id, stoken, pdir_fid)
share_file_list = self.get_detail(pwd_id, stoken, pdir_fid)
# 仅有一个文件夹
if len(share_file_list) == 1 and share_file_list[0]["dir"]:
print("🧠 该分享是一个文件夹,读取文件夹内列表")
share_file_list = self.get_detail(pwd_id, stoken, share_file_list[0]["fid"])
if not share_file_list:
add_notify(f"{task['taskname']}》:分享目录为空")
return
@ -345,10 +461,10 @@ def do_save_task(task):
task["savepath_fid"] = (
task.get("savepath_fid")
if task.get("savepath_fid")
else get_fids([task["savepath"]])[0]["fid"]
else self.get_fids([task["savepath"]])[0]["fid"]
)
to_pdir_fid = task["savepath_fid"]
dir_file_list = ls_dir(to_pdir_fid)
dir_file_list = self.ls_dir(to_pdir_fid)
# print("dir_file_list: ", dir_file_list)
# 需保存的文件清单
@ -373,7 +489,9 @@ def do_save_task(task):
else:
compare_func = lambda a, b1, b2: (a == b1 or a == b2)
file_exists = any(
compare_func(dir_file["file_name"], share_file["file_name"], save_name)
compare_func(
dir_file["file_name"], share_file["file_name"], save_name
)
for dir_file in dir_file_list
)
if not file_exists:
@ -384,12 +502,12 @@ def do_save_task(task):
fid_token_list = [item["share_fid_token"] for item in need_save_list]
save_name_list = [item["save_name"] for item in need_save_list]
if fid_list:
save_file_return = save_file(
save_file_return = self.save_file(
fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken
)
if save_file_return["code"] == 0:
task_id = save_file_return["data"]["task_id"]
query_task_return = query_task(task_id)
query_task_return = self.query_task(task_id)
if query_task_return["code"] == 0:
save_name_list.sort()
add_notify(
@ -406,8 +524,7 @@ def do_save_task(task):
print("任务结束:没有新的转存任务")
return False
def query_task(task_id):
def query_task(self, task_id):
url = "https://drive-pc.quark.cn/1/clouddrive/task"
querystring = {
"pr": "ucpro",
@ -418,15 +535,16 @@ def query_task(task_id):
"__dt": int(random.uniform(1, 5) * 60 * 1000),
"__t": datetime.now().timestamp(),
}
headers = common_headers()
response = requests.request("GET", url, headers=headers, params=querystring).json()
headers = self.common_headers()
response = requests.request(
"GET", url, headers=headers, params=querystring
).json()
if response["code"] == 32003:
response["message"] = "容量限制"
response["message"] = "容量"
return response
def do_rename_task(task):
dir_file_list = ls_dir(task["savepath_fid"])
def do_rename_task(self, task):
dir_file_list = self.ls_dir(task["savepath_fid"])
is_rename = False
for dir_file in dir_file_list:
pattern, replace = magic_regex_func(task["pattern"], task["replace"])
@ -437,7 +555,7 @@ def do_rename_task(task):
else dir_file["file_name"]
)
if save_name != dir_file["file_name"]:
rename_return = rename(dir_file["fid"], save_name)
rename_return = self.rename(dir_file["fid"], save_name)
if rename_return["code"] == 0:
print(f"重命名:{dir_file['file_name']}{save_name}")
is_rename = True
@ -448,12 +566,18 @@ def do_rename_task(task):
return is_rename
def emby_refresh(emby_id):
emby_url = config_data.get("emby").get("url")
emby_apikey = config_data.get("emby").get("apikey")
if emby_url and emby_apikey and emby_id:
url = f"{emby_url}/emby/Items/{emby_id}/Refresh"
headers = {"X-Emby-Token": emby_apikey}
class Emby:
def __init__(self, emby_url, emby_apikey):
if emby_url and emby_apikey:
self.emby_url = emby_url
self.emby_apikey = emby_apikey
else:
return False
def emby_refresh(self, emby_id):
if emby_id:
url = f"{self.emby_url}/emby/Items/{emby_id}/Refresh"
headers = {"X-Emby-Token": self.emby_apikey}
querystring = {
"Recursive": "true",
"MetadataRefreshMode": "FullRefresh",
@ -461,7 +585,9 @@ def emby_refresh(emby_id):
"ReplaceAllMetadata": "false",
"ReplaceAllImages": "false",
}
response = requests.request("POST", url, headers=headers, params=querystring)
response = requests.request(
"POST", url, headers=headers, params=querystring
)
if response.text == "":
print(f"🎞 刷新Emby媒体库成功✅")
return True
@ -469,13 +595,10 @@ def emby_refresh(emby_id):
print(f"🎞 刷新Emby媒体库{response.text}")
return False
def emby_search(media_name):
emby_url = config_data.get("emby").get("url")
emby_apikey = config_data.get("emby").get("apikey")
if emby_url and emby_apikey and media_name:
url = f"{emby_url}/emby/Items"
headers = {"X-Emby-Token": emby_apikey}
def emby_search(self, media_name):
if media_name:
url = f"{self.emby_url}/emby/Items"
headers = {"X-Emby-Token": self.emby_apikey}
querystring = {
"IncludeItemTypes": "Series",
"StartIndex": 0,
@ -493,85 +616,81 @@ def emby_search(media_name):
if response.get("Items"):
for item in response["Items"]:
if item["IsFolder"]:
print(f"🎞 《{item['Name']}》匹配到Emby媒体库ID{item['Id']}")
print(
f"🎞 《{item['Name']}》匹配到Emby媒体库ID{item['Id']}"
)
return item["Id"]
else:
print(f"🎞 搜索Emby媒体库{response.text}")
return False
def download_file(url, save_path):
response = requests.get(url)
if response.status_code == 200:
with open(save_path, "wb") as file:
file.write(response.content)
return True
else:
return False
def save_check(account):
global check_data
if (
config_data.get("SAVE_CHECK") == False
or os.environ.get("SAVE_CHECK") == "false"
):
return
if not check_data:
check_data = requests.get(
"https://mirror.ghproxy.com/https://gist.githubusercontent.com/Cp0204/4764fd0110d5f5bd875eb9a9ff77ccd0/raw/quark_save_check.json"
).json()
if check_data.get("pwd_id"):
return account.do_save_check(
check_data["pwd_id"], check_data["savepath"]
)
def get_cookies(cookie_val):
if isinstance(cookie_val, list):
return cookie_val
elif cookie_val:
if "\n" in cookie_val:
return cookie_val.split("\n")
else:
return [cookie_val]
else:
return False
def do_sign(cookies):
first_account = {}
print(f"===============签到任务===============")
for index, cookie in enumerate(cookies):
def verify_account(account):
# 验证账号
account_info = get_account_info(cookie)
print(f"▶️ 验证第{index+1}个账号")
account_info = account.init()
print(f"▶️ 验证第{account.index}个账号")
if not account_info:
add_notify(f"👤 第{index+1}个账号登录失败cookie无效❌")
add_notify(f"👤 第{account.index}个账号登录失败cookie无效❌")
return False
else:
if index == 0:
first_account = account_info
first_account["cookie"] = cookie
print(f"👤 账号昵称: {account_info['nickname']}")
return True
def do_sign(account):
if not verify_account(account):
return
# 每日领空间
growth_info = get_growth_info(cookie)
growth_info = account.get_growth_info()
if growth_info:
if growth_info["cap_sign"]["sign_daily"]:
print(
f"📅 执行签到: 今日已签到+{int(growth_info['cap_sign']['sign_daily_reward']/1024/1024)}MB连签进度({growth_info['cap_sign']['sign_progress']}/{growth_info['cap_sign']['sign_target']})✅"
)
else:
sign, sign_return = get_growth_sign(cookie)
sign, sign_return = account.get_growth_sign()
save_check_flag = "💹" if save_check(account) else ""
if sign:
message = f"📅 执行签到: 今日签到+{int(sign_return/1024/1024)}MB连签进度({growth_info['cap_sign']['sign_progress']+1}/{growth_info['cap_sign']['sign_target']})"
message = f"📅 执行签到: 今日签到+{int(sign_return/1024/1024)}MB连签进度({growth_info['cap_sign']['sign_progress']+1}/{growth_info['cap_sign']['sign_target']}){save_check_flag}"
if (
config_data.get("push_config").get("QUARK_SIGN_NOTIFY")
== False
or os.environ.get("QUARK_SIGN_NOTIFY") == False
config_data.get("push_config").get("QUARK_SIGN_NOTIFY") == False
or os.environ.get("QUARK_SIGN_NOTIFY") == "false"
):
print(message)
else:
message = message.replace(
"今日", f"[{account_info['nickname']}]今日"
)
message = message.replace("今日", f"[{account.nickname}]今日")
add_notify(message)
else:
print(f"📅 执行签到: {sign_return}")
print(f"")
print(f"")
return first_account
def do_save():
print(f"===============转存任务===============")
print(f"转存账号: {first_account['nickname']}")
def do_save(account):
print(f"转存账号: {account.nickname}")
# 任务列表
tasklist = config_data.get("tasklist", [])
# 获取全部保存目录fid
update_savepath_fid(tasklist)
account.update_savepath_fid(tasklist)
emby = Emby(
config_data.get("emby").get("url"), config_data.get("emby").get("apikey")
)
def check_date(task):
return (
@ -604,21 +723,22 @@ def do_save():
if task.get("ignore_extension"):
print(f"忽略后缀: {task['ignore_extension']}")
print()
is_new = do_save_task(task)
is_rename = do_rename_task(task)
if (is_new or is_rename) and task.get("emby_id") != "0":
is_new = account.do_save_task(task)
is_rename = account.do_rename_task(task)
# 刷新媒体库
if emby and (is_new or is_rename) and task.get("emby_id") != "0":
if task.get("emby_id"):
emby_refresh(task["emby_id"])
emby.emby_refresh(task["emby_id"])
else:
match_emby_id = emby_search(task["taskname"])
match_emby_id = emby.emby_search(task["taskname"])
if match_emby_id:
task["emby_id"] = match_emby_id
emby_refresh(match_emby_id)
emby.emby_refresh(match_emby_id)
print(f"")
def main():
global config_data, first_account
global config_data
start_time = datetime.now()
print(f"===============程序开始===============")
print(f"⏰ 执行时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
@ -654,11 +774,17 @@ def main():
if not cookies:
print("❌ cookie 未配置")
return
accounts = [Quark(cookie, index) for index, cookie in enumerate(cookies)]
# 签到
first_account = do_sign(cookies)
print(f"===============签到任务===============")
for account in accounts:
do_sign(account)
print(f"")
# 转存
if first_account and cookie_form_file:
do_save()
if accounts[0].is_active and cookie_form_file:
print(f"===============转存任务===============")
do_save(accounts[0])
print(f"")
# 通知
if notifys:
notify_body = "\n".join(notifys)