新增顺序命名模式

This commit is contained in:
x1ao4 2025-04-21 04:58:20 +08:00
parent 997bba7047
commit 7c20a03c5f
3 changed files with 874 additions and 113 deletions

View File

@ -278,33 +278,187 @@ def get_share_detail():
# 正则命名预览
def preview_regex(share_detail):
regex = request.json.get("regex")
pattern, replace = account.magic_regex_func(
regex.get("pattern", ""),
regex.get("replace", ""),
regex.get("taskname", ""),
regex.get("magic_regex", {}),
)
# 应用过滤词过滤
filterwords = request.json.get("regex", {}).get("filterwords", "")
if filterwords:
# 同时支持中英文逗号分隔
filterwords = filterwords.replace("", ",")
filterwords_list = [word.strip() for word in filterwords.split(',')]
for item in share_detail["list"]:
# 被过滤的文件不会有file_name_re与不匹配正则的文件显示一致
if any(word in item['file_name'] for word in filterwords_list):
item["filtered"] = True
# 检查是否为顺序命名模式
if regex.get("use_sequence_naming") and regex.get("sequence_naming"):
# 顺序命名模式预览
sequence_pattern = regex.get("sequence_naming")
current_sequence = 1
# 应用正则命名
for item in share_detail["list"]:
# 只对未被过滤的文件应用正则命名
if not item.get("filtered") and re.search(pattern, item["file_name"]):
file_name = item["file_name"]
item["file_name_re"] = (
re.sub(pattern, replace, file_name) if replace != "" else file_name
)
return share_detail
# 构建顺序命名的正则表达式
regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)')
# 实现高级排序算法
def extract_sorting_value(file):
if file["dir"]: # 跳过文件夹
return float('inf')
filename = file["file_name"]
# 提取文件名,不含扩展名
file_name_without_ext = os.path.splitext(filename)[0]
# 1. "第X期/集/话" 格式
match_chinese = re.search(r'第(\d+)[期集话]', filename)
episode_num = int(match_chinese.group(1)) if match_chinese else 0
# 5. 文件名含"上中下"(优先处理,因为可能与其他格式同时存在)
if match_chinese:
# 如果同时存在集数和上中下,则按照集数*10+位置排序
if '' in filename:
return episode_num * 10 + 1
elif '' in filename:
return episode_num * 10 + 2
elif '' in filename:
return episode_num * 10 + 3
elif '' in filename:
return 1
elif '' in filename:
return 2
elif '' in filename:
return 3
# 如果已经匹配到"第X期/集/话"格式,直接返回
if episode_num > 0:
return episode_num * 10
# 2.1 S01E01 格式,提取季数和集数
match_s_e = re.search(r'[Ss](\d+)[Ee](\d+)', filename)
if match_s_e:
season = int(match_s_e.group(1))
episode = int(match_s_e.group(2))
return season * 1000 + episode
# 2.2 E01 格式,仅提取集数
match_e = re.search(r'[Ee][Pp]?(\d+)', filename)
if match_e:
return int(match_e.group(1))
# 2.3 1x01 格式,提取季数和集数
match_x = re.search(r'(\d+)[Xx](\d+)', filename)
if match_x:
season = int(match_x.group(1))
episode = int(match_x.group(2))
return season * 1000 + episode
# 3. 日期格式识别(支持多种格式)
# 3.1 完整的YYYYMMDD格式
match_date_compact = re.search(r'(20\d{2})(\d{2})(\d{2})', filename)
if match_date_compact:
year = int(match_date_compact.group(1))
month = int(match_date_compact.group(2))
day = int(match_date_compact.group(3))
return year * 10000 + month * 100 + day
# 3.2 YYYY-MM-DD 或 YYYY.MM.DD 或 YYYY/MM/DD 格式
match_date_full = re.search(r'(20\d{2})[-./](\d{1,2})[-./](\d{1,2})', filename)
if match_date_full:
year = int(match_date_full.group(1))
month = int(match_date_full.group(2))
day = int(match_date_full.group(3))
return year * 10000 + month * 100 + day
# 3.3 MM/DD/YYYY 或 DD/MM/YYYY 格式
match_date_alt = re.search(r'(\d{1,2})[-./](\d{1,2})[-./](20\d{2})', filename)
if match_date_alt:
# 假设第一个是月,第二个是日(美式日期)
month = int(match_date_alt.group(1))
day = int(match_date_alt.group(2))
year = int(match_date_alt.group(3))
# 检查月份值如果大于12可能是欧式日期格式DD/MM/YYYY
if month > 12:
month, day = day, month
return year * 10000 + month * 100 + day
# 3.4 MM/DD 格式(无年份),假设为当前年
match_date_short = re.search(r'(\d{1,2})[-./](\d{1,2})', filename)
if match_date_short:
# 假设第一个是月,第二个是日
month = int(match_date_short.group(1))
day = int(match_date_short.group(2))
# 检查月份值如果大于12可能是欧式日期格式DD/MM
if month > 12:
month, day = day, month
# 由于没有年份,使用一个较低的基数,确保任何有年份的日期都排在后面
return month * 100 + day
# 3.5 年期格式,如"2025年14期"
match_year_issue = re.search(r'(20\d{2})[年].*?(\d+)[期]', filename)
if match_year_issue:
year = int(match_year_issue.group(1))
issue = int(match_year_issue.group(2))
return year * 1000 + issue
# 4. 纯数字格式(文件名开头是纯数字)
match_num = re.match(r'^(\d+)', file_name_without_ext)
if match_num:
return int(match_num.group(1))
# 6. 默认使用更新时间
try:
return file.get("last_update_at", 0)
except:
return 0
# 过滤出非目录文件,并且排除已经符合命名规则的文件
files_to_process = [
f for f in share_detail["list"]
if not f["dir"] and not re.match(regex_pattern, f["file_name"])
]
# 根据提取的排序值进行排序
sorted_files = sorted(files_to_process, key=extract_sorting_value)
# 应用过滤词过滤
filterwords = regex.get("filterwords", "")
if filterwords:
# 同时支持中英文逗号分隔
filterwords = filterwords.replace("", ",")
filterwords_list = [word.strip() for word in filterwords.split(',')]
for item in sorted_files:
# 被过滤的文件不会有file_name_re与不匹配正则的文件显示一致
if any(word in item['file_name'] for word in filterwords_list):
item["filtered"] = True
# 为每个文件分配序号
for file in sorted_files:
if not file.get("filtered"):
# 获取文件扩展名
file_ext = os.path.splitext(file["file_name"])[1]
# 生成预览文件名
file["file_name_re"] = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext
current_sequence += 1
return share_detail
else:
# 普通正则命名预览
pattern, replace = account.magic_regex_func(
regex.get("pattern", ""),
regex.get("replace", ""),
regex.get("taskname", ""),
regex.get("magic_regex", {}),
)
# 应用过滤词过滤
filterwords = regex.get("filterwords", "")
if filterwords:
# 同时支持中英文逗号分隔
filterwords = filterwords.replace("", ",")
filterwords_list = [word.strip() for word in filterwords.split(',')]
for item in share_detail["list"]:
# 被过滤的文件不会有file_name_re与不匹配正则的文件显示一致
if any(word in item['file_name'] for word in filterwords_list):
item["filtered"] = True
# 应用正则命名
for item in share_detail["list"]:
# 只对未被过滤的文件应用正则命名
if not item.get("filtered") and re.search(pattern, item["file_name"]):
file_name = item["file_name"]
item["file_name_re"] = (
re.sub(pattern, replace, file_name) if replace != "" else file_name
)
return share_detail
share_detail = preview_regex(share_detail)

