mirror of
https://github.com/zhao-zg/jd-login.git
synced 2026-01-13 14:30:43 +08:00
Add files via upload
This commit is contained in:
parent
464b3cd586
commit
0088daa4c4
221
api.py
Normal file
221
api.py
Normal file
@ -0,0 +1,221 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# api.py
|
||||
run_host = "0.0.0.0"
|
||||
run_port = 12345
|
||||
|
||||
|
||||
from quart import Quart, request, jsonify
|
||||
import hashlib, asyncio
|
||||
import login as backend
|
||||
import ddddocr
|
||||
|
||||
ocr = ddddocr.DdddOcr(show_ad=False, beta=True)
|
||||
|
||||
|
||||
class account:
|
||||
status = ""
|
||||
uid = ""
|
||||
account = ""
|
||||
password = ""
|
||||
isAuto = False
|
||||
type = ""
|
||||
cookie = ""
|
||||
SMS_CODE = ""
|
||||
msg = ""
|
||||
|
||||
def __init__(self, data):
|
||||
try:
|
||||
self.status = "pending"
|
||||
self.account = data.get("id", None)
|
||||
self.type = data.get("type", None)
|
||||
self.password = data.get("pw", None)
|
||||
self.isAuto = data.get("isAuto", False)
|
||||
if not self.account:
|
||||
raise ValueError("账号不能为空")
|
||||
if type == "password" and not self.password:
|
||||
raise ValueError("密码不能为空")
|
||||
|
||||
c = str(self.account) + str(self.password)
|
||||
self.uid = hashlib.sha256(c.encode("utf-8")).hexdigest()
|
||||
except:
|
||||
raise ValueError("账号密码错误:" + str(data))
|
||||
|
||||
|
||||
# 正在处理的账号列表
|
||||
workList = {}
|
||||
"""
|
||||
(global) workList ={
|
||||
uid: {
|
||||
status: pending,
|
||||
account: 138xxxxxxxx,
|
||||
password: admin123,
|
||||
isAuto: False
|
||||
cookie: ""
|
||||
SMS_CODE: None,
|
||||
msg: "Error Info"
|
||||
},
|
||||
...
|
||||
}
|
||||
"""
|
||||
app = Quart(__name__)
|
||||
|
||||
|
||||
def mr(status, **kwargs):
|
||||
r_data = {}
|
||||
r_data["status"] = status
|
||||
for key, value in kwargs.items():
|
||||
r_data[str(key)] = value
|
||||
r_data = jsonify(r_data)
|
||||
r_data.headers["Content-Type"] = "application/json; charset=utf-8"
|
||||
return r_data
|
||||
|
||||
|
||||
# -----router-----
|
||||
# 传入账号密码,启动登录线程
|
||||
@app.route("/loginPhone", methods=["POST"])
|
||||
async def loginPhone():
|
||||
print("loginPhone")
|
||||
data = await request.get_json()
|
||||
try:
|
||||
u = account(data)
|
||||
except Exception as e:
|
||||
r = mr("error", msg=str(e))
|
||||
return r
|
||||
# 检测重复提交
|
||||
if workList.get(u.uid):
|
||||
workList[u.uid].SMS_CODE = None
|
||||
r = mr("pass", uid=u.uid, msg=f"{u.account}已经在处理了,请稍后再试")
|
||||
return r
|
||||
|
||||
# 新增记录
|
||||
workList[u.uid] = u
|
||||
# 非阻塞启动登录线程
|
||||
asyncio.create_task(THREAD_DO_LOGIN(workList, u.uid, ocr))
|
||||
# 更新信息,响应api请求
|
||||
workList[u.uid].status = "pending"
|
||||
r = mr("pass", uid=u.uid, msg=f"{u.account}处理中, 到/check查询处理进度")
|
||||
return r
|
||||
# -----router-----
|
||||
# 传入账号密码,启动登录线程
|
||||
@app.route("/loginPassword", methods=["POST"])
|
||||
async def loginPassword():
|
||||
data = await request.get_json()
|
||||
try:
|
||||
u = account(data)
|
||||
except Exception as e:
|
||||
r = mr("error", msg=str(e))
|
||||
return r
|
||||
# 检测重复提交
|
||||
if workList.get(u.uid):
|
||||
workList[u.uid].SMS_CODE = None
|
||||
r = mr("pass", uid=u.uid, msg=f"{u.account}已经在处理了,请稍后再试")
|
||||
return r
|
||||
|
||||
# 新增记录
|
||||
workList[u.uid] = u
|
||||
# 非阻塞启动登录线程
|
||||
asyncio.create_task(THREAD_DO_LOGIN(workList, u.uid, ocr))
|
||||
# 更新信息,响应api请求
|
||||
workList[u.uid].status = "pending"
|
||||
r = mr("pass", uid=u.uid, msg=f"{u.account}处理中, 到/check查询处理进度")
|
||||
return r
|
||||
|
||||
|
||||
# 调用后端进行登录
|
||||
async def THREAD_DO_LOGIN(workList, uid, ocr):
|
||||
try:
|
||||
await backend.main(workList, uid, ocr)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
workList[uid].msg = str(e)
|
||||
|
||||
"""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(backend.start(workList, uid))
|
||||
except Exception as e:
|
||||
print(e)
|
||||
workList[uid].msg = str(e)
|
||||
"""
|
||||
|
||||
|
||||
# 检查后端进度记录
|
||||
@app.route("/check", methods=["POST"])
|
||||
async def check():
|
||||
data = await request.get_json()
|
||||
uid = data.get("uid", None)
|
||||
r = None
|
||||
# 账号列表有记录
|
||||
if workList.get(uid, ""):
|
||||
status = workList[uid].status
|
||||
if status == "pass":
|
||||
cookie = workList[uid].cookie
|
||||
r = mr(status, cookie=cookie, msg="成功")
|
||||
elif status == "pending":
|
||||
r = mr(status, msg="正在处理中,请等待")
|
||||
elif status == "error":
|
||||
r = mr(status, msg="登录失败,请在十秒后重试:" + workList[uid].msg)
|
||||
elif status == "SMS":
|
||||
r = mr(status, msg="需要短信验证")
|
||||
elif status == "wrongSMS":
|
||||
r = mr(status, msg="短信验证错误,请重新输入")
|
||||
else:
|
||||
r = mr("error", msg="笨蛋开发者,忘记适配新状态啦:" + status)
|
||||
# 账号列表无记录
|
||||
else:
|
||||
r = mr("error", msg="未找到该账号记录,请重新登录")
|
||||
return r
|
||||
|
||||
|
||||
# 传入短信验证码,更新账号列表使后端可以调用
|
||||
@app.route("/sms", methods=["POST"])
|
||||
async def sms():
|
||||
data = await request.get_json()
|
||||
uid = data.get("uid", None)
|
||||
code = data.get("code", None)
|
||||
# 检查传入验证码合规
|
||||
if len(code) != 6 and not code.isdigit():
|
||||
r = mr("wrongSMS", msg="验证码错误")
|
||||
return r
|
||||
try:
|
||||
THREAD_SMS(uid, code)
|
||||
r = mr("pass", msg="成功提交验证码")
|
||||
return r
|
||||
except Exception as e:
|
||||
r = mr("error", msg=str(e))
|
||||
return r
|
||||
|
||||
|
||||
def THREAD_SMS(uid, code):
|
||||
print("phase THREAD_SMS: " + str(code))
|
||||
u = workList.get(uid, "")
|
||||
if not u:
|
||||
raise ValueError("账号不在记录中")
|
||||
if u.status == "SMS" or u.status == "wrongSMS":
|
||||
u.SMS_CODE = code
|
||||
else:
|
||||
raise ValueError("账号不在SMS过程中")
|
||||
|
||||
|
||||
# -----regular functions-----
|
||||
# 删除成功或失败的账号记录
|
||||
async def deleteSession(uid):
|
||||
await asyncio.sleep(5)
|
||||
del workList[uid]
|
||||
|
||||
|
||||
"""
|
||||
@app.route("/delck", methods=["POST"])
|
||||
def delck():
|
||||
data = request.get_json()
|
||||
uid = data.get("uid", None)
|
||||
if not exist(uid):
|
||||
r = mr(False, msg="not exist")
|
||||
return r
|
||||
|
||||
THREAD_DELCK(uid)
|
||||
"""
|
||||
# 创建本线程的事件循环,运行flask作为第一个任务
|
||||
asyncio.new_event_loop().run_until_complete(app.run(host=run_host, port=run_port))
|
||||
956
login.py
Normal file
956
login.py
Normal file
@ -0,0 +1,956 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# login.py
|
||||
# author: github/svjdck & github.com/icepage/AutoUpdateJdCookie & 小九九 t.me/gdot0 & zhaozg
|
||||
|
||||
import os
|
||||
from pyppeteer import launch
|
||||
import aiohttp
|
||||
from urllib import request
|
||||
from PIL import Image
|
||||
import platform
|
||||
import zipfile
|
||||
import datetime
|
||||
import asyncio
|
||||
import random
|
||||
import cv2
|
||||
import numpy as np
|
||||
import base64
|
||||
import io
|
||||
import re
|
||||
# 传参获得已初始化的ddddocr实例
|
||||
ocr = None
|
||||
# 支持的形状类型
|
||||
supported_types = [
|
||||
"三角形",
|
||||
"正方形",
|
||||
"长方形",
|
||||
"五角星",
|
||||
"六边形",
|
||||
"圆形",
|
||||
"梯形",
|
||||
"圆环",
|
||||
]
|
||||
# 定义了支持的每种颜色的 HSV 范围
|
||||
supported_colors = {
|
||||
"紫色": ([125, 50, 50], [145, 255, 255]),
|
||||
"灰色": ([0, 0, 50], [180, 50, 255]),
|
||||
"粉色": ([160, 50, 50], [180, 255, 255]),
|
||||
"蓝色": ([100, 50, 50], [130, 255, 255]),
|
||||
"绿色": ([40, 50, 50], [80, 255, 255]),
|
||||
"橙色": ([10, 50, 50], [25, 255, 255]),
|
||||
"黄色": ([25, 50, 50], [35, 255, 255]),
|
||||
"红色": ([0, 50, 50], [10, 255, 255]),
|
||||
}
|
||||
|
||||
|
||||
async def deleteSession(workList, uid):
|
||||
s = workList.get(uid, "")
|
||||
if s:
|
||||
await asyncio.sleep(60)
|
||||
del workList[uid]
|
||||
|
||||
async def loginPhone(chromium_path, workList, uid, headless):
|
||||
# 判断账号密码错误
|
||||
async def isWrongAccountOrPassword(page, verify=False):
|
||||
try:
|
||||
element = await page.xpath('//*[@id="app"]/div/div[5]')
|
||||
if element:
|
||||
text = await page.evaluate(
|
||||
"(element) => element.textContent", element[0]
|
||||
)
|
||||
if text == "账号或密码不正确":
|
||||
if verify == True:
|
||||
return True
|
||||
await asyncio.sleep(2)
|
||||
return await isWrongAccountOrPassword(page, verify=True)
|
||||
return False
|
||||
except Exception as e:
|
||||
print("isWrongAccountOrPassword " + str(e))
|
||||
return False
|
||||
|
||||
# 判断验证码超时
|
||||
async def isStillInSMSCodeSentPage(page):
|
||||
try:
|
||||
if await page.querySelector('.getMsg-btn.text-btn.timer'):
|
||||
return False
|
||||
return await page.querySelector('#authcode')
|
||||
except Exception as e:
|
||||
print("isStillInSMSCodeSentPage " + str(e))
|
||||
return False
|
||||
|
||||
# 判断验证码错误
|
||||
async def needResendSMSCode(page):
|
||||
try:
|
||||
return await page.querySelector('.getMsg-btn.text-btn.timer');
|
||||
except Exception as e:
|
||||
print("needResendSMSCode " + str(e))
|
||||
return False
|
||||
|
||||
async def isSendSMSDirectly(page):
|
||||
try:
|
||||
title = await page.title()
|
||||
if title in ['手机语音验证', '手机短信验证']:
|
||||
print('需要' + title)
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print("isSendSMSDirectly " + str(e))
|
||||
return False
|
||||
|
||||
usernum = workList[uid].account
|
||||
|
||||
print(f"正在登录 {usernum} 的账号")
|
||||
|
||||
browser = await launch(
|
||||
{
|
||||
"executablePath": chromium_path,
|
||||
"headless": headless,
|
||||
"args": (
|
||||
"--no-sandbox",
|
||||
"--disable-setuid-sandbox",
|
||||
"--disable-dev-shm-usage",
|
||||
"--disable-gpu",
|
||||
"--disable-software-rasterizer",
|
||||
),
|
||||
}
|
||||
)
|
||||
page = await browser.newPage()
|
||||
await page.setViewport({"width": 360, "height": 640})
|
||||
await page.goto(
|
||||
"https://plogin.m.jd.com/login/login"
|
||||
)
|
||||
await typephoneuser(page, usernum)
|
||||
|
||||
IN_SMS_TIMES = 0
|
||||
start_time = datetime.datetime.now()
|
||||
sms_sent = False
|
||||
while True:
|
||||
try:
|
||||
now_time = datetime.datetime.now()
|
||||
print("循环检测中...")
|
||||
if (now_time - start_time).total_seconds() > 90:
|
||||
print("进入超时分支")
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "登录超时"
|
||||
break
|
||||
|
||||
elif await page.J("#searchWrapper"):
|
||||
print("进入成功获取cookie分支")
|
||||
workList[uid].cookie = await getCookie(page)
|
||||
workList[uid].status = "pass"
|
||||
break
|
||||
|
||||
elif await isWrongAccountOrPassword(page):
|
||||
print("进入账号密码不正确分支")
|
||||
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "账号或密码不正确"
|
||||
break
|
||||
elif await page.xpath('//*[@id="captcha_modal"]'):
|
||||
print("进入安全验证分支")
|
||||
if await page.xpath('//*[@id="small_img"]'):
|
||||
print("进入过滑块分支")
|
||||
|
||||
workList[uid].status = "pending"
|
||||
workList[uid].msg = "正在过滑块检测"
|
||||
await verification(page)
|
||||
await page.waitFor(2000)
|
||||
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
|
||||
print("进入点形状、颜色验证分支")
|
||||
|
||||
workList[uid].status = "pending"
|
||||
workList[uid].msg = "正在过形状、颜色检测"
|
||||
await verification_shape(page)
|
||||
await page.waitFor(2000)
|
||||
continue
|
||||
if False == sms_sent:
|
||||
print("进入直接发短信分支")
|
||||
if not workList[uid].isAuto:
|
||||
workList[uid].status = "SMS"
|
||||
workList[uid].msg = "需要短信验证"
|
||||
await typePhoneSMScode(page, workList, uid)
|
||||
sms_sent = True
|
||||
|
||||
else:
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "自动续期时不能使用短信验证"
|
||||
print("自动续期时不能使用短信验证")
|
||||
break
|
||||
else:
|
||||
if await isStillInSMSCodeSentPage(page):
|
||||
print("进入验证码错误分支")
|
||||
IN_SMS_TIMES += 1
|
||||
if IN_SMS_TIMES % 3 == 0:
|
||||
workList[uid].SMS_CODE = None
|
||||
workList[uid].status = "wrongSMS"
|
||||
workList[uid].msg = "短信验证码错误,请重新输入"
|
||||
await typePhoneSMScode(page, workList, uid)
|
||||
|
||||
elif await needResendSMSCode(page):
|
||||
print("进入验证码超时分支")
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "验证码超时,请重新开始"
|
||||
break
|
||||
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
continue
|
||||
print("异常退出")
|
||||
print(e)
|
||||
await browser.close()
|
||||
raise e
|
||||
|
||||
print("任务完成退出")
|
||||
|
||||
await browser.close()
|
||||
await deleteSession(workList, uid)
|
||||
return
|
||||
async def loginPassword(chromium_path, workList, uid, headless):
|
||||
# 判断账号密码错误
|
||||
async def isWrongAccountOrPassword(page, verify=False):
|
||||
try:
|
||||
element = await page.xpath('//*[@id="app"]/div/div[5]')
|
||||
if element:
|
||||
text = await page.evaluate(
|
||||
"(element) => element.textContent", element[0]
|
||||
)
|
||||
if text == "账号或密码不正确":
|
||||
if verify == True:
|
||||
return True
|
||||
await asyncio.sleep(2)
|
||||
return await isWrongAccountOrPassword(page, verify=True)
|
||||
return False
|
||||
except Exception as e:
|
||||
print("isWrongAccountOrPassword " + str(e))
|
||||
return False
|
||||
|
||||
# 判断验证码超时
|
||||
async def isStillInSMSCodeSentPage(page):
|
||||
try:
|
||||
if await page.xpath('//*[@id="header"]/span[2]'):
|
||||
element = await page.xpath('//*[@id="header"]/span[2]')
|
||||
if element:
|
||||
text = await page.evaluate(
|
||||
"(element) => element.textContent", element[0]
|
||||
)
|
||||
if text == "手机短信验证":
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print("isStillInSMSCodeSentPage " + str(e))
|
||||
return False
|
||||
|
||||
# 判断验证码错误
|
||||
async def needResendSMSCode(page):
|
||||
try:
|
||||
if await page.xpath('//*[@id="app"]/div/div[2]/div[2]/button'):
|
||||
element = await page.xpath('//*[@id="app"]/div/div[2]/div[2]/button')
|
||||
if element:
|
||||
text = await page.evaluate(
|
||||
"(element) => element.textContent", element[0]
|
||||
)
|
||||
if text == "获取验证码":
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print("needResendSMSCode " + str(e))
|
||||
return False
|
||||
|
||||
async def isSendSMSDirectly(page):
|
||||
try:
|
||||
title = await page.title()
|
||||
if title in ['手机语音验证', '手机短信验证']:
|
||||
print('需要' + title)
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print("isSendSMSDirectly " + str(e))
|
||||
return False
|
||||
|
||||
usernum = workList[uid].account
|
||||
passwd = workList[uid].password
|
||||
sms_sent = False
|
||||
print(f"正在登录 {usernum} 的账号")
|
||||
|
||||
browser = await launch(
|
||||
{
|
||||
"executablePath": chromium_path,
|
||||
"headless": headless,
|
||||
"args": (
|
||||
"--no-sandbox",
|
||||
"--disable-setuid-sandbox",
|
||||
"--disable-dev-shm-usage",
|
||||
"--disable-gpu",
|
||||
"--disable-software-rasterizer",
|
||||
),
|
||||
}
|
||||
)
|
||||
page = await browser.newPage()
|
||||
await page.setViewport({"width": 360, "height": 640})
|
||||
await page.goto(
|
||||
"https://plogin.m.jd.com/login/login?appid=300&returnurl=https%3A%2F%2Fm.jd.com%2F&source=wq_passport"
|
||||
)
|
||||
await typeuser(page, usernum, passwd)
|
||||
|
||||
IN_SMS_TIMES = 0
|
||||
start_time = datetime.datetime.now()
|
||||
|
||||
while True:
|
||||
try:
|
||||
now_time = datetime.datetime.now()
|
||||
print("循环检测中...")
|
||||
if (now_time - start_time).total_seconds() > 90:
|
||||
print("进入超时分支")
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "登录超时"
|
||||
break
|
||||
|
||||
elif await page.J("#searchWrapper"):
|
||||
print("进入成功获取cookie分支")
|
||||
workList[uid].cookie = await getCookie(page)
|
||||
workList[uid].status = "pass"
|
||||
break
|
||||
|
||||
elif await isWrongAccountOrPassword(page):
|
||||
print("进入账号密码不正确分支")
|
||||
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "账号或密码不正确"
|
||||
break
|
||||
elif await page.xpath('//*[@id="captcha_modal"]'):
|
||||
print("进入安全验证分支")
|
||||
if await page.xpath('//*[@id="small_img"]'):
|
||||
print("进入过滑块分支")
|
||||
|
||||
workList[uid].status = "pending"
|
||||
workList[uid].msg = "正在过滑块检测"
|
||||
await verification(page)
|
||||
await page.waitFor(3000)
|
||||
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
|
||||
print("进入点形状、颜色验证分支")
|
||||
|
||||
workList[uid].status = "pending"
|
||||
workList[uid].msg = "正在过形状、颜色检测"
|
||||
await verification_shape(page)
|
||||
await page.waitFor(3000)
|
||||
continue
|
||||
if not sms_sent:
|
||||
|
||||
if await page.J(".sub-title"):
|
||||
print("进入选择短信验证分支")
|
||||
if not workList[uid].isAuto:
|
||||
workList[uid].status = "SMS"
|
||||
workList[uid].msg = "需要短信验证"
|
||||
|
||||
await sendSMS(page)
|
||||
await page.waitFor(3000)
|
||||
await typeSMScode(page, workList, uid)
|
||||
sms_sent = True
|
||||
|
||||
else:
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "自动续期时不能使用短信验证"
|
||||
print("自动续期时不能使用短信验证")
|
||||
break
|
||||
elif await isSendSMSDirectly(page):
|
||||
print("进入直接发短信分支")
|
||||
|
||||
if not workList[uid].isAuto:
|
||||
workList[uid].status = "SMS"
|
||||
workList[uid].msg = "需要短信验证"
|
||||
await sendSMSDirectly(page)
|
||||
await page.waitFor(3000)
|
||||
await typeSMScode(page, workList, uid)
|
||||
sms_sent = True
|
||||
|
||||
else:
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "自动续期时不能使用短信验证"
|
||||
print("自动续期时不能使用短信验证")
|
||||
break
|
||||
else:
|
||||
if await isStillInSMSCodeSentPage(page):
|
||||
print("进入验证码错误分支")
|
||||
IN_SMS_TIMES += 1
|
||||
if IN_SMS_TIMES % 3 == 0:
|
||||
workList[uid].SMS_CODE = None
|
||||
workList[uid].status = "wrongSMS"
|
||||
workList[uid].msg = "短信验证码错误,请重新输入"
|
||||
await typeSMScode(page, workList, uid)
|
||||
|
||||
elif await needResendSMSCode(page):
|
||||
print("进入验证码超时分支")
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "验证码超时,请重新开始"
|
||||
break
|
||||
|
||||
await asyncio.sleep(1)
|
||||
except Exception as e:
|
||||
continue
|
||||
print("异常退出")
|
||||
print(e)
|
||||
await browser.close()
|
||||
raise e
|
||||
|
||||
print("任务完成退出")
|
||||
|
||||
await browser.close()
|
||||
await deleteSession(workList, uid)
|
||||
return
|
||||
|
||||
async def typephoneuser(page, usernum):
|
||||
tel_input = await page.waitForSelector('input[type="tel"]')
|
||||
await tel_input.click()
|
||||
await tel_input.type(usernum)
|
||||
#await page.type(
|
||||
# "input[type='tel']", usernum, {"delay": random.randint(50,100)}
|
||||
#)
|
||||
await page.waitFor(random.randint(200, 500))
|
||||
await page.click(".policy_tip-checkbox")
|
||||
await page.waitFor(random.randint(200, 500))
|
||||
await page.click(".getMsg-btn.text-btn.timer")
|
||||
await page.waitFor(random.randint(500, 1000))
|
||||
async def typeuser(page, usernum, passwd):
|
||||
print("开始输入账号密码")
|
||||
await page.waitForSelector(".J_ping.planBLogin")
|
||||
await page.click(".J_ping.planBLogin")
|
||||
await page.type(
|
||||
"#username", usernum, {"delay": random.randint(10, 20)}
|
||||
)
|
||||
await page.type(
|
||||
"#pwd", passwd, {"delay": random.randint(10, 20)}
|
||||
)
|
||||
await page.waitFor(random.randint(200, 500))
|
||||
await page.click(".policy_tip-checkbox")
|
||||
await page.waitFor(random.randint(200, 500))
|
||||
await page.click(".btn.J_ping.btn-active")
|
||||
await page.waitFor(random.randint(500, 1000))
|
||||
|
||||
|
||||
async def sendSMSDirectly(page):
|
||||
async def preSendSMS(page):
|
||||
await page.waitForXPath(
|
||||
'//*[@id="app"]/div/div[2]/div[2]/button'
|
||||
)
|
||||
await page.waitFor(random.randint(1, 3) * 1000)
|
||||
elements = await page.xpath(
|
||||
'//*[@id="app"]/div/div[2]/div[2]/button'
|
||||
)
|
||||
await elements[0].click()
|
||||
await page.waitFor(3000)
|
||||
|
||||
await preSendSMS(page)
|
||||
print("开始发送验证码")
|
||||
|
||||
try:
|
||||
while True:
|
||||
if await page.xpath('//*[@id="captcha_modal"]/div/div[3]/div'):
|
||||
await verification(page)
|
||||
|
||||
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
|
||||
await verification_shape(page)
|
||||
|
||||
else:
|
||||
break
|
||||
|
||||
await page.waitFor(3000)
|
||||
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
|
||||
async def sendSMS(page):
|
||||
async def preSendSMS(page):
|
||||
print("进行发送验证码前置操作")
|
||||
await page.waitForXPath(
|
||||
'//*[@id="app"]/div/div[2]/div[2]/span/a'
|
||||
)
|
||||
await page.waitFor(random.randint(1, 3) * 1000)
|
||||
elements = await page.xpath(
|
||||
'//*[@id="app"]/div/div[2]/div[2]/span/a'
|
||||
)
|
||||
await elements[0].click()
|
||||
await page.waitForXPath(
|
||||
'//*[@id="app"]/div/div[2]/div[2]/button'
|
||||
)
|
||||
await page.waitFor(random.randint(1, 3) * 1000)
|
||||
elements = await page.xpath(
|
||||
'//*[@id="app"]/div/div[2]/div[2]/button'
|
||||
)
|
||||
await elements[0].click()
|
||||
await page.waitFor(3000)
|
||||
|
||||
await preSendSMS(page)
|
||||
print("开始发送验证码")
|
||||
|
||||
try:
|
||||
while True:
|
||||
if await page.xpath('//*[@id="captcha_modal"]/div/div[3]/div'):
|
||||
await verification(page)
|
||||
|
||||
elif await page.xpath('//*[@id="captcha_modal"]/div/div[3]/button'):
|
||||
await verification_shape(page)
|
||||
|
||||
else:
|
||||
break
|
||||
|
||||
await page.waitFor(3000)
|
||||
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
async def typePhoneSMScode(page, workList, uid):
|
||||
print("开始输入验证码")
|
||||
|
||||
async def get_verification_code(workList, uid):
|
||||
print("开始从全局变量获取验证码")
|
||||
retry = 60
|
||||
while not workList[uid].SMS_CODE and not retry < 0:
|
||||
await asyncio.sleep(1)
|
||||
retry -= 1
|
||||
if retry < 0:
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "输入短信验证码超时"
|
||||
return
|
||||
|
||||
workList[uid].status = "pending"
|
||||
return workList[uid].SMS_CODE
|
||||
code = await get_verification_code(workList, uid)
|
||||
if not code:
|
||||
return
|
||||
|
||||
workList[uid].status = "pending"
|
||||
workList[uid].msg = "正在通过短信验证"
|
||||
authcode_input = await page.waitForSelector('#authcode')
|
||||
await authcode_input.type(code)
|
||||
await page.waitFor(random.randint(100,300))
|
||||
button = await page.waitForSelector('.btn.J_ping')
|
||||
await button.click()
|
||||
await page.waitFor(random.randint(2, 3) * 1000)
|
||||
|
||||
|
||||
|
||||
async def typeSMScode(page, workList, uid):
|
||||
print("开始输入验证码")
|
||||
|
||||
async def get_verification_code(workList, uid):
|
||||
print("开始从全局变量获取验证码")
|
||||
retry = 60
|
||||
while not workList[uid].SMS_CODE and not retry < 0:
|
||||
await asyncio.sleep(1)
|
||||
retry -= 1
|
||||
if retry < 0:
|
||||
workList[uid].status = "error"
|
||||
workList[uid].msg = "输入短信验证码超时"
|
||||
return
|
||||
|
||||
workList[uid].status = "pending"
|
||||
return workList[uid].SMS_CODE
|
||||
await page.waitForXPath('//*[@id="app"]/div/div[2]/div[2]/div/input')
|
||||
code = await get_verification_code(workList, uid)
|
||||
if not code:
|
||||
return
|
||||
|
||||
workList[uid].status = "pending"
|
||||
workList[uid].msg = "正在通过短信验证"
|
||||
input_elements = await page.xpath('//*[@id="app"]/div/div[2]/div[2]/div/input')
|
||||
|
||||
try:
|
||||
if input_elements:
|
||||
input_value = await input_elements[0].getProperty("value")
|
||||
if input_value:
|
||||
print("清除验证码输入框中已有的验证码")
|
||||
await page.evaluate(
|
||||
'(element) => element.value = ""', input_elements[0]
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print("typeSMScode" + str(e))
|
||||
|
||||
await input_elements[0].type(code)
|
||||
await page.waitForXPath('//*[@id="app"]/div/div[2]/a[1]')
|
||||
await page.waitFor(random.randint(1, 3) * 1000)
|
||||
elements = await page.xpath('//*[@id="app"]/div/div[2]/a[1]')
|
||||
await elements[0].click()
|
||||
await page.waitFor(random.randint(2, 3) * 1000)
|
||||
|
||||
|
||||
async def verification(page):
|
||||
print("开始过滑块")
|
||||
|
||||
async def get_distance():
|
||||
img = cv2.imread("image.png", 0)
|
||||
template = cv2.imread("template.png", 0)
|
||||
img = cv2.GaussianBlur(img, (5, 5), 0)
|
||||
template = cv2.GaussianBlur(template, (5, 5), 0)
|
||||
bg_edge = cv2.Canny(img, 100, 200)
|
||||
cut_edge = cv2.Canny(template, 100, 200)
|
||||
img = cv2.cvtColor(bg_edge, cv2.COLOR_GRAY2RGB)
|
||||
template = cv2.cvtColor(cut_edge, cv2.COLOR_GRAY2RGB)
|
||||
res = cv2.matchTemplate(
|
||||
img, template, cv2.TM_CCOEFF_NORMED
|
||||
)
|
||||
value = cv2.minMaxLoc(res)[3][0]
|
||||
distance = (
|
||||
value + 10
|
||||
)
|
||||
return distance
|
||||
|
||||
await page.waitForSelector("#cpc_img")
|
||||
image_src = await page.Jeval(
|
||||
"#cpc_img", 'el => el.getAttribute("src")'
|
||||
)
|
||||
request.urlretrieve(image_src, "image.png")
|
||||
width = await page.evaluate(
|
||||
'() => { return document.getElementById("cpc_img").clientWidth; }'
|
||||
)
|
||||
height = await page.evaluate(
|
||||
'() => { return document.getElementById("cpc_img").clientHeight; }'
|
||||
)
|
||||
image = Image.open("image.png")
|
||||
resized_image = image.resize((width, height))
|
||||
resized_image.save("image.png")
|
||||
template_src = await page.Jeval(
|
||||
"#small_img", 'el => el.getAttribute("src")'
|
||||
)
|
||||
request.urlretrieve(template_src, "template.png")
|
||||
width = await page.evaluate(
|
||||
'() => { return document.getElementById("small_img").clientWidth; }'
|
||||
)
|
||||
height = await page.evaluate(
|
||||
'() => { return document.getElementById("small_img").clientHeight; }'
|
||||
)
|
||||
image = Image.open("template.png")
|
||||
resized_image = image.resize((width, height))
|
||||
resized_image.save("template.png")
|
||||
await page.waitFor(100)
|
||||
el = await page.querySelector(
|
||||
"#captcha_modal > div > div.captcha_footer > div > img"
|
||||
)
|
||||
box = await el.boundingBox()
|
||||
distance = await get_distance()
|
||||
await page.mouse.move(box["x"] + 10, box["y"] + 10)
|
||||
await page.mouse.down()
|
||||
await page.mouse.move(
|
||||
box["x"] + distance + random.uniform(3, 15), box["y"], {"steps": 10}
|
||||
)
|
||||
await page.waitFor(
|
||||
random.randint(100, 400)
|
||||
)
|
||||
await page.mouse.move(
|
||||
box["x"] + distance, box["y"], {"steps": 10}
|
||||
)
|
||||
await page.mouse.up()
|
||||
print("过滑块结束")
|
||||
|
||||
|
||||
async def verification_shape(page):
|
||||
print("开始过颜色、形状验证")
|
||||
|
||||
def get_shape_location_by_type(img_path, type: str):
|
||||
def sort_rectangle_vertices(vertices):
|
||||
vertices = sorted(vertices, key=lambda x: x[1])
|
||||
top_left, top_right = sorted(vertices[:2], key=lambda x: x[0])
|
||||
bottom_left, bottom_right = sorted(vertices[2:], key=lambda x: x[0])
|
||||
return [top_left, top_right, bottom_right, bottom_left]
|
||||
|
||||
def is_trapezoid(vertices):
|
||||
top_width = abs(vertices[1][0] - vertices[0][0])
|
||||
bottom_width = abs(vertices[2][0] - vertices[3][0])
|
||||
return top_width < bottom_width
|
||||
|
||||
img = cv2.imread(img_path)
|
||||
imgGray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
|
||||
imgBlur = cv2.GaussianBlur(imgGray, (5, 5), 1)
|
||||
imgCanny = cv2.Canny(imgBlur, 60, 60)
|
||||
contours, hierarchy = cv2.findContours(
|
||||
imgCanny, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE
|
||||
)
|
||||
for obj in contours:
|
||||
perimeter = cv2.arcLength(obj, True)
|
||||
approx = cv2.approxPolyDP(obj, 0.02 * perimeter, True)
|
||||
CornerNum = len(approx)
|
||||
x, y, w, h = cv2.boundingRect(approx)
|
||||
|
||||
if CornerNum == 3:
|
||||
obj_type = "三角形"
|
||||
elif CornerNum == 4:
|
||||
if w == h:
|
||||
obj_type = "正方形"
|
||||
else:
|
||||
approx = sort_rectangle_vertices([vertex[0] for vertex in approx])
|
||||
if is_trapezoid(approx):
|
||||
obj_type = "梯形"
|
||||
else:
|
||||
obj_type = "长方形"
|
||||
elif CornerNum == 6:
|
||||
obj_type = "六边形"
|
||||
elif CornerNum == 8:
|
||||
obj_type = "圆形"
|
||||
elif CornerNum == 20:
|
||||
obj_type = "五角星"
|
||||
else:
|
||||
obj_type = "未知"
|
||||
|
||||
if obj_type == type:
|
||||
center_x, center_y = x + w // 2, y + h // 2
|
||||
return center_x, center_y
|
||||
|
||||
return None, None
|
||||
|
||||
def get_shape_location_by_color(img_path, target_color):
|
||||
image = cv2.imread(img_path)
|
||||
hsv_image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
|
||||
|
||||
lower, upper = supported_colors[target_color]
|
||||
lower = np.array(lower, dtype="uint8")
|
||||
upper = np.array(upper, dtype="uint8")
|
||||
|
||||
mask = cv2.inRange(hsv_image, lower, upper)
|
||||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
for contour in contours:
|
||||
if cv2.contourArea(contour) > 100:
|
||||
M = cv2.moments(contour)
|
||||
if M["m00"] != 0:
|
||||
cX = int(M["m10"] / M["m00"])
|
||||
cY = int(M["m01"] / M["m00"])
|
||||
return cX, cY
|
||||
|
||||
return None, None
|
||||
|
||||
def get_word(ocr, img_path):
|
||||
image_bytes = open(img_path, "rb").read()
|
||||
result = ocr.classification(image_bytes, png_fix=True)
|
||||
return result
|
||||
|
||||
def rgba2rgb(rgb_image_path, rgba_img_path):
|
||||
rgba_image = Image.open(rgba_img_path)
|
||||
rgb_image = Image.new("RGB", rgba_image.size, (255, 255, 255))
|
||||
rgb_image.paste(rgba_image, (0, 0), rgba_image)
|
||||
rgb_image.save(rgb_image_path)
|
||||
|
||||
def save_img(img_path, img_bytes):
|
||||
with Image.open(io.BytesIO(img_bytes)) as img:
|
||||
img.save(img_path)
|
||||
|
||||
def get_img_bytes(img_src: str) -> bytes:
|
||||
img_base64 = re.search(r"base64,(.*)", img_src)
|
||||
if img_base64:
|
||||
base64_code = img_base64.group(1)
|
||||
img_bytes = base64.b64decode(base64_code)
|
||||
return img_bytes
|
||||
else:
|
||||
raise "image is empty"
|
||||
|
||||
for i in range(5):
|
||||
await page.waitForSelector("div.captcha_footer img")
|
||||
image_src = await page.Jeval(
|
||||
"#cpc_img", 'el => el.getAttribute("src")'
|
||||
)
|
||||
request.urlretrieve(image_src, "shape_image.png")
|
||||
width = await page.evaluate(
|
||||
'() => { return document.getElementById("cpc_img").clientWidth; }'
|
||||
)
|
||||
height = await page.evaluate(
|
||||
'() => { return document.getElementById("cpc_img").clientHeight; }'
|
||||
)
|
||||
image = Image.open("shape_image.png")
|
||||
resized_image = image.resize((width, height))
|
||||
resized_image.save("shape_image.png")
|
||||
|
||||
b_image = await page.querySelector("#cpc_img")
|
||||
b_image_box = await b_image.boundingBox()
|
||||
image_top_left_x = b_image_box["x"]
|
||||
image_top_left_y = b_image_box["y"]
|
||||
|
||||
word_src = await page.Jeval(
|
||||
"div.captcha_footer img", 'el => el.getAttribute("src")'
|
||||
)
|
||||
word_bytes = get_img_bytes(word_src)
|
||||
save_img("rgba_word_img.png", word_bytes)
|
||||
rgba2rgb("rgb_word_img.png", "rgba_word_img.png")
|
||||
word = get_word(ocr, "rgb_word_img.png")
|
||||
|
||||
button = await page.querySelector("div.captcha_footer button.sure_btn")
|
||||
refresh_button = await page.querySelector("div.captcha_header img.jcap_refresh")
|
||||
|
||||
if word.find("色") > 0:
|
||||
target_color = word.split("请选出图中")[1].split("的图形")[0]
|
||||
if target_color in supported_colors:
|
||||
print(f"正在找{target_color}")
|
||||
center_x, center_y = get_shape_location_by_color(
|
||||
"shape_image.png", target_color
|
||||
)
|
||||
if center_x is None and center_y is None:
|
||||
print("识别失败,刷新")
|
||||
await refresh_button.click()
|
||||
await asyncio.sleep(random.uniform(2, 4))
|
||||
continue
|
||||
x, y = image_top_left_x + center_x, image_top_left_y + center_y
|
||||
await page.mouse.click(x, y)
|
||||
await asyncio.sleep(random.uniform(0.5, 2))
|
||||
await button.click()
|
||||
await asyncio.sleep(random.uniform(0.3, 1))
|
||||
break
|
||||
else:
|
||||
print(f"不支持{target_color},重试")
|
||||
await refresh_button.click()
|
||||
await asyncio.sleep(random.uniform(2, 4))
|
||||
continue
|
||||
|
||||
else:
|
||||
shape_type = word.split("请选出图中的")[1]
|
||||
if shape_type in supported_types:
|
||||
print(f"正在找{shape_type}")
|
||||
if shape_type == "圆环":
|
||||
shape_type = shape_type.replace("圆环", "圆形")
|
||||
center_x, center_y = get_shape_location_by_type(
|
||||
"shape_image.png", shape_type
|
||||
)
|
||||
if center_x is None and center_y is None:
|
||||
print(f"识别失败,刷新")
|
||||
await refresh_button.click()
|
||||
await asyncio.sleep(random.uniform(2, 4))
|
||||
continue
|
||||
x, y = image_top_left_x + center_x, image_top_left_y + center_y
|
||||
await page.mouse.click(x, y)
|
||||
await asyncio.sleep(random.uniform(0.5, 2))
|
||||
await button.click()
|
||||
await asyncio.sleep(random.uniform(0.3, 1))
|
||||
break
|
||||
else:
|
||||
print(f"不支持{shape_type},刷新中......")
|
||||
await refresh_button.click()
|
||||
await asyncio.sleep(random.uniform(2, 4))
|
||||
continue
|
||||
print("过图形结束")
|
||||
|
||||
|
||||
async def getCookie(page):
|
||||
cookies = await page.cookies()
|
||||
pt_key = ""
|
||||
pt_pin = ""
|
||||
for cookie in cookies:
|
||||
if cookie["name"] == "pt_key":
|
||||
pt_key = cookie["value"]
|
||||
elif cookie["name"] == "pt_pin":
|
||||
pt_pin = cookie["value"]
|
||||
ck = f"pt_key={pt_key};pt_pin={pt_pin};"
|
||||
print(f"登录成功 {ck}")
|
||||
return ck
|
||||
|
||||
|
||||
async def download_file(url, file_path):
|
||||
timeout = aiohttp.ClientTimeout(total=60000)
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
async with session.get(url) as response:
|
||||
with open(file_path, "wb") as file:
|
||||
file_size = int(response.headers.get("Content-Length", 0))
|
||||
downloaded_size = 0
|
||||
chunk_size = 1024
|
||||
while True:
|
||||
chunk = await response.content.read(chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
file.write(chunk)
|
||||
downloaded_size += len(chunk)
|
||||
progress = (downloaded_size / file_size) * 100
|
||||
print(f"已下载{progress:.2f}%...", end="\r")
|
||||
print("下载完成,进行解压安装....")
|
||||
|
||||
|
||||
async def main(workList, uid, oocr):
|
||||
global ocr
|
||||
ocr = oocr
|
||||
|
||||
async def init_chrome():
|
||||
if platform.system() == "Windows":
|
||||
chrome_dir = os.path.join(
|
||||
os.environ["USERPROFILE"],
|
||||
"AppData",
|
||||
"Local",
|
||||
"pyppeteer",
|
||||
"pyppeteer",
|
||||
"local-chromium",
|
||||
"588429",
|
||||
"chrome-win32",
|
||||
)
|
||||
chrome_exe = os.path.join(chrome_dir, "chrome.exe")
|
||||
chmod_dir = os.path.join(
|
||||
os.environ["USERPROFILE"],
|
||||
"AppData",
|
||||
"Local",
|
||||
"pyppeteer",
|
||||
"pyppeteer",
|
||||
"local-chromium",
|
||||
"588429",
|
||||
"chrome-win32",
|
||||
"chrome-win32",
|
||||
)
|
||||
if os.path.exists(chrome_exe):
|
||||
return chrome_exe
|
||||
else:
|
||||
print("貌似第一次使用,未找到chrome,正在下载chrome浏览器....")
|
||||
|
||||
chromeurl = "https://mirrors.huaweicloud.com/chromium-browser-snapshots/Win_x64/588429/chrome-win32.zip"
|
||||
target_file = "chrome-win.zip"
|
||||
await download_file(chromeurl, target_file)
|
||||
with zipfile.ZipFile(target_file, "r") as zip_ref:
|
||||
zip_ref.extractall(chrome_dir)
|
||||
os.remove(target_file)
|
||||
for item in os.listdir(chmod_dir):
|
||||
source_item = os.path.join(chmod_dir, item)
|
||||
destination_item = os.path.join(chrome_dir, item)
|
||||
os.rename(source_item, destination_item)
|
||||
print("解压安装完成")
|
||||
await asyncio.sleep(1)
|
||||
return chrome_exe
|
||||
|
||||
elif platform.system() == "Linux":
|
||||
chrome_path = os.path.expanduser(
|
||||
"~/.local/share/pyppeteer/local-chromium/1181205/chrome-linux/chrome"
|
||||
)
|
||||
download_path = os.path.expanduser(
|
||||
"~/.local/share/pyppeteer/local-chromium/1181205/"
|
||||
)
|
||||
if os.path.isfile(chrome_path):
|
||||
return chrome_path
|
||||
else:
|
||||
print("貌似第一次使用,未找到chrome,正在下载chrome浏览器....")
|
||||
print("文件位于github,请耐心等待,如遇到网络问题可到项目地址手动下载")
|
||||
download_url = "https://mirrors.huaweicloud.com/chromium-browser-snapshots/Linux_x64/884014/chrome-linux.zip"
|
||||
if 'arm' in platform.machine():
|
||||
download_url = "https://playwright.azureedge.net/builds/chromium/1088/chromium-linux-arm64.zip";
|
||||
if not os.path.exists(download_path):
|
||||
os.makedirs(download_path, exist_ok=True)
|
||||
target_file = os.path.join(
|
||||
download_path, "chrome-linux.zip"
|
||||
)
|
||||
await download_file(download_url, target_file)
|
||||
with zipfile.ZipFile(target_file, "r") as zip_ref:
|
||||
zip_ref.extractall(download_path)
|
||||
#os.remove(target_file)
|
||||
os.chmod(chrome_path, 0o755)
|
||||
return chrome_path
|
||||
elif platform.system() == "Darwin":
|
||||
return "mac"
|
||||
else:
|
||||
return "unknown"
|
||||
print("判断初始化浏览器")
|
||||
chromium_path = await init_chrome()
|
||||
headless = True
|
||||
print("选择登录")
|
||||
if workList[uid].type == "phone":
|
||||
print("选择手机号登录")
|
||||
await loginPhone(chromium_path, workList, uid, headless)
|
||||
elif workList[uid].type == "password":
|
||||
print("选择密码登录")
|
||||
await loginPassword(chromium_path, workList, uid, headless)
|
||||
os.remove("image.png") if os.path.exists("image.png") else None
|
||||
os.remove("template.png") if os.path.exists("template.png") else None
|
||||
os.remove("shape_image.png") if os.path.exists("shape_image.png") else None
|
||||
os.remove("rgba_word_img.png") if os.path.exists("rgba_word_img.png") else None
|
||||
os.remove("rgb_word_img.png") if os.path.exists("rgb_word_img.png") else None
|
||||
print("登录完成")
|
||||
await asyncio.sleep(10)
|
||||
Loading…
Reference in New Issue
Block a user