yanyuyijian/Scripts/py/deleteDuplicateTasksImplement.py
2021-08-24 22:33:24 +08:00

113 lines
3.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'''
cron: 20 0-23/2 * * *
new Env('清理重复任务');
'''
# -*- coding:utf-8 -*-
import json
import os,sys
import requests
import time
ip="localhost"
def loadSend():
print("加载推送功能")
global send
cur_path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(cur_path)
if os.path.exists(cur_path + "/deleteDuplicateTasksNotify.py"):
try:
from deleteDuplicateTasksNotify import send
except:
print("加载通知服务失败~")
headers={
"Accept": "application/json",
"Authorization": "Basic YWRtaW46YWRtaW4=",
"User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36",
}
def getTaskList():
t = round(time.time() * 1000)
url = "http://%s:5700/api/crons?searchValue=&t=%d" % (ip, t)
response = requests.get(url=url, headers=headers)
responseContent=json.loads(response.content.decode('utf-8'))
if responseContent['code']==200:
taskList= responseContent['data']
return taskList
else:
# 没有获取到taskList返回空
return []
def getDuplicate(taskList):
wholeNames={}
duplicateID=[]
for task in taskList:
if task['name'] in wholeNames.keys():
duplicateID.append(task['_id'])
else:
wholeNames[task['name']] = 1
return duplicateID
def getData(duplicateID):
rawData = "["
count=0
for id in duplicateID:
rawData += "\"%s\""%id
if count<len(duplicateID)-1:
rawData += ", "
count+=1
rawData += "]"
return rawData
def deleteDuplicateTasks(duplicateID):
t = round(time.time() * 1000)
url = "http://%s:5700/api/crons?t=%d" % (ip, t)
data=json.dumps(duplicateID)
headers["Content-Type"]="application/json;charset=UTF-8"
response=requests.delete(url=url,headers=headers,data=data)
msg = json.loads(response.content.decode('utf-8'))
if msg['code']!=200:
print("出错!,错误信息为:%s"%msg)
else:
print("成功删除重复任务")
def loadToken():
# cur_path = os.path.abspath(os.path.dirname(__file__))
# send("当前路径:",cur_path)
try:
with open("/ql/config/auth.json","r",encoding="utf-8") as f:
data=json.load(f)
except:
# pass
send("无法获取token","")
return data['token']
if __name__ == '__main__':
print("开始!")
loadSend()
# 直接从 /ql/config/auth.json中读取当前token
token=loadToken()
# send("成功获取token!","")
headers["Authorization"] = "Bearer %s"%token
taskList=getTaskList()
# 如果仍旧是空的,则报警
if len(taskList)==0:
print("无法获取taskList!")
duplicateID=getDuplicate(taskList)
before="清除前数量为:%d"%len(taskList)
print(before)
after="清除重复任务后,数量为:%d"%(len(taskList)-len(duplicateID))
print(after)
if len(duplicateID)==0:
print("没有重复任务")
else:
deleteDuplicateTasks(duplicateID)
send("清理成功","\n%s\n%s"%(before,after))
# print("清理结束!")