View File

@ -323,10 +323,10 @@
<div class="col-sm-10">
<div class="input-group">
<div class="input-group-prepend">
<button class="btn btn-outline-secondary" type="button" @click="fileSelect.selectDir=true;fileSelect.previewRegex=true;showShareSelect(index)" title="预览正则命名效果">正则命名</button>
<button class="btn btn-outline-secondary" type="button" @click="fileSelect.selectDir=true;fileSelect.previewRegex=true;showShareSelect(index)" title="预览正则命名效果">{{ task.use_sequence_naming ? '顺序命名' : '正则命名' }}</button>
</div>
<input type="text" name="pattern[]" class="form-control" v-model="task.pattern" placeholder="匹配表达式" list="magicRegex">
<input type="text" name="replace[]" class="form-control" v-model="task.replace" placeholder="替换表达式">
<input type="text" name="pattern[]" class="form-control" v-model="task.pattern" placeholder="匹配表达式 (E{} 或 S01E{} 表示顺序命名模式)" list="magicRegex" @input="detectNamingMode(task)">
<input type="text" name="replace[]" class="form-control" v-model="task.replace" placeholder="替换表达式" :disabled="task.use_sequence_naming">
<div class="input-group-append" title="保存时只比较文件名的部分01.mp4 和 01.mkv 视同为同一文件,不重复转存">
<div class="input-group-text">
<input type="checkbox" v-model="task.ignore_extension">&nbsp;忽略后缀
@ -434,7 +434,7 @@
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">
<b v-if="fileSelect.previewRegex">正则命名预览</b>
<b v-if="fileSelect.previewRegex">{{ formData.tasklist[fileSelect.index].use_sequence_naming ? '顺序命名预览' : '正则命名预览' }}</b>
<b v-else-if="fileSelect.selectDir">选择{{fileSelect.selectShare ? '需转存的' : '保存到的'}}文件夹</b>
<b v-else>选择起始文件</b>
<div v-if="modalLoading" class="spinner-border spinner-border-sm m-1" role="status"></div>
@ -458,8 +458,13 @@
</nav>
<!-- 文件列表 -->
<div class="mb-3" v-if="fileSelect.previewRegex">
<div><b>匹配表达式:</b><span class="badge badge-info" v-html="formData.tasklist[fileSelect.index].pattern"></span></div>
<div><b>替换表达式:</b><span class="badge badge-info" v-html="formData.tasklist[fileSelect.index].replace"></span></div>
<div v-if="formData.tasklist[fileSelect.index].use_sequence_naming">
<b>顺序命名模式:</b><span class="badge badge-info" v-html="formData.tasklist[fileSelect.index].pattern"></span>
</div>
<div v-else>
<b>匹配表达式:</b><span class="badge badge-info" v-html="formData.tasklist[fileSelect.index].pattern"></span>
<b>替换表达式:</b><span class="badge badge-info" v-html="formData.tasklist[fileSelect.index].replace"></span>
</div>
</div>
<table class="table table-hover table-sm">
<thead>
@ -532,7 +537,9 @@
addition: {},
ignore_extension: false,
filterwords: "",
runweek: [1, 2, 3, 4, 5, 6, 7]
runweek: [1, 2, 3, 4, 5, 6, 7],
sequence_naming: "",
use_sequence_naming: false
},
run_log: "",
taskDirs: [""],
@ -594,6 +601,24 @@
}
});
window.addEventListener('beforeunload', this.handleBeforeUnload);
// 初始化时检查所有任务的命名模式
setTimeout(() => {
if (this.formData.tasklist && this.formData.tasklist.length > 0) {
this.formData.tasklist.forEach(task => {
// 检查现有的顺序命名设置
if (task.use_sequence_naming && task.sequence_naming) {
// 已经设置过顺序命名的,将顺序命名模式转换为匹配表达式
if (!task.pattern || task._pattern_backup) {
task.pattern = task.sequence_naming;
}
} else {
// 检测是否包含顺序命名模式
this.detectNamingMode(task);
}
});
}
}, 500);
},
beforeDestroy() {
window.removeEventListener('beforeunload', this.handleBeforeUnload);
@ -680,6 +705,16 @@
}
},
saveConfig() {
// 保存前处理每个任务的命名模式
if (this.formData.tasklist && this.formData.tasklist.length > 0) {
this.formData.tasklist.forEach(task => {
// 如果是顺序命名模式确保sequence_naming字段已正确设置
if (task.use_sequence_naming && task.pattern && task.pattern.includes('{}')) {
task.sequence_naming = task.pattern;
}
});
}
axios.post('/update', this.formData)
.then(response => {
if (response.data.success) {
@ -725,7 +760,18 @@
}
}
}
// 初始化新任务的命名模式相关字段
if (newTask.taskname) {
// 默认使用正则命名模式
newTask.pattern = ".*"; // 默认匹配所有文件
newTask.replace = ""; // 默认保持原文件名
newTask.use_sequence_naming = false;
newTask.sequence_naming = "";
}
this.formData.tasklist.push(newTask);
// 滚到最下
setTimeout(() => {
$('#collapse_' + (this.formData.tasklist.length - 1)).collapse('show').on('shown.bs.collapse', () => {
@ -1007,6 +1053,8 @@
taskname: this.formData.tasklist[this.fileSelect.index].taskname,
filterwords: this.formData.tasklist[this.fileSelect.index].filterwords,
magic_regex: this.formData.magic_regex,
use_sequence_naming: this.formData.tasklist[this.fileSelect.index].use_sequence_naming,
sequence_naming: this.formData.tasklist[this.fileSelect.index].sequence_naming
}
}).then(response => {
if (response.data.success) {
@ -1082,6 +1130,74 @@
shareurl = `${shareurl}#/list/share/${path.fid}-${path.name}`
}
return shareurl;
},
detectNamingMode(task) {
// 检测是否为顺序命名模式
const sequencePatterns = ['E{}', 'EP{}', 'S\\d+E{}', '第{}集', '第{}话', '第{}期'];
let isSequenceNaming = false;
// 保存当前值以支持撤销操作
const currentValue = task.pattern;
if (task.pattern) {
// 检查是否包含任何顺序命名模式
isSequenceNaming = sequencePatterns.some(pattern => {
const regexPattern = pattern.replace('{}', '\\{\\}');
return new RegExp(regexPattern).test(task.pattern);
});
// 或者用户直接输入包含{}的格式,且替换表达式为空
if (!isSequenceNaming && task.pattern.includes('{}') && (!task.replace || task.replace === '')) {
isSequenceNaming = true;
}
}
// 处理模式切换
if (isSequenceNaming) {
// 如果当前不是顺序命名模式,则保存现有的正则表达式
if (!task.use_sequence_naming) {
task._pattern_backup = task.pattern;
task._replace_backup = task.replace;
task.use_sequence_naming = true;
}
// 设置序列命名模式
task.sequence_naming = task.pattern;
} else {
// 如果当前是顺序命名模式,但现在检测不到顺序命名模式
if (task.use_sequence_naming) {
// 如果用户正在删除内容(当前值为空或比上一次更短)
if (!currentValue || (task._lastPatternValue && currentValue.length < task._lastPatternValue.length)) {
// 保持当前编辑状态,不切换模式
task.sequence_naming = currentValue;
// 只有当完全删除后才切换回正则模式
if (!currentValue) {
task.use_sequence_naming = false;
if (task._pattern_backup) {
task.pattern = "";
task.replace = task._replace_backup || "";
}
task.sequence_naming = null;
}
} else if (task._pattern_backup && !task.pattern.includes('{}')) {
// 正常切换回正则命名模式(非删除操作)
task.use_sequence_naming = false;
task.pattern = task._pattern_backup;
task.replace = task._replace_backup;
task._sequence_backup = task.sequence_naming;
task.sequence_naming = null;
} else if (!task._pattern_backup && !task.pattern.includes('{}')) {
// 没有备份,但需要切换回正则模式
task.use_sequence_naming = false;
task.sequence_naming = null;
}
}
}
// 保存当前值,用于下次比较
task._lastPatternValue = currentValue;
// 强制Vue更新视图
this.$forceUpdate();
}
}
});

