From d2085e9213ff71c7219b4e851b8cce99a25db4c4 Mon Sep 17 00:00:00 2001 From: zhao-zg <61549925+zhao-zg@users.noreply.github.com> Date: Wed, 31 Jul 2024 10:39:17 +0800 Subject: [PATCH] Add files via upload --- api.py | 7 +-- login.py | 132 ++++++++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 111 insertions(+), 28 deletions(-) diff --git a/api.py b/api.py index 40dbc64..5b0f28c 100644 --- a/api.py +++ b/api.py @@ -10,6 +10,7 @@ import login as backend import ddddocr ocr = ddddocr.DdddOcr(show_ad=False, beta=True) +ocrDet = ddddocr.DdddOcr(show_ad=False, beta=True, det=True) class account: @@ -90,9 +91,9 @@ async def loginNew(): # 调用后端进行登录 -async def THREAD_DO_LOGIN(workList, uid, ocr): +async def THREAD_DO_LOGIN(workList, uid, ocr, ocrDet): try: - await backend.main(workList, uid, ocr) + await backend.main(workList, uid, ocr, ocrDet) except Exception as e: print(e) workList[uid].msg = str(e) @@ -170,7 +171,7 @@ def loginPublic(data): # 新增记录 workList[u.uid] = u # 非阻塞启动登录线程 - asyncio.create_task(THREAD_DO_LOGIN(workList, u.uid, ocr)) + asyncio.create_task(THREAD_DO_LOGIN(workList, u.uid, ocr, ocrDet)) # 更新信息,响应api请求 workList[u.uid].status = "pending" r = mr("pass", uid=u.uid, msg=f"{u.account}处理中, 到/check查询处理进度") diff --git a/login.py b/login.py index c2bf5c5..ccb108e 100644 --- a/login.py +++ b/login.py @@ -19,6 +19,8 @@ import io import re # 传参获得已初始化的ddddocr实例 ocr = None +ocrDet = None + # 支持的形状类型 supported_types = [ "三角形", @@ -330,11 +332,12 @@ async def loginPassword(chromium_path, workList, uid, headless): elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'): print("进入点形状、颜色验证分支") - workList[uid].status = "pending" - workList[uid].msg = "正在过形状、颜色检测" - await verification_shape(page) - await page.waitFor(3000) - continue + workList[uid].status = "pending" + workList[uid].msg = "正在过形状、颜色检测" + if await verification_shape(page) == "notSupport": + print("即将重启浏览器重试") + await browser.close() + return "notSupport" if not sms_sent: if await page.J(".sub-title"): @@ -343,7 +346,10 @@ async def loginPassword(chromium_path, workList, uid, headless): workList[uid].status = "SMS" workList[uid].msg = "需要短信验证" - await sendSMS(page) + if await sendSMS(page) == "notSupport": + print("即将重启浏览器重试") + await browser.close() + return "notSupport" await page.waitFor(3000) await typeSMScode(page, workList, uid) sms_sent = True @@ -359,7 +365,10 @@ async def loginPassword(chromium_path, workList, uid, headless): if not workList[uid].isAuto: workList[uid].status = "SMS" workList[uid].msg = "需要短信验证" - await sendSMSDirectly(page) + if await sendSMSDirectly(page) == "notSupport": + print("即将重启浏览器重试") + await browser.close() + return "notSupport" await page.waitFor(3000) await typeSMScode(page, workList, uid) sms_sent = True @@ -449,7 +458,8 @@ async def sendSMSDirectly(page): await verification(page) elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'): - await verification_shape(page) + if await verification_shape(page) == "notSupport": + return "notSupport" else: break @@ -490,7 +500,8 @@ async def sendSMS(page): await verification(page) elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'): - await verification_shape(page) + if await verification_shape(page) == "notSupport": + return "notSupport" else: break @@ -636,7 +647,7 @@ async def verification(page): box["x"] + distance + random.uniform(3, 15), box["y"], {"steps": 10} ) await page.waitFor( - random.randint(100, 400) + random.randint(100, 500) ) await page.mouse.move( box["x"] + distance, box["y"], {"steps": 10} @@ -744,6 +755,14 @@ async def verification_shape(page): else: raise "image is empty" + def get_gray_img(path): + img = cv2.imread(path) + gray = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)) + gray.save("gray.png") + return open("gray.png", "rb").read() + + # 文字点选的重试次数,超过将重启浏览器 + retry_count = 10 for i in range(5): await page.waitForSelector("div.captcha_footer img") image_src = await page.Jeval( @@ -798,8 +817,57 @@ async def verification_shape(page): print(f"不支持{target_color},重试") await refresh_button.click() await asyncio.sleep(random.uniform(2, 4)) + break + elif word.find("依次") > 0: + if retry_count < 1: + print("文字点选重试失败") + return "notSupport" + i = 3 + print("进入文字点选") + print(f"文字点选第{11 - retry_count}次尝试") + retry_count -= 1 + target_word = word.replace("\"", "")[-4:] + print(f"点选字为: {target_word}") + gray_img = get_gray_img("shape_image.png") + xy_list = ocrDet.detection(gray_img) + src_img = Image.open("shape_image.png") + words = [] + for row in xy_list: + [x1, y1, x2, y2] = row + corp = src_img.crop([x1 - 7 if x1 > 7 else x1, y1 - 7 if y1 > 7 else y1, x2 + 7, y2 + 7]) + # 识别出单个字 + result_word = ocr.classification(corp, png_fix=True) + words.append(result_word) + result = dict(zip(words, xy_list)) + print(f"result: {result}") + img_xy = {} + for key, xy in result.items(): + img_xy[key] = (int((xy[0] + xy[2]) / 2), int((xy[1] + xy[3]) / 2)) + not_found = False + click_points = {} + for wd in target_word: + if wd not in img_xy: + print(f"\"{wd}\"未找到,识别失败,刷新") + await refresh_button.click() + await asyncio.sleep(random.uniform(2, 4)) + not_found = True + break + center_x, center_y = img_xy[wd] + click_x, click_y = image_top_left_x + center_x, image_top_left_y + center_y + click_points[wd] = [click_x, click_y] + print(click_points) + if os.path.exists("gray.png"): + os.remove("gray.png") + if not_found: continue - + print("文字点选识别正常") + for wd, point in click_points.items(): + print(f"点击\"{wd}\",坐标{point[0]}:{point[1]}") + await page.mouse.click(point[0], point[1]) + await asyncio.sleep(random.uniform(0.5, 2)) + await button.click() + await asyncio.sleep(random.uniform(0.3, 1)) + break else: shape_type = word.split("请选出图中的")[1] if shape_type in supported_types: @@ -861,10 +929,10 @@ async def download_file(url, file_path): print("下载完成,进行解压安装....") - -async def main(workList, uid, oocr): - global ocr +async def main(workList, uid, oocr, oocrDet): + global ocr, ocrDet ocr = oocr + ocrDet = oocrDet async def init_chrome(): if platform.system() == "Windows": @@ -943,16 +1011,30 @@ async def main(workList, uid, oocr): chromium_path = await init_chrome() headless = 'new' print("选择登录") - if workList[uid].type == "phone": - print("选择手机号登录") - await loginPhone(chromium_path, workList, uid, headless) - elif workList[uid].type == "password": - print("选择密码登录") - await loginPassword(chromium_path, workList, uid, headless) - os.remove("image.png") if os.path.exists("image.png") else None - os.remove("template.png") if os.path.exists("template.png") else None - os.remove("shape_image.png") if os.path.exists("shape_image.png") else None - os.remove("rgba_word_img.png") if os.path.exists("rgba_word_img.png") else None - os.remove("rgb_word_img.png") if os.path.exists("rgb_word_img.png") else None + + try_time = 1 + while True: + if workList[uid].type == "phone": + print("选择手机号登录") + aresult = wait loginPhone(chromium_path, workList, uid, headless) + elif workList[uid].type == "password": + print("选择密码登录") + result = await loginPassword(chromium_path, workList, uid, headless) + if result != "notSupport" or try_time > 5: + break + await asyncio.sleep(random.uniform(2, 4)) + print(f"进行第{try_time}次重试") + try_time += 1 + if os.path.exists("image.png"): + os.remove("image.png") + if os.path.exists("template.png"): + os.remove("template.png") + if os.path.exists("shape_image.png"): + os.remove("shape_image.png") + if os.path.exists("rgba_word_img.png"): + os.remove("rgba_word_img.png") + if os.path.exists("rgb_word_img.png"): + os.remove("rgb_word_img.png") + print("登录完成") await asyncio.sleep(10)