Add files via upload

This commit is contained in:
zhao-zg 2024-07-31 10:39:17 +08:00 committed by GitHub
parent 2e3bfa56d8
commit d2085e9213
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 111 additions and 28 deletions

7
api.py
View File

@ -10,6 +10,7 @@ import login as backend
import ddddocr import ddddocr
ocr = ddddocr.DdddOcr(show_ad=False, beta=True) ocr = ddddocr.DdddOcr(show_ad=False, beta=True)
ocrDet = ddddocr.DdddOcr(show_ad=False, beta=True, det=True)
class account: class account:
@ -90,9 +91,9 @@ async def loginNew():
# 调用后端进行登录 # 调用后端进行登录
async def THREAD_DO_LOGIN(workList, uid, ocr): async def THREAD_DO_LOGIN(workList, uid, ocr, ocrDet):
try: try:
await backend.main(workList, uid, ocr) await backend.main(workList, uid, ocr, ocrDet)
except Exception as e: except Exception as e:
print(e) print(e)
workList[uid].msg = str(e) workList[uid].msg = str(e)
@ -170,7 +171,7 @@ def loginPublic(data):
# 新增记录 # 新增记录
workList[u.uid] = u workList[u.uid] = u
# 非阻塞启动登录线程 # 非阻塞启动登录线程
asyncio.create_task(THREAD_DO_LOGIN(workList, u.uid, ocr)) asyncio.create_task(THREAD_DO_LOGIN(workList, u.uid, ocr, ocrDet))
# 更新信息响应api请求 # 更新信息响应api请求
workList[u.uid].status = "pending" workList[u.uid].status = "pending"
r = mr("pass", uid=u.uid, msg=f"{u.account}处理中, 到/check查询处理进度") r = mr("pass", uid=u.uid, msg=f"{u.account}处理中, 到/check查询处理进度")

132
login.py
View File