View File

@ -64,7 +64,7 @@ def send_ql_notify(title, body):
def add_notify(text):
global NOTIFYS
NOTIFYS.append(text)
print("📢", text)
print(text)
return text
@ -637,27 +637,75 @@ class Quark:
def do_save_task(self, task):
# 判断资源失效记录
if task.get("shareurl_ban"):
print(f"{task['taskname']}》:{task['shareurl_ban']}")
print(f"分享资源已失效:{task['shareurl_ban']}")
add_notify(f"❗《{task['taskname']}》分享资源已失效:{task['shareurl_ban']}\n")
return
# 链接转换所需参数
pwd_id, passcode, pdir_fid, _ = self.extract_url(task["shareurl"])
# 获取stoken同时可验证资源是否失效
# 提取链接参数
pwd_id, passcode, pdir_fid, paths = self.extract_url(task["shareurl"])
if not pwd_id:
task["shareurl_ban"] = f"提取链接参数失败,请检查分享链接是否有效"
print(f"提取链接参数失败,请检查分享链接是否有效")
return
# 获取分享详情
is_sharing, stoken = self.get_stoken(pwd_id, passcode)
if not is_sharing:
add_notify(f"❌《{task['taskname']}》:{stoken}\n")
task["shareurl_ban"] = stoken
print(f"分享详情获取失败:{stoken}")
add_notify(f"❗《{task['taskname']}》分享详情获取失败:{stoken}\n")
return
# print("stoken: ", stoken)
share_detail = self.get_detail(pwd_id, stoken, pdir_fid, _fetch_share=1)
# 获取保存路径fid
savepath = task["savepath"]
if not self.savepath_fid.get(savepath):
# 检查规范化路径是否已在字典中
norm_savepath = re.sub(r"/{2,}", "/", f"/{savepath}")
if norm_savepath != savepath and self.savepath_fid.get(norm_savepath):
self.savepath_fid[savepath] = self.savepath_fid[norm_savepath]
else:
savepath_fids = self.get_fids([savepath])
if not savepath_fids:
print(f"保存路径不存在,准备新建:{savepath}")
mkdir_result = self.mkdir(savepath)
if mkdir_result["code"] == 0:
self.savepath_fid[savepath] = mkdir_result["data"]["fid"]
print(f"保存路径新建成功:{savepath}")
else:
print(f"保存路径新建失败:{mkdir_result['message']}")
return
else:
# 路径已存在直接设置fid
self.savepath_fid[savepath] = savepath_fids[0]["fid"]
updated_tree = self.dir_check_and_save(task, pwd_id, stoken, pdir_fid)
if updated_tree.size(1) > 0:
add_notify(f"✅《{task['taskname']}》添加追更:\n{updated_tree}")
return updated_tree
# 支持顺序命名模式
if task.get("use_sequence_naming") and task.get("sequence_naming"):
# 顺序命名模式下已经在do_save中打印了顺序命名信息这里不再重复打印
# 设置正则模式为空
task["regex_pattern"] = None
# 构建顺序命名的正则表达式
sequence_pattern = task["sequence_naming"]
# 将{}替换为(\d+)用于匹配
regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)')
task["regex_pattern"] = regex_pattern
else:
print(f"任务结束:没有新的转存任务")
# 正则命名模式
pattern, replace = self.magic_regex_func(
task.get("pattern", ""), task.get("replace", ""), task["taskname"]
)
# 注释掉这里的正则表达式打印因为在do_save函数中已经打印了
# 只有在非魔法变量情况下才显示展开后的正则表达式
# 对于魔法变量($TV等),显示原始输入
# if pattern and task.get("pattern") and task.get("pattern") not in CONFIG_DATA.get("magic_regex", MAGIC_REGEX):
# print(f"正则匹配: {pattern}")
# print(f"正则替换: {replace}")
# 保存文件
tree = self.dir_check_and_save(task, pwd_id, stoken, pdir_fid)
# 检查是否有新文件转存
if tree and tree.size() <= 1: # 只有根节点意味着没有新文件
return False
return tree
def dir_check_and_save(self, task, pwd_id, stoken, pdir_fid="", subdir_path=""):
tree = Tree()
@ -686,20 +734,33 @@ class Quark:
filterwords = task["filterwords"].replace("", ",")
filterwords_list = [word.strip() for word in filterwords.split(',')]
share_file_list = [file for file in share_file_list if not any(word in file['file_name'] for word in filterwords_list)]
print(f"📑 应用过滤词:{task['filterwords']},剩余{len(share_file_list)}个文件")
print(f"📑 应用过滤词: {task['filterwords']},剩余{len(share_file_list)}个文件")
print()
# 获取目标目录文件列表
savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")
if not self.savepath_fid.get(savepath):
if get_fids := self.get_fids([savepath]):
self.savepath_fid[savepath] = get_fids[0]["fid"]
# 检查规范化路径是否已在字典中
norm_savepath = re.sub(r"/{2,}", "/", f"/{savepath}")
if norm_savepath != savepath and self.savepath_fid.get(norm_savepath):
self.savepath_fid[savepath] = self.savepath_fid[norm_savepath]
else:
print(f"❌ 目录 {savepath} fid获取失败跳过转存")
return tree
savepath_fids = self.get_fids([savepath])
if not savepath_fids:
print(f"保存路径不存在,准备新建:{savepath}")
mkdir_result = self.mkdir(savepath)
if mkdir_result["code"] == 0:
self.savepath_fid[savepath] = mkdir_result["data"]["fid"]
print(f"保存路径新建成功:{savepath}")
else:
print(f"保存路径新建失败:{mkdir_result['message']}")
return
else:
# 路径已存在直接设置fid
self.savepath_fid[savepath] = savepath_fids[0]["fid"]
to_pdir_fid = self.savepath_fid[savepath]
dir_file_list = self.ls_dir(to_pdir_fid)
# print("dir_file_list: ", dir_file_list)
tree.create_node(
savepath,
pdir_fid,
@ -708,72 +769,326 @@ class Quark:
},
)
# 需保存的文件清单
need_save_list = []
# 添加符合的
for share_file in share_file_list:
if share_file["dir"] and task.get("update_subdir", False):
pattern, replace = task["update_subdir"], ""
else:
pattern, replace = self.magic_regex_func(
task.get("pattern", ""), task.get("replace", ""), task["taskname"]
)
# 正则文件名匹配
if re.search(pattern, share_file["file_name"]):
# 替换后的文件名
save_name = (
re.sub(pattern, replace, share_file["file_name"])
if replace != ""
else share_file["file_name"]
)
# 忽略后缀
if task.get("ignore_extension") and not share_file["dir"]:
compare_func = lambda a, b1, b2: (
os.path.splitext(a)[0] == os.path.splitext(b1)[0]
or os.path.splitext(a)[0] == os.path.splitext(b2)[0]
)
else:
compare_func = lambda a, b1, b2: (a == b1 or a == b2)
# 判断目标目录文件是否存在
# 处理顺序命名模式
if task.get("use_sequence_naming") and task.get("sequence_naming"):
# 顺序命名模式
current_sequence = 1
sequence_pattern = task["sequence_naming"]
regex_pattern = task.get("regex_pattern")
# 查找目录中现有的最大序号
for dir_file in dir_file_list:
if not dir_file["dir"]: # 只检查文件
if matches := re.match(regex_pattern, dir_file["file_name"]):
try:
seq_num = int(matches.group(1))
current_sequence = max(current_sequence, seq_num + 1)
except (ValueError, IndexError):
pass
# 构建目标目录中所有文件的查重索引(按大小和修改时间)
dir_files_map = {}
for dir_file in dir_file_list:
if not dir_file["dir"]: # 仅处理文件
file_size = dir_file.get("size", 0)
file_ext = os.path.splitext(dir_file["file_name"])[1].lower()
update_time = dir_file.get("updated_at", 0)
# 创建大小+扩展名的索引,用于快速查重
key = f"{file_size}_{file_ext}"
if key not in dir_files_map:
dir_files_map[key] = []
dir_files_map[key].append({
"file_name": dir_file["file_name"],
"updated_at": update_time,
})
# 预先过滤掉已经存在的文件(按大小和扩展名比对)
filtered_share_files = []
for share_file in share_file_list:
if share_file["dir"]:
# 处理子目录
if task.get("update_subdir") and re.search(task["update_subdir"], share_file["file_name"]):
filtered_share_files.append(share_file)
continue
file_size = share_file.get("size", 0)
file_ext = os.path.splitext(share_file["file_name"])[1].lower()
share_update_time = share_file.get("last_update_at", 0)
# 检查是否已存在相同大小和扩展名的文件
key = f"{file_size}_{file_ext}"
is_duplicate = False
if key in dir_files_map:
for existing_file in dir_files_map[key]:
existing_update_time = existing_file.get("updated_at", 0)
# 如果修改时间相近30天内或者差距不大10%以内),认为是同一个文件
if (abs(share_update_time - existing_update_time) < 2592000 or
abs(1 - (share_update_time / existing_update_time if existing_update_time else 1)) < 0.1):
is_duplicate = True
break
# 只有非重复文件才进行处理
if not is_duplicate:
filtered_share_files.append(share_file)
# 指定文件开始订阅/到达指定文件(含)结束历遍
if share_file["fid"] == task.get("startfid", ""):
break
# 实现高级排序算法
def extract_sorting_value(file):
if file["dir"]: # 跳过文件夹
return float('inf')
filename = file["file_name"]
# 提取文件名,不含扩展名
file_name_without_ext = os.path.splitext(filename)[0]
# 1. "第X期/集/话" 格式
match_chinese = re.search(r'第(\d+)[期集话]', filename)
episode_num = int(match_chinese.group(1)) if match_chinese else 0
# 5. 文件名含"上中下"(优先处理,因为可能与其他格式同时存在)
if match_chinese:
# 如果同时存在集数和上中下,则按照集数*10+位置排序
if '' in filename:
return episode_num * 10 + 1
elif '' in filename:
return episode_num * 10 + 2
elif '' in filename:
return episode_num * 10 + 3
elif '' in filename:
return 1
elif '' in filename:
return 2
elif '' in filename:
return 3
# 如果已经匹配到"第X期/集/话"格式,直接返回
if episode_num > 0:
return episode_num * 10
# 2.1 S01E01 格式,提取季数和集数
match_s_e = re.search(r'[Ss](\d+)[Ee](\d+)', filename)
if match_s_e:
season = int(match_s_e.group(1))
episode = int(match_s_e.group(2))
return season * 1000 + episode
# 2.2 E01 格式,仅提取集数
match_e = re.search(r'[Ee][Pp]?(\d+)', filename)
if match_e:
return int(match_e.group(1))
# 2.3 1x01 格式,提取季数和集数
match_x = re.search(r'(\d+)[Xx](\d+)', filename)
if match_x:
season = int(match_x.group(1))
episode = int(match_x.group(2))
return season * 1000 + episode
# 3. 日期格式识别(支持多种格式)
# 3.1 完整的YYYYMMDD格式
match_date_compact = re.search(r'(20\d{2})(\d{2})(\d{2})', filename)
if match_date_compact:
year = int(match_date_compact.group(1))
month = int(match_date_compact.group(2))
day = int(match_date_compact.group(3))
return year * 10000 + month * 100 + day
# 3.2 YYYY-MM-DD 或 YYYY.MM.DD 或 YYYY/MM/DD 格式
match_date_full = re.search(r'(20\d{2})[-./](\d{1,2})[-./](\d{1,2})', filename)
if match_date_full:
year = int(match_date_full.group(1))
month = int(match_date_full.group(2))
day = int(match_date_full.group(3))
return year * 10000 + month * 100 + day
# 3.3 MM/DD/YYYY 或 DD/MM/YYYY 格式
match_date_alt = re.search(r'(\d{1,2})[-./](\d{1,2})[-./](20\d{2})', filename)
if match_date_alt:
# 假设第一个是月,第二个是日(美式日期)
# 在实际应用中可能需要根据具体情况调整
month = int(match_date_alt.group(1))
day = int(match_date_alt.group(2))
year = int(match_date_alt.group(3))
# 检查月份值如果大于12可能是欧式日期格式DD/MM/YYYY
if month > 12:
month, day = day, month
return year * 10000 + month * 100 + day
# 3.4 MM/DD 格式(无年份),假设为当前年
match_date_short = re.search(r'(\d{1,2})[-./](\d{1,2})', filename)
if match_date_short:
# 假设第一个是月,第二个是日
month = int(match_date_short.group(1))
day = int(match_date_short.group(2))
# 检查月份值如果大于12可能是欧式日期格式DD/MM
if month > 12:
month, day = day, month
# 由于没有年份,使用一个较低的基数,确保任何有年份的日期都排在后面
return month * 100 + day
# 3.5 年期格式,如"2025年14期"
match_year_issue = re.search(r'(20\d{2})[年].*?(\d+)[期]', filename)
if match_year_issue:
year = int(match_year_issue.group(1))
issue = int(match_year_issue.group(2))
return year * 1000 + issue
# 4. 纯数字格式(文件名开头是纯数字)
match_num = re.match(r'^(\d+)', file_name_without_ext)
if match_num:
return int(match_num.group(1))
# 6. 默认使用更新时间
try:
return file.get("updated_at", file.get("last_update_at", 0))
except:
return 0
# 过滤出非目录文件,排除已经排除掉的重复文件,然后排序
files_to_process = [
f for f in filtered_share_files
if not f["dir"] and not re.match(regex_pattern, f["file_name"])
]
# 根据提取的排序值进行排序
sorted_files = sorted(files_to_process, key=extract_sorting_value)
# 需保存的文件清单
need_save_list = []
# 为每个文件分配序号
for share_file in sorted_files:
# 获取文件扩展名
file_ext = os.path.splitext(share_file["file_name"])[1]
# 生成新文件名
save_name = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext
# 检查目标目录是否已存在此文件
file_exists = any(
compare_func(
dir_file["file_name"], share_file["file_name"], save_name
)
for dir_file in dir_file_list
dir_file["file_name"] == save_name for dir_file in dir_file_list
)
if not file_exists:
# 不打印保存信息
share_file["save_name"] = save_name
share_file["original_name"] = share_file["file_name"] # 保存原文件名,用于排序
need_save_list.append(share_file)
elif share_file["dir"]:
# 存在并是一个文件夹
if task.get("update_subdir", False):
if re.search(task["update_subdir"], share_file["file_name"]):
print(f"检查子文件夹:{savepath}/{share_file['file_name']}")
subdir_tree = self.dir_check_and_save(
task,
pwd_id,
stoken,
current_sequence += 1
# 指定文件开始订阅/到达指定文件(含)结束历遍
if share_file["fid"] == task.get("startfid", ""):
break
# 处理子文件夹
for share_file in share_file_list:
if share_file["dir"] and task.get("update_subdir", False):
if re.search(task["update_subdir"], share_file["file_name"]):
print(f"检查子文件夹: {savepath}/{share_file['file_name']}")
subdir_tree = self.dir_check_and_save(
task,
pwd_id,
stoken,
share_file["fid"],
f"{subdir_path}/{share_file['file_name']}",
)
if subdir_tree.size(1) > 0:
# 合并子目录树
tree.create_node(
"📁" + share_file["file_name"],
share_file["fid"],
f"{subdir_path}/{share_file['file_name']}",
parent=pdir_fid,
data={
"is_dir": share_file["dir"],
},
)
if subdir_tree.size(1) > 0:
# 合并子目录树
tree.create_node(
"📁" + share_file["file_name"],
tree.merge(share_file["fid"], subdir_tree, deep=False)
else:
# 正则命名模式
need_save_list = []
# 添加符合的
for share_file in share_file_list:
if share_file["dir"] and task.get("update_subdir", False):
pattern, replace = task["update_subdir"], ""
else:
pattern, replace = self.magic_regex_func(
task.get("pattern", ""), task.get("replace", ""), task["taskname"]
)
# 正则文件名匹配
if re.search(pattern, share_file["file_name"]):
# 替换后的文件名
save_name = (
re.sub(pattern, replace, share_file["file_name"])
if replace != ""
else share_file["file_name"]
)
# 忽略后缀
if task.get("ignore_extension") and not share_file["dir"]:
compare_func = lambda a, b1, b2: (
os.path.splitext(a)[0] == os.path.splitext(b1)[0]
or os.path.splitext(a)[0] == os.path.splitext(b2)[0]
)
else:
compare_func = lambda a, b1, b2: (a == b1 or a == b2)
# 判断目标目录文件是否存在
file_exists = False
for dir_file in dir_file_list:
if compare_func(
dir_file["file_name"], share_file["file_name"], save_name
):
file_exists = True
# 删除对文件打印部分
break
if not file_exists:
# 不打印保存信息
share_file["save_name"] = save_name
share_file["original_name"] = share_file["file_name"] # 保存原文件名,用于排序
need_save_list.append(share_file)
elif share_file["dir"]:
# 存在并是一个文件夹
if task.get("update_subdir", False):
if re.search(task["update_subdir"], share_file["file_name"]):
print(f"检查子文件夹: {savepath}/{share_file['file_name']}")
subdir_tree = self.dir_check_and_save(
task,
pwd_id,
stoken,
share_file["fid"],
parent=pdir_fid,
data={
"is_dir": share_file["dir"],
},
f"{subdir_path}/{share_file['file_name']}",
)
tree.merge(share_file["fid"], subdir_tree, deep=False)
# 指定文件开始订阅/到达指定文件(含)结束历遍
if share_file["fid"] == task.get("startfid", ""):
break
if subdir_tree.size(1) > 0:
# 合并子目录树
tree.create_node(
"📁" + share_file["file_name"],
share_file["fid"],
parent=pdir_fid,
data={
"is_dir": share_file["dir"],
},
)
tree.merge(share_file["fid"], subdir_tree, deep=False)
# 指定文件开始订阅/到达指定文件(含)结束历遍
if share_file["fid"] == task.get("startfid", ""):
break
fid_list = [item["fid"] for item in need_save_list]
fid_token_list = [item["share_fid_token"] for item in need_save_list]
if fid_list:
# 只在有新文件需要转存时才打印目录文件列表
print(f"📂 目标目录:{savepath} ({len(dir_file_list)}个文件)")
for file in dir_file_list:
print(f" {file['file_name']}")
print()
save_file_return = self.save_file(
fid_list, fid_token_list, to_pdir_fid, pwd_id, stoken
)
@ -783,12 +1098,14 @@ class Quark:
query_task_return = self.query_task(task_id)
if query_task_return["code"] == 0:
# 建立目录树
saved_files = []
for index, item in enumerate(need_save_list):
icon = (
"📁"
if item["dir"] == True
else "🎞️" if item["obj_category"] == "video" else ""
)
saved_files.append(f"{icon}{item['save_name']}")
tree.create_node(
f"{icon}{item['save_name']}",
item["fid"],
@ -799,15 +1116,180 @@ class Quark:
"is_dir": item["dir"],
},
)
# 添加成功通知
add_notify(f"✅《{task['taskname']}》 添加追更:\n/{task['savepath']}{subdir_path}")
# 打印保存文件列表
for idx, file_name in enumerate(saved_files):
prefix = "├── " if idx < len(saved_files) - 1 else "└── "
add_notify(f"{prefix}{file_name}")
add_notify("")
else:
err_msg = query_task_return["message"]
else:
err_msg = save_file_return["message"]
if err_msg:
add_notify(f"❌《{task['taskname']}》转存失败:{err_msg}\n")
else:
# 没有新文件需要转存
if not subdir_path: # 只在顶层(非子目录)打印一次消息
pass
return tree
def do_rename_task(self, task, subdir_path=""):
# 检查是否为顺序命名模式
if task.get("use_sequence_naming") and task.get("sequence_naming"):
# 使用顺序命名模式
sequence_pattern = task["sequence_naming"]
# 替换占位符为正则表达式捕获组
regex_pattern = re.escape(sequence_pattern).replace('\\{\\}', '(\\d+)')
savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")
if not self.savepath_fid.get(savepath):
# 路径已存在直接设置fid
self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"]
dir_file_list = self.ls_dir(self.savepath_fid[savepath])
dir_file_name_list = [item["file_name"] for item in dir_file_list]
# 找出当前最大序号
max_sequence = 0
for dir_file in dir_file_list:
matches = re.match(regex_pattern, dir_file["file_name"])
if matches:
try:
current_seq = int(matches.group(1))
max_sequence = max(max_sequence, current_seq)
except (IndexError, ValueError):
pass
# 重命名文件
current_sequence = max_sequence
is_rename_count = 0
# 定义自定义排序函数
def custom_sort(file):
file_name = file["file_name"]
# 1. 提取文件名中的数字(期数/集数等)
episode_num = 0
# 尝试匹配"第X期/集/话"格式
episode_match = re.search(r'第(\d+)[期集话]', file_name)
if episode_match:
episode_num = int(episode_match.group(1))
# 如果同时存在集数和上中下,则按照集数*10+位置排序
if '' in file_name:
return (episode_num, 1, file.get("created_at", 0))
elif '' in file_name:
return (episode_num, 2, file.get("created_at", 0))
elif '' in file_name:
return (episode_num, 3, file.get("created_at", 0))
return (episode_num, 10, file.get("created_at", 0))
# 如果文件名中包含"上中下",优先处理
if '' in file_name:
return (0, 1, file.get("created_at", 0))
elif '' in file_name:
return (0, 2, file.get("created_at", 0))
elif '' in file_name:
return (0, 3, file.get("created_at", 0))
# 尝试匹配常见视频格式 S01E01, E01, 1x01 等
if re.search(r'[Ss](\d+)[Ee](\d+)', file_name):
match = re.search(r'[Ss](\d+)[Ee](\d+)', file_name)
season = int(match.group(1))
episode = int(match.group(2))
episode_num = season * 1000 + episode # 确保季和集的排序正确
elif re.search(r'[Ee][Pp]?(\d+)', file_name):
match = re.search(r'[Ee][Pp]?(\d+)', file_name)
episode_num = int(match.group(1))
elif re.search(r'(\d+)[xX](\d+)', file_name):
match = re.search(r'(\d+)[xX](\d+)', file_name)
season = int(match.group(1))
episode = int(match.group(2))
episode_num = season * 1000 + episode
# 3. 日期格式识别(支持多种格式)
# 3.1 完整的YYYYMMDD格式
match_date_compact = re.search(r'(20\d{2})(\d{2})(\d{2})', file_name)
if match_date_compact:
year = int(match_date_compact.group(1))
month = int(match_date_compact.group(2))
day = int(match_date_compact.group(3))
return (year * 10000 + month * 100 + day, 0, file.get("created_at", 0))
# 3.2 YYYY-MM-DD 或 YYYY.MM.DD 或 YYYY/MM/DD 格式
match_date_full = re.search(r'(20\d{2})[-./](\d{1,2})[-./](\d{1,2})', file_name)
if match_date_full:
year = int(match_date_full.group(1))
month = int(match_date_full.group(2))
day = int(match_date_full.group(3))
return (year * 10000 + month * 100 + day, 0, file.get("created_at", 0))
# 3.3 MM/DD/YYYY 或 DD/MM/YYYY 格式
match_date_alt = re.search(r'(\d{1,2})[-./](\d{1,2})[-./](20\d{2})', file_name)
if match_date_alt:
# 假设第一个是月,第二个是日(美式日期)
month = int(match_date_alt.group(1))
day = int(match_date_alt.group(2))
year = int(match_date_alt.group(3))
# 检查月份值如果大于12可能是欧式日期格式DD/MM/YYYY
if month > 12:
month, day = day, month
return (year * 10000 + month * 100 + day, 0, file.get("created_at", 0))
# 3.4 MM/DD 格式(无年份)
match_date_short = re.search(r'(\d{1,2})[-./](\d{1,2})', file_name)
if match_date_short:
# 假设第一个是月,第二个是日
month = int(match_date_short.group(1))
day = int(match_date_short.group(2))
# 检查月份值如果大于12可能是欧式日期格式DD/MM
if month > 12:
month, day = day, month
return (month * 100 + day, 0, file.get("created_at", 0))
# 3.5 年期格式,如"2025年14期"
match_year_issue = re.search(r'(20\d{2})[年].*?(\d+)[期]', file_name)
if match_year_issue:
year = int(match_year_issue.group(1))
issue = int(match_year_issue.group(2))
return (year * 1000 + issue, 0, file.get("created_at", 0))
# 默认使用数字排序或创建时间
match_num = re.match(r'^(\d+)', os.path.splitext(file_name)[0])
if match_num:
return (int(match_num.group(1)), 0, file.get("created_at", 0))
# 最后按创建时间排序
return (0, 0, file.get("created_at", 0))
# 按自定义逻辑排序
sorted_files = sorted([f for f in dir_file_list if not f["dir"] and not re.match(regex_pattern, f["file_name"])], key=custom_sort)
for dir_file in sorted_files:
current_sequence += 1
file_ext = os.path.splitext(dir_file["file_name"])[1]
save_name = sequence_pattern.replace("{}", f"{current_sequence:02d}") + file_ext
if save_name != dir_file["file_name"] and save_name not in dir_file_name_list:
try:
rename_return = self.rename(dir_file["fid"], save_name)
# 防止网络问题导致的错误
if isinstance(rename_return, dict) and rename_return.get("code") == 0:
print(f"重命名: {dir_file['file_name']}{save_name}")
is_rename_count += 1
dir_file_name_list.append(save_name)
else:
error_msg = rename_return.get("message", "未知错误")
print(f"重命名: {dir_file['file_name']}{save_name} 失败,{error_msg}")
except Exception as e:
print(f"重命名出错: {dir_file['file_name']}{save_name},错误:{str(e)}")
return is_rename_count > 0
# 非顺序命名模式,使用普通正则重命名
pattern, replace = self.magic_regex_func(
task.get("pattern", ""), task.get("replace", ""), task["taskname"]
)
@ -815,6 +1297,7 @@ class Quark:
return 0
savepath = re.sub(r"/{2,}", "/", f"/{task['savepath']}{subdir_path}")
if not self.savepath_fid.get(savepath):
# 路径已存在直接设置fid
self.savepath_fid[savepath] = self.get_fids([savepath])[0]["fid"]
dir_file_list = self.ls_dir(self.savepath_fid[savepath])
dir_file_name_list = [item["file_name"] for item in dir_file_list]
@ -835,11 +1318,11 @@ class Quark:
):
rename_return = self.rename(dir_file["fid"], save_name)
if rename_return["code"] == 0:
print(f"重命名{dir_file['file_name']}{save_name}")
print(f"重命名: {dir_file['file_name']}{save_name}")
is_rename_count += 1
else:
print(
f"重命名{dir_file['file_name']}{save_name} 失败,{rename_return['message']}"
f"重命名: {dir_file['file_name']}{save_name} 失败,{rename_return['message']}"
)
return is_rename_count > 0
@ -932,10 +1415,15 @@ def do_save(account, tasklist=[]):
print(f"任务名称: {task['taskname']}")
print(f"分享链接: {task['shareurl']}")
print(f"保存路径: {task['savepath']}")
if task.get("pattern"):
print(f"正则匹配: {task['pattern']}")
if task.get("replace"):
print(f"正则替换: {task['replace']}")
# 根据命名模式显示不同信息
if task.get("use_sequence_naming") and task.get("sequence_naming"):
print(f"顺序命名: {task['sequence_naming']}")
else:
# 正则命名模式
if task.get("pattern"):
print(f"正则匹配: {task['pattern']}")
if task.get("replace") is not None: # 显示替换规则,即使为空字符串
print(f"正则替换: {task['replace']}")
if task.get("update_subdir"):
print(f"更子目录: {task['update_subdir']}")
if task.get("runweek") or task.get("enddate"):
@ -975,6 +1463,9 @@ def do_save(account, tasklist=[]):
task = (
plugin.run(task, account=account, tree=is_new_tree) or task
)
elif is_new_tree is False: # 明确没有新文件
print(f"任务完成:没有新的文件需要转存")
print()
print()