# !/usr/bin/env python3 # -*- coding: utf-8 -*- # Modify: 2024-11-13 # Repo: https://github.com/Cp0204/quark_auto_save # ConfigFile: quark_config.json """ new Env('夸克自动追更'); 0 8,18,20 * * * quark_auto_save.py """ import os import re import sys import json import time import random import requests import importlib from datetime import datetime # 兼容青龙 try: from treelib import Tree except: print("正在尝试自动安装依赖...") os.system("pip3 install treelib &> /dev/null") from treelib import Tree CONFIG_DATA = {} NOTIFYS = [] GH_PROXY = os.environ.get("GH_PROXY", "https://ghproxy.net/") MAGIC_REGEX = { "$TV": { "pattern": r".*?(?= max_retries: print(f"SSL错误,已重试{retry_count}次,放弃请求: {str(e)}") fake_response = requests.Response() fake_response.status_code = 500 fake_response._content = b'{"status": 500, "message": "SSL error", "code": 500}' return fake_response # 等待一段时间后重试 wait_time = retry_count * 2 print(f"SSL错误,{wait_time}秒后进行第{retry_count+1}次重试: {str(e)}") time.sleep(wait_time) except requests.exceptions.RequestException as e: retry_count += 1 if retry_count >= max_retries: print(f"请求错误,已重试{retry_count}次,放弃请求: {str(e)}") fake_response = requests.Response() fake_response.status_code = 500 fake_response._content = b'{"status": 500, "message": "request error", "code": 500}' return fake_response # 等待一段时间后重试 wait_time = retry_count * 2 print(f"请求错误,{wait_time}秒后进行第{retry_count+1}次重试: {str(e)}") time.sleep(wait_time) def init(self): account_info = self.get_account_info() if account_info: self.is_active = True self.nickname = account_info["nickname"] return account_info else: return False def get_account_info(self): url = "https://pan.quark.cn/account/info" querystring = {"fr": "pc", "platform": "pc"} response = self._send_request("GET", url, params=querystring).json() if response.get("data"): return response["data"] else: return False def get_growth_info(self): url = f"{self.BASE_URL_APP}/1/clouddrive/capacity/growth/info" querystring = { "pr": "ucpro", "fr": "android", "kps": 
self.mparam.get("kps"), "sign": self.mparam.get("sign"), "vcode": self.mparam.get("vcode"), } headers = { "content-type": "application/json", } response = self._send_request( "GET", url, headers=headers, params=querystring ).json() if response.get("data"): return response["data"] else: return False def get_growth_sign(self): url = f"{self.BASE_URL_APP}/1/clouddrive/capacity/growth/sign" querystring = { "pr": "ucpro", "fr": "android", "kps": self.mparam.get("kps"), "sign": self.mparam.get("sign"), "vcode": self.mparam.get("vcode"), } payload = { "sign_cyclic": True, } headers = { "content-type": "application/json", } response = self._send_request( "POST", url, json=payload, headers=headers, params=querystring ).json() if response.get("data"): return True, response["data"]["sign_daily_reward"] else: return False, response["message"] # 可验证资源是否失效 def get_stoken(self, pwd_id, passcode=""): url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/token" querystring = {"pr": "ucpro", "fr": "pc"} payload = {"pwd_id": pwd_id, "passcode": passcode} response = self._send_request( "POST", url, json=payload, params=querystring ).json() if response.get("status") == 200: return True, response["data"]["stoken"] else: return False, response["message"] def get_detail(self, pwd_id, stoken, pdir_fid, _fetch_share=0): list_merge = [] page = 1 while True: url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/detail" querystring = { "pr": "ucpro", "fr": "pc", "pwd_id": pwd_id, "stoken": stoken, "pdir_fid": pdir_fid, "force": "0", "_page": page, "_size": "50", "_fetch_banner": "0", "_fetch_share": _fetch_share, "_fetch_total": "1", "_sort": "file_type:asc,updated_at:desc", } response = self._send_request("GET", url, params=querystring).json() if response["data"]["list"]: list_merge += response["data"]["list"] page += 1 else: break if len(list_merge) >= response["metadata"]["_total"]: break response["data"]["list"] = list_merge return response["data"] def get_fids(self, file_paths): fids = [] 
while True: url = f"{self.BASE_URL}/1/clouddrive/file/info/path_list" querystring = {"pr": "ucpro", "fr": "pc"} payload = {"file_path": file_paths[:50], "namespace": "0"} response = self._send_request( "POST", url, json=payload, params=querystring ).json() if response["code"] == 0: fids += response["data"] file_paths = file_paths[50:] else: print(f"获取目录ID:失败, {response['message']}") break if len(file_paths) == 0: break return fids def ls_dir(self, pdir_fid, **kwargs): file_list = [] page = 1 while True: url = f"{self.BASE_URL}/1/clouddrive/file/sort" querystring = { "pr": "ucpro", "fr": "pc", "uc_param_str": "", "pdir_fid": pdir_fid, "_page": page, "_size": "50", "_fetch_total": "1", "_fetch_sub_dirs": "0", "_sort": "file_type:asc,updated_at:desc", "_fetch_full_path": kwargs.get("fetch_full_path", 0), } response = self._send_request("GET", url, params=querystring).json() if response["data"]["list"]: file_list += response["data"]["list"] page += 1 else: break if len(file_list) >= response["metadata"]["_total"]: break return file_list def save_file(self, fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken): url = f"{self.BASE_URL}/1/clouddrive/share/sharepage/save" querystring = { "pr": "ucpro", "fr": "pc", "uc_param_str": "", "app": "clouddrive", "__dt": int(random.uniform(1, 5) * 60 * 1000), "__t": datetime.now().timestamp(), } payload = { "fid_list": fid_list, "fid_token_list": fid_token_list, "to_pdir_fid": to_pdir_fid, "pwd_id": pwd_id, "stoken": stoken, "pdir_fid": "0", "scene": "link", } response = self._send_request( "POST", url, json=payload, params=querystring ).json() return response def query_task(self, task_id): retry_index = 0 while True: url = f"{self.BASE_URL}/1/clouddrive/task" querystring = { "pr": "ucpro", "fr": "pc", "uc_param_str": "", "task_id": task_id, "retry_index": retry_index, "__dt": int(random.uniform(1, 5) * 60 * 1000), "__t": datetime.now().timestamp(), } response = self._send_request("GET", url, params=querystring).json() if 
response["data"]["status"] != 0: if retry_index > 0: print() break else: if retry_index == 0: print( f"正在等待[{response['data']['task_title']}]执行结果", end="", flush=True, ) else: print(".", end="", flush=True) retry_index += 1 time.sleep(0.500) return response def download(self, fids): url = f"{self.BASE_URL}/1/clouddrive/file/download" querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""} payload = {"fids": fids} response = self._send_request("POST", url, json=payload, params=querystring) set_cookie = response.cookies.get_dict() cookie_str = "; ".join([f"{key}={value}" for key, value in set_cookie.items()]) return response.json(), cookie_str def mkdir(self, dir_path): url = f"{self.BASE_URL}/1/clouddrive/file" querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""} payload = { "pdir_fid": "0", "file_name": "", "dir_path": dir_path, "dir_init_lock": False, } response = self._send_request( "POST", url, json=payload, params=querystring ).json() return response def rename(self, fid, file_name): url = f"{self.BASE_URL}/1/clouddrive/file/rename" querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""} payload = {"fid": fid, "file_name": file_name} response = self._send_request( "POST", url, json=payload, params=querystring ).json() return response def delete(self, filelist): url = f"{self.BASE_URL}/1/clouddrive/file/delete" querystring = {"pr": "ucpro", "fr": "pc", "uc_param_str": ""} payload = {"action_type": 2, "filelist": filelist, "exclude_fids": []} response = self._send_request( "POST", url, json=payload, params=querystring ).json() return response def recycle_list(self, page=1, size=30): url = f"{self.BASE_URL}/1/clouddrive/file/recycle/list" querystring = { "_page": page, "_size": size, "pr": "ucpro", "fr": "pc", "uc_param_str": "", } response = self._send_request("GET", url, params=querystring).json() return response["data"]["list"] def recycle_remove(self, record_list): url = f"{self.BASE_URL}/1/clouddrive/file/recycle/remove" querystring = 
{"uc_param_str": "", "fr": "pc", "pr": "ucpro"} payload = { "select_mode": 2, "record_list": record_list, } response = self._send_request( "POST", url, json=payload, params=querystring ).json() return response # ↑ 请求函数 # ↓ 操作函数 # 魔法正则匹配 def magic_regex_func(self, pattern, replace, taskname=None): magic_regex = CONFIG_DATA.get("magic_regex") or MAGIC_REGEX or {} keyword = pattern if keyword in magic_regex: pattern = magic_regex[keyword]["pattern"] if replace == "": replace = magic_regex[keyword]["replace"] if taskname: replace = replace.replace("$TASKNAME", taskname) return pattern, replace def get_id_from_url(self, url): url = url.replace("https://pan.quark.cn/s/", "") pattern = r"(\w+)(\?pwd=(\w+))?(#/list/share.*/(\w+))?" match = re.search(pattern, url) if match: pwd_id = match.group(1) passcode = match.group(3) if match.group(3) else "" pdir_fid = match.group(5) if match.group(5) else 0 return pwd_id, passcode, pdir_fid else: return None def update_savepath_fid(self, tasklist): dir_paths = [ re.sub(r"/{2,}", "/", f"/{item['savepath']}") for item in tasklist if not item.get("enddate") or ( datetime.now().date() <= datetime.strptime(item["enddate"], "%Y-%m-%d").date() ) ] if not dir_paths: return False dir_paths_exist_arr = self.get_fids(dir_paths) dir_paths_exist = [item["file_path"] for item in dir_paths_exist_arr] # 比较创建不存在的 dir_paths_unexist = list(set(dir_paths) - set(dir_paths_exist) - set(["/"])) for dir_path in dir_paths_unexist: mkdir_return = self.mkdir(dir_path) if mkdir_return["code"] == 0: new_dir = mkdir_return["data"] dir_paths_exist_arr.append( {"file_path": dir_path, "fid": new_dir["fid"]} ) print(f"创建文件夹:{dir_path}") else: print(f"创建文件夹:{dir_path} 失败, {mkdir_return['message']}") # 储存目标目录的fid for dir_path in dir_paths_exist_arr: self.savepath_fid[dir_path["file_path"]] = dir_path["fid"] # print(dir_paths_exist_arr) def do_save_check(self, shareurl, savepath): try: pwd_id, passcode, pdir_fid = self.get_id_from_url(shareurl) is_sharing, stoken = 
self.get_stoken(pwd_id, passcode) share_file_list = self.get_detail(pwd_id, stoken, pdir_fid)["list"] fid_list = [item["fid"] for item in share_file_list] fid_token_list = [item["share_fid_token"] for item in share_file_list] file_name_list = [item["file_name"] for item in share_file_list] if not fid_list: return get_fids = self.get_fids([savepath]) to_pdir_fid = ( get_fids[0]["fid"] if get_fids else self.mkdir(savepath)["data"]["fid"] ) save_file = self.save_file( fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken ) if save_file["code"] == 41017: return elif save_file["code"] == 0: dir_file_list = self.ls_dir(to_pdir_fid) del_list = [ item["fid"] for item in dir_file_list if (item["file_name"] in file_name_list) and ((datetime.now().timestamp() - item["created_at"]) < 60) ] if del_list: self.delete(del_list) recycle_list = self.recycle_list() record_id_list = [ item["record_id"] for item in recycle_list if item["fid"] in del_list ] self.recycle_remove(record_id_list) return save_file else: return False except Exception as e: if os.environ.get("DEBUG") == True: print(f"转存测试失败: {str(e)}") def do_save_task(self, task): # 判断资源失效记录 if task.get("shareurl_ban"): print(f"《{task['taskname']}》:{task['shareurl_ban']}") return # 链接转换所需参数 pwd_id, passcode, pdir_fid = self.get_id_from_url(task["shareurl"]) # print("match: ", pwd_id, pdir_fid) # 获取stoken,同时可验证资源是否失效 is_sharing, stoken = self.get_stoken(pwd_id, passcode) if not is_sharing: add_notify(f"❌《{task['taskname']}》:{stoken}\n") task["shareurl_ban"] = stoken return # print("stoken: ", stoken) updated_tree = self.dir_check_and_save(task, pwd_id, stoken, pdir_fid) if updated_tree.size(1) > 0: add_notify(f"✅《{task['taskname']}》添加追更:\n{updated_tree}") return updated_tree else: print(f"任务结束:没有新的转存任务") return False def dir_check_and_save(self, task, pwd_id, stoken, pdir_fid="", subdir_path=""): tree = Tree() # 获取分享文件列表 share_file_list = self.get_detail(pwd_id, stoken, pdir_fid)["list"] # print("share_file_list: ", 
share_file_list) if not share_file_list: if subdir_path == "": task["shareurl_ban"] = "分享为空,文件已被分享者删除" add_notify(f"❌《{task['taskname']}》:{task['shareurl_ban']}\n") return tree elif ( len(share_file_list) == 1 and share_file_list[0]["dir"] and subdir_path == "" ): # 仅有一个文件夹 print("🧠 该分享是一个文件夹,读取文件夹内列表") share_file_list = self.get_detail( pwd_id, stoken, share_file_list[0]["fid"] )["list"] # 应用过滤词过滤文件 if task.get("filterwords"): filterwords_list = [word.strip() for word in task["filterwords"].split(',')] share_file_list = [file for file in share_file_list if not any(word in file['file_name'] for word in filterwords_list)] print(f"📑 应用过滤词:{task['filterwords']},剩余{len(share_file_list)}个文件") # 获取目标目录文件列表 savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}") if not self.savepath_fid.get(savepath): if get_fids := self.get_fids([savepath]): self.savepath_fid[savepath] = get_fids[0]["fid"] else: print(f"❌ 目录 {savepath} fid获取失败,跳过转存") return tree to_pdir_fid = self.savepath_fid[savepath] dir_file_list = self.ls_dir(to_pdir_fid) # print("dir_file_list: ", dir_file_list) tree.create_node( savepath, pdir_fid, data={ "is_dir": True, }, ) # 需保存的文件清单 need_save_list = [] # 顺序命名模式下获取当前序号和正则表达式 regex_pattern = None if task.get("use_sequence_naming") and task.get("sequence_naming"): # 获取目录中符合顺序命名格式的最大序号 sequence_pattern = task["sequence_naming"] # 替换占位符为正则表达式捕获组 regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)') max_sequence = 0 for dir_file in dir_file_list: matches = re.match(regex_pattern, dir_file["file_name"]) if matches: try: current_seq = int(matches.group(1)) max_sequence = max(max_sequence, current_seq) except (IndexError, ValueError): pass # 从最大序号开始计数 current_sequence = max_sequence # 添加符合的 for share_file in share_file_list: if share_file["dir"] and task.get("update_subdir", False): pattern, replace = task["update_subdir"], "" elif task.get("use_sequence_naming") and task.get("sequence_naming"): # 使用顺序命名 pattern = ".*" # 匹配任何文件 # 
序号暂时留空,等收集完所有文件后再按优先级排序赋值 replace = "TO_BE_REPLACED_LATER" # 保留文件扩展名 if not share_file["dir"]: file_ext = os.path.splitext(share_file["file_name"])[1] replace = replace + file_ext else: pattern, replace = self.magic_regex_func( task["pattern"], task["replace"], task["taskname"] ) # 正则文件名匹配 if re.search(pattern, share_file["file_name"]): # 替换后的文件名 save_name = ( re.sub(pattern, replace, share_file["file_name"]) if replace != "" else share_file["file_name"] ) # 忽略后缀 if task.get("ignore_extension") and not share_file["dir"]: compare_func = lambda a, b1, b2: ( os.path.splitext(a)[0] == os.path.splitext(b1)[0] or os.path.splitext(a)[0] == os.path.splitext(b2)[0] ) else: compare_func = lambda a, b1, b2: (a == b1 or a == b2) # 判断目标目录文件是否存在 file_exists = False # 顺序命名模式下增强去重功能 if task.get("use_sequence_naming") and task.get("sequence_naming") and not share_file["dir"]: # 根据文件大小和修改时间判断文件是否已经存在 file_ext = os.path.splitext(share_file["file_name"])[1].lower() for dir_file in dir_file_list: # 检查是否为相同的文件(根据大小和扩展名判断) dir_file_ext = os.path.splitext(dir_file["file_name"])[1].lower() if (not dir_file["dir"] and dir_file["size"] == share_file["size"] and dir_file_ext == file_ext): # 文件大小相同,扩展名相同,很可能是同一个文件 # 额外检查是否已经有符合顺序命名格式的文件(防止重复转存后重命名) if re.match(regex_pattern, dir_file["file_name"]): print(f"📌 顺序命名去重: {share_file['file_name']} 已存在于目录 {dir_file['file_name']},大小: {format_bytes(share_file['size'])},跳过") file_exists = True break # 如果文件大小相同和扩展名相同,需要进一步检查修改时间是否接近 share_time = share_file.get("last_update_at", 0) dir_time = dir_file.get("updated_at", 0) # 如果修改时间在30天内,或者差距不大,认为是同一个文件 if abs(share_time - dir_time) < 2592000 or abs(1 - (share_time / dir_time if dir_time else 1)) < 0.1: print(f"📌 顺序命名去重: {share_file['file_name']} 与 {dir_file['file_name']} 匹配,大小: {format_bytes(share_file['size'])},跳过") file_exists = True break else: # 原有的文件名匹配判断 file_exists = any( compare_func( dir_file["file_name"], share_file["file_name"], save_name ) for dir_file in dir_file_list ) if not file_exists: 
share_file["save_name"] = save_name share_file["original_name"] = share_file["file_name"] # 保存原文件名,用于排序 need_save_list.append(share_file) elif share_file["dir"]: # 存在并是一个文件夹 if task.get("update_subdir", False): if re.search(task["update_subdir"], share_file["file_name"]): print(f"检查子文件夹:{savepath}/{share_file['file_name']}") subdir_tree = self.dir_check_and_save( task, pwd_id, stoken, share_file["fid"], f"{subdir_path}/{share_file['file_name']}", ) if subdir_tree.size(1) > 0: # 合并子目录树 tree.create_node( "📁" + share_file["file_name"], share_file["fid"], parent=pdir_fid, data={ "is_dir": share_file["dir"], }, ) tree.merge(share_file["fid"], subdir_tree, deep=False) # 指定文件开始订阅/到达指定文件(含)结束历遍 if share_file["fid"] == task.get("startfid", ""): break # 如果是顺序命名模式,需要重新排序并生成文件名 if task.get("use_sequence_naming") and task.get("sequence_naming") and need_save_list: def custom_sort(file): file_name = file["original_name"] # 1. 提取文件名中的数字(期数/集数等) episode_num = 0 # 尝试匹配"第X期/集/话"格式 episode_match = re.search(r'第(\d+)[期集话]', file_name) if episode_match: episode_num = int(episode_match.group(1)) # 尝试匹配常见视频格式 S01E01, E01, 1x01 等 elif re.search(r'[Ss](\d+)[Ee](\d+)', file_name): match = re.search(r'[Ss](\d+)[Ee](\d+)', file_name) season = int(match.group(1)) episode = int(match.group(2)) episode_num = season * 1000 + episode # 确保季和集的排序正确 elif re.search(r'[Ee](\d+)', file_name): match = re.search(r'[Ee](\d+)', file_name) episode_num = int(match.group(1)) elif re.search(r'(\d+)[xX](\d+)', file_name): match = re.search(r'(\d+)[xX](\d+)', file_name) season = int(match.group(1)) episode = int(match.group(2)) episode_num = season * 1000 + episode # 尝试匹配日期格式 YYYYMMDD elif re.search(r'(\d{4})(\d{2})(\d{2})', file_name): match = re.search(r'(\d{4})(\d{2})(\d{2})', file_name) year = int(match.group(1)) month = int(match.group(2)) day = int(match.group(3)) episode_num = year * 10000 + month * 100 + day # 尝试匹配纯数字格式(文件名开头是纯数字) elif re.search(r'^(\d+)', file_name): match = re.search(r'^(\d+)', 
file_name) episode_num = int(match.group(1)) # 2. 检查文件名中是否包含"上中下"等排序提示 position_order = 10 # 默认顺序值 if '上' in file_name: position_order = 1 elif '中' in file_name: position_order = 2 elif '下' in file_name: position_order = 3 # 3. 返回排序元组:先按集数排序,再按上中下,最后按更新时间 return (episode_num, position_order, file["last_update_at"] if "last_update_at" in file else 0) # 按自定义逻辑排序 need_save_list = sorted(need_save_list, key=custom_sort) # 重新生成命名 for index, file in enumerate(need_save_list): current_sequence += 1 if file["dir"]: file["save_name"] = sequence_pattern.replace("{}", f"{current_sequence:02d}") else: file_ext = os.path.splitext(file["file_name"])[1] file["save_name"] = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext fid_list = [item["fid"] for item in need_save_list] fid_token_list = [item["share_fid_token"] for item in need_save_list] if fid_list: save_file_return = self.save_file( fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken ) err_msg = None if save_file_return["code"] == 0: task_id = save_file_return["data"]["task_id"] query_task_return = self.query_task(task_id) if query_task_return["code"] == 0: # 建立目录树 for index, item in enumerate(need_save_list): icon = ( "📁" if item["dir"] == True else "🎞️" if item["obj_category"] == "video" else "" ) tree.create_node( f"{icon}{item['save_name']}", item["fid"], parent=pdir_fid, data={ "fid": f"{query_task_return['data']['save_as']['save_as_top_fids'][index]}", "path": f"{savepath}/{item['save_name']}", "is_dir": item["dir"], }, ) else: err_msg = query_task_return["message"] else: err_msg = save_file_return["message"] if err_msg: add_notify(f"❌《{task['taskname']}》转存失败:{err_msg}\n") return tree def do_rename_task(self, task, subdir_path=""): if task.get("use_sequence_naming") and task.get("sequence_naming"): # 使用顺序命名模式 sequence_pattern = task["sequence_naming"] # 替换占位符为正则表达式捕获组 regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)') savepath = re.sub(r"/{2,}", "/", 
f"/{task['savepath']}{subdir_path}") if not self.savepath_fid.get(savepath): self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"] dir_file_list = self.ls_dir(self.savepath_fid[savepath]) dir_file_name_list = [item["file_name"] for item in dir_file_list] # 找出当前最大序号 max_sequence = 0 for dir_file in dir_file_list: matches = re.match(regex_pattern, dir_file["file_name"]) if matches: try: current_seq = int(matches.group(1)) max_sequence = max(max_sequence, current_seq) except (IndexError, ValueError): pass # 重命名文件 current_sequence = max_sequence is_rename_count = 0 # 定义一个排序函数,支持多种格式的排序 def custom_sort(file): file_name = file["file_name"] # 1. 提取文件名中的数字(期数/集数等) episode_num = 0 # 尝试匹配"第X期/集/话"格式 episode_match = re.search(r'第(\d+)[期集话]', file_name) if episode_match: episode_num = int(episode_match.group(1)) # 尝试匹配常见视频格式 S01E01, E01, 1x01 等 elif re.search(r'[Ss](\d+)[Ee](\d+)', file_name): match = re.search(r'[Ss](\d+)[Ee](\d+)', file_name) season = int(match.group(1)) episode = int(match.group(2)) episode_num = season * 1000 + episode # 确保季和集的排序正确 elif re.search(r'[Ee](\d+)', file_name): match = re.search(r'[Ee](\d+)', file_name) episode_num = int(match.group(1)) elif re.search(r'(\d+)[xX](\d+)', file_name): match = re.search(r'(\d+)[xX](\d+)', file_name) season = int(match.group(1)) episode = int(match.group(2)) episode_num = season * 1000 + episode # 尝试匹配日期格式 YYYYMMDD elif re.search(r'(\d{4})(\d{2})(\d{2})', file_name): match = re.search(r'(\d{4})(\d{2})(\d{2})', file_name) year = int(match.group(1)) month = int(match.group(2)) day = int(match.group(3)) episode_num = year * 10000 + month * 100 + day # 尝试匹配纯数字格式(文件名开头是纯数字) elif re.search(r'^(\d+)', file_name): match = re.search(r'^(\d+)', file_name) episode_num = int(match.group(1)) # 2. 检查文件名中是否包含"上中下"等排序提示 position_order = 10 # 默认顺序值 if '上' in file_name: position_order = 1 elif '中' in file_name: position_order = 2 elif '下' in file_name: position_order = 3 # 3. 
返回排序元组:先按集数排序,再按上中下,最后按创建时间 return (episode_num, position_order, file["created_at"]) # 按自定义逻辑排序 sorted_files = sorted([f for f in dir_file_list if not f["dir"] and not re.match(regex_pattern, f["file_name"])], key=custom_sort) for dir_file in sorted_files: current_sequence += 1 file_ext = os.path.splitext(dir_file["file_name"])[1] save_name = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext if save_name != dir_file["file_name"] and save_name not in dir_file_name_list: try: rename_return = self.rename(dir_file["fid"], save_name) # 防止网络问题导致的错误 if isinstance(rename_return, dict) and rename_return.get("code") == 0: print(f"重命名:{dir_file['file_name']} → {save_name}") is_rename_count += 1 dir_file_name_list.append(save_name) else: error_msg = rename_return.get("message", "未知错误") print(f"重命名:{dir_file['file_name']} → {save_name} 失败,{error_msg}") except Exception as e: print(f"重命名出错:{dir_file['file_name']} → {save_name},错误:{str(e)}") return is_rename_count > 0 else: # 原有的正则匹配模式 pattern, replace = self.magic_regex_func( task["pattern"], task["replace"], task["taskname"] ) if not pattern or not replace: return 0 savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}") if not self.savepath_fid.get(savepath): self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"] dir_file_list = self.ls_dir(self.savepath_fid[savepath]) dir_file_name_list = [item["file_name"] for item in dir_file_list] is_rename_count = 0 for dir_file in dir_file_list: if dir_file["dir"]: is_rename_count += self.do_rename_task( task, f"{subdir_path}/{dir_file['file_name']}" ) if re.search(pattern, dir_file["file_name"]): save_name = ( re.sub(pattern, replace, dir_file["file_name"]) if replace != "" else dir_file["file_name"] ) if save_name != dir_file["file_name"] and ( save_name not in dir_file_name_list ): try: rename_return = self.rename(dir_file["fid"], save_name) if isinstance(rename_return, dict) and rename_return.get("code") == 0: 
def verify_account(account):
    """Check the account's cookie: returns True when a full login succeeds,
    False for sign-in-only cookies (no "__uid") or invalid cookies."""
    print(f"▶️ 验证第{account.index}个账号")
    if "__uid" not in account.cookie:
        print(f"💡 不存在cookie必要参数,判断为仅签到")
        return False
    account_info = account.init()
    if not account_info:
        add_notify(f"👤 第{account.index}个账号登录失败,cookie无效❌")
        return False
    print(f"👤 账号昵称: {account_info['nickname']}✅")
    return True