@ -19,6 +19,8 @@ import io
import re import re
# 传参获得已初始化的ddddocr实例 # 传参获得已初始化的ddddocr实例
ocr = None ocr = None
ocrDet = None
# 支持的形状类型 # 支持的形状类型
supported_types = [ supported_types = [
"三角形", "三角形",
@ -330,11 +332,12 @@ async def loginPassword(chromium_path, workList, uid, headless):
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'): elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
print("进入点形状、颜色验证分支") print("进入点形状、颜色验证分支")
workList[uid].status = "pending" workList[uid].status = "pending"
workList[uid].msg = "正在过形状、颜色检测" workList[uid].msg = "正在过形状、颜色检测"
await verification_shape(page) if await verification_shape(page) == "notSupport":
await page.waitFor(3000) print("即将重启浏览器重试")
continue await browser.close()
return "notSupport"
if not sms_sent: if not sms_sent:
if await page.J(".sub-title"): if await page.J(".sub-title"):
@ -343,7 +346,10 @@ async def loginPassword(chromium_path, workList, uid, headless):
workList[uid].status = "SMS" workList[uid].status = "SMS"
workList[uid].msg = "需要短信验证" workList[uid].msg = "需要短信验证"
await sendSMS(page) if await sendSMS(page) == "notSupport":
print("即将重启浏览器重试")
await browser.close()
return "notSupport"
await page.waitFor(3000) await page.waitFor(3000)
await typeSMScode(page, workList, uid) await typeSMScode(page, workList, uid)
sms_sent = True sms_sent = True
@ -359,7 +365,10 @@ async def loginPassword(chromium_path, workList, uid, headless):
if not workList[uid].isAuto: if not workList[uid].isAuto:
workList[uid].status = "SMS" workList[uid].status = "SMS"
workList[uid].msg = "需要短信验证" workList[uid].msg = "需要短信验证"
await sendSMSDirectly(page) if await sendSMSDirectly(page) == "notSupport":
print("即将重启浏览器重试")
await browser.close()
return "notSupport"
await page.waitFor(3000) await page.waitFor(3000)
await typeSMScode(page, workList, uid) await typeSMScode(page, workList, uid)
sms_sent = True sms_sent = True
@ -449,7 +458,8 @@ async def sendSMSDirectly(page):
await verification(page) await verification(page)
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'): elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
await verification_shape(page) if await verification_shape(page) == "notSupport":
return "notSupport"
else: else:
break break
@ -490,7 +500,8 @@ async def sendSMS(page):
await verification(page) await verification(page)
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'): elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
await verification_shape(page) if await verification_shape(page) == "notSupport":
return "notSupport"
else: else:
break break
@ -636,7 +647,7 @@ async def verification(page):
box["x"] + distance + random.uniform(3, 15), box["y"], {"steps": 10} box["x"] + distance + random.uniform(3, 15), box["y"], {"steps": 10}
) )
await page.waitFor( await page.waitFor(
random.randint(100, 400) random.randint(100, 500)
) )
await page.mouse.move( await page.mouse.move(
box["x"] + distance, box["y"], {"steps": 10} box["x"] + distance, box["y"], {"steps": 10}
@ -744,6 +755,14 @@ async def verification_shape(page):
else: else:
raise "image is empty" raise "image is empty"
def get_gray_img(path):
img = cv2.imread(path)
gray = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY))
gray.save("gray.png")
return open("gray.png", "rb").read()
# 文字点选的重试次数,超过将重启浏览器
retry_count = 10
for i in range(5): for i in range(5):
await page.waitForSelector("div.captcha_footer img") await page.waitForSelector("div.captcha_footer img")
image_src = await page.Jeval( image_src = await page.Jeval(
@ -798,8 +817,57 @@ async def verification_shape(page):
print(f"不支持{target_color},重试") print(f"不支持{target_color},重试")
await refresh_button.click() await refresh_button.click()
await asyncio.sleep(random.uniform(2, 4)) await asyncio.sleep(random.uniform(2, 4))
break
elif word.find("依次") > 0:
if retry_count < 1:
print("文字点选重试失败")
return "notSupport"
i = 3
print("进入文字点选")
print(f"文字点选第{11 - retry_count}次尝试")
retry_count -= 1
target_word = word.replace("\"", "")[-4:]
print(f"点选字为: {target_word}")
gray_img = get_gray_img("shape_image.png")
xy_list = ocrDet.detection(gray_img)
src_img = Image.open("shape_image.png")
words = []
for row in xy_list:
[x1, y1, x2, y2] = row
corp = src_img.crop([x1 - 7 if x1 > 7 else x1, y1 - 7 if y1 > 7 else y1, x2 + 7, y2 + 7])
# 识别出单个字
result_word = ocr.classification(corp, png_fix=True)
words.append(result_word)
result = dict(zip(words, xy_list))
print(f"result: {result}")
img_xy = {}
for key, xy in result.items():
img_xy[key] = (int((xy[0] + xy[2]) / 2), int((xy[1] + xy[3]) / 2))
not_found = False
click_points = {}
for wd in target_word:
if wd not in img_xy:
print(f"\"{wd}\"未找到,识别失败,刷新")
await refresh_button.click()
await asyncio.sleep(random.uniform(2, 4))
not_found = True
break
center_x, center_y = img_xy[wd]
click_x, click_y = image_top_left_x + center_x, image_top_left_y + center_y
click_points[wd] = [click_x, click_y]
print(click_points)
if os.path.exists("gray.png"):
os.remove("gray.png")
if not_found:
continue continue
print("文字点选识别正常")
for wd, point in click_points.items():
print(f"点击\"{wd}\",坐标{point[0]}:{point[1]}")
await page.mouse.click(point[0], point[1])
await asyncio.sleep(random.uniform(0.5, 2))
await button.click()
await asyncio.sleep(random.uniform(0.3, 1))
break
else: else:
shape_type = word.split("请选出图中的")[1] shape_type = word.split("请选出图中的")[1]
if shape_type in supported_types: if shape_type in supported_types:
@ -861,10 +929,10 @@ async def download_file(url, file_path):
print("下载完成,进行解压安装....") print("下载完成,进行解压安装....")
async def main(workList, uid, oocr, oocrDet):
async def main(workList, uid, oocr): global ocr, ocrDet
global ocr
ocr = oocr ocr = oocr
ocrDet = oocrDet
async def init_chrome(): async def init_chrome():
if platform.system() == "Windows": if platform.system() == "Windows":
@ -943,16 +1011,30 @@ async def main(workList, uid, oocr):
chromium_path = await init_chrome() chromium_path = await init_chrome()
headless = 'new' headless = 'new'
print("选择登录") print("选择登录")
if workList[uid].type == "phone":
print("选择手机号登录") try_time = 1
await loginPhone(chromium_path, workList, uid, headless) while True:
elif workList[uid].type == "password": if workList[uid].type == "phone":
print("选择密码登录") print("选择手机号登录")
await loginPassword(chromium_path, workList, uid, headless) aresult = wait loginPhone(chromium_path, workList, uid, headless)
os.remove("image.png") if os.path.exists("image.png") else None elif workList[uid].type == "password":
os.remove("template.png") if os.path.exists("template.png") else None print("选择密码登录")
os.remove("shape_image.png") if os.path.exists("shape_image.png") else None result = await loginPassword(chromium_path, workList, uid, headless)
os.remove("rgba_word_img.png") if os.path.exists("rgba_word_img.png") else None if result != "notSupport" or try_time > 5:
os.remove("rgb_word_img.png") if os.path.exists("rgb_word_img.png") else None break
await asyncio.sleep(random.uniform(2, 4))
print(f"进行第{try_time}次重试")
try_time += 1
if os.path.exists("image.png"):
os.remove("image.png")
if os.path.exists("template.png"):
os.remove("template.png")
if os.path.exists("shape_image.png"):
os.remove("shape_image.png")
if os.path.exists("rgba_word_img.png"):
os.remove("rgba_word_img.png")
if os.path.exists("rgb_word_img.png"):
os.remove("rgb_word_img.png")
print("登录完成") print("登录完成")
await asyncio.sleep(10) await asyncio.sleep(10)