def format_bytes(size_bytes: int) -> str:
    """Render a byte count as a human-readable string, e.g. 1536 -> '1.50 KB'."""
    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    value = float(size_bytes)
    unit_index = 0
    while value >= 1024 and unit_index < len(units) - 1:
        value /= 1024
        unit_index += 1
    return f"{value:.2f} {units[unit_index]}"


def do_sign(account):
    """Run the daily sign-in for one account and report capacity growth.

    Skips silently when mobile parameters are absent; notification can be
    disabled via QUARK_SIGN_NOTIFY ("false") in config or environment.
    """
    if not account.mparam:
        print("⏭️ 移动端参数未设置,跳过签到")
        print()
        return
    # daily free-space growth info
    growth_info = account.get_growth_info()
    if growth_info:
        growth_message = f"💾 {'88VIP' if growth_info['88VIP'] else '普通用户'} 总空间:{format_bytes(growth_info['total_capacity'])},签到累计获得:{format_bytes(growth_info['cap_composition'].get('sign_reward', 0))}"
        if growth_info["cap_sign"]["sign_daily"]:
            # already signed in today — just report
            sign_message = f"📅 签到记录: 今日已签到+{int(growth_info['cap_sign']['sign_daily_reward']/1024/1024)}MB,连签进度({growth_info['cap_sign']['sign_progress']}/{growth_info['cap_sign']['sign_target']})✅"
            message = f"{sign_message}\n{growth_message}"
            print(message)
        else:
            sign, sign_return = account.get_growth_sign()
            if sign:
                sign_message = f"📅 执行签到: 今日签到+{int(sign_return/1024/1024)}MB,连签进度({growth_info['cap_sign']['sign_progress']+1}/{growth_info['cap_sign']['sign_target']})✅"
                message = f"{sign_message}\n{growth_message}"
                if (
                    str(
                        CONFIG_DATA.get("push_config", {}).get("QUARK_SIGN_NOTIFY")
                    ).lower()
                    == "false"
                    or os.environ.get("QUARK_SIGN_NOTIFY") == "false"
                ):
                    print(message)
                else:
                    message = message.replace("今日", f"[{account.nickname}]今日")
                    add_notify(message)
            else:
                print(f"📅 签到异常: {sign_return}")
    print()
def do_save(account, tasklist=None):
    """Execute every due task in tasklist with the given account: resolve
    save directories, save new files, rename, then run plugins.

    BUGFIX: the default was the mutable literal `[]`, shared across calls;
    it is now None with an explicit fallback (interface-compatible).
    """
    if tasklist is None:
        tasklist = []
    print(f"🧩 载入插件")
    plugins, CONFIG_DATA["plugins"], task_plugins_config = Config.load_plugins(
        CONFIG_DATA.get("plugins", {})
    )
    print(f"转存账号: {account.nickname}")
    # resolve (and create) every task's save directory fid up front
    account.update_savepath_fid(tasklist)

    def check_date(task):
        # a task is due when its enddate has not passed and (when runweek is
        # set) today is listed; Monday is 0 ... Sunday is 6 in weekday()
        return (
            not task.get("enddate")
            or (
                datetime.now().date()
                <= datetime.strptime(task["enddate"], "%Y-%m-%d").date()
            )
        ) and (
            not task.get("runweek")
            or (datetime.today().weekday() + 1 in task.get("runweek"))
        )

    for index, task in enumerate(tasklist):
        # skip tasks outside their schedule
        if check_date(task):
            print()
            print(f"#{index+1}------------------")
            print(f"任务名称: {task['taskname']}")
            print(f"分享链接: {task['shareurl']}")
            print(f"保存路径: {task['savepath']}")
            if task.get("use_sequence_naming") and task.get("sequence_naming"):
                print(f"顺序命名: {task['sequence_naming']}")
            else:
                print(f"正则匹配: {task['pattern']}")
                print(f"正则替换: {task['replace']}")
            if task.get("enddate"):
                print(f"任务截止: {task['enddate']}")
            if task.get("ignore_extension"):
                print(f"忽略后缀: {task['ignore_extension']}")
            if task.get("update_subdir"):
                print(f"更子目录: {task['update_subdir']}")
            print()
            is_new_tree = account.do_save_task(task)
            is_rename = account.do_rename_task(task)

            # merge plugin defaults into the task's addition config
            def merge_dicts(a, b):
                result = a.copy()
                for key, value in b.items():
                    if (
                        key in result
                        and isinstance(result[key], dict)
                        and isinstance(value, dict)
                    ):
                        result[key] = merge_dicts(result[key], value)
                    elif key not in result:
                        result[key] = value
                return result

            task["addition"] = merge_dicts(
                task.get("addition", {}), task_plugins_config
            )
            # run plugins only when something changed
            if is_new_tree or is_rename:
                print(f"🧩 调用插件")
                for plugin_name, plugin in plugins.items():
                    if plugin.is_active and (is_new_tree or is_rename):
                        task = (
                            plugin.run(task, account=account, tree=is_new_tree)
                            or task
                        )
                print()


def main():
    """Entry point: load config, verify accounts, sign in, run save tasks,
    push notifications and persist the (possibly updated) config."""
    global CONFIG_DATA
    start_time = datetime.now()
    print(f"===============程序开始===============")
    print(f"⏰ 执行时间: {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print()
    # CLI args: optional config path and optional single-task index
    config_path = sys.argv[1] if len(sys.argv) > 1 else "quark_config.json"
    task_index = (
        int(sys.argv[2]) if len(sys.argv) > 2 and sys.argv[2].isdigit() else ""
    )
    # download a config template when no local config exists
    if not os.path.exists(config_path):
        if os.environ.get("QUARK_COOKIE"):
            print(
                f"⚙️ 读取到 QUARK_COOKIE 环境变量,仅签到领空间。如需执行转存,请删除该环境变量后配置 {config_path} 文件"
            )
            cookie_val = os.environ.get("QUARK_COOKIE")
            cookie_from_file = False  # renamed from misspelled cookie_form_file
        else:
            print(f"⚙️ 配置文件 {config_path} 不存在❌,正远程从下载配置模版")
            config_url = f"{GH_PROXY}https://raw.githubusercontent.com/Cp0204/quark_auto_save/main/quark_config.json"
            if Config.download_file(config_url, config_path):
                print("⚙️ 配置模版下载成功✅,请到程序目录中手动配置")
            return
    else:
        print(f"⚙️ 正从 {config_path} 文件中读取配置")
        with open(config_path, "r", encoding="utf-8") as file:
            CONFIG_DATA = json.load(file)
        Config.breaking_change_update(CONFIG_DATA)
        cookie_val = CONFIG_DATA.get("cookie")
        if not CONFIG_DATA.get("magic_regex"):
            CONFIG_DATA["magic_regex"] = MAGIC_REGEX
        cookie_from_file = True
    # cookies -> accounts
    cookies = Config.get_cookies(cookie_val)
    if not cookies:
        print("❌ cookie 未配置")
        return
    accounts = [Quark(cookie, index) for index, cookie in enumerate(cookies)]
    # sign-in
    print(f"===============签到任务===============")
    if type(task_index) is int:
        # single-task run: only verify the primary account, skip signing
        verify_account(accounts[0])
    else:
        for account in accounts:
            verify_account(account)
            do_sign(account)
    print()
    # save tasks (primary account only, and only with a file-based config)
    if accounts[0].is_active and cookie_from_file:
        print(f"===============转存任务===============")
        tasklist = CONFIG_DATA.get("tasklist", [])
        if type(task_index) is int:
            do_save(accounts[0], [tasklist[task_index]])
        else:
            do_save(accounts[0], tasklist)
        print()
    # push accumulated notifications
    if NOTIFYS:
        notify_body = "\n".join(NOTIFYS)
        print(f"===============推送通知===============")
        send_ql_notify("【夸克自动追更】", notify_body)
        print()
    if cookie_from_file:
        # persist config (tasks may have gained shareurl_ban/addition fields)
        with open(config_path, "w", encoding="utf-8") as file:
            json.dump(
                CONFIG_DATA, file, ensure_ascii=False, sort_keys=False, indent=2
            )
    print(f"===============程序结束===============")
    duration = datetime.now() - start_time
    print(f"😃 运行时长: {round(duration.total_seconds(), 2)}s")
    print()


if __name__ == "__main__":
    main()