import random import requests import time import re import threading import os # 分别为检测文章id和不提现列表 check_id = [1,2,3] no_cash_list = [] # ---->注意:不会处理异常情况,请自行捕获<----- class Compatible_auto: def __init__(self,auto_type,api_address): self.api_address = api_address self.auto_type = auto_type def get_web_code(self,wx_id,appid_or_url): # 鸡哥过检传入wx_id和url,返回url,不直接返回code if self.auto_type == "jige": data = {'Wxid': wx_id, 'url': appid_or_url} url = requests.post(self.api_address + '/loginbyweb', json=data).json()['url'] return url # 汐念过检传入wx_id和appid,返回code if self.auto_type == "xn": data = { "Appid": appid_or_url, "Url": "", "Wxid": wx_id } r = requests.post(f"{self.api_address}/api/Tools/ThirdAppGrant", json=data).json() return r['Data'] else: print("该功能仅支持汐念(xn)和鸡哥(jige)的过检软件") raise ValueError("仅功能支持汐念(xn)和鸡哥(jige)的过检软件") def get_mini_code(self,wx_id,appid,api_address): # 直接返回code if self.auto_type == "jige": data = {'Wxid': wx_id, 'appid':appid} code = requests.get(api_address + '/loginbyapp', params=data).json()["code"] return code else: print("该功能仅鸡哥(jige)的过检软件支持") raise ValueError("该功能仅鸡哥(jige)的过检软件支持") def send_passage(self,url): if self.auto_type == "jige": api_url = f"{self.api_address}/zdgjc" data = {"url": url} r = requests.post(api_url, json=data).json() print(f"推送结果[{r['msg']}]") else: print("该功能仅鸡哥(jige)的过检软件支持") raise ValueError("该功能仅鸡哥(jige)的过检软件支持") def generate_ua(): devices_uas =['PGP110 Build/UKQ1.230924.001', 'HONOR Build/HK1.220923.005', 'VIVO Build/UKQ1.230912.001', 'TYYH Build/UA1.220923.005'] internet_states = ['5G','4G','WIFI'] ua=(f'Mozilla/5.0 (Linux; Android {random.randint(8,14)}; {random.choice(devices_uas)}/UKQ1.230924.001; wv) ' f'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/116.0.0.0 Mobile Safari/537.36 XWEB/1160117 ' f'MMWEBSDK/20240404 MMWEBID/7076 MicroMessenger/8.0.49.2600(0x2800313B) WeChat/arm64 Weixin NetType/{random.choice(internet_states)} ' f'Language/zh_CN ABI/arm64') return ua class Yuer: def __init__(self,wx_id,api_address,wx_name,invite_url=None): self.next_time = None self.read_pages = None self.money = None self.withdraw_headers = None self.union_url = None if not invite_url: self.invite_url = 'http://h5.yyyeee670771.1gkjk8yhyw.cn/yeipad?ewk=ako&tay=tvo&tdq=uin&tfs=qip&ugs=avn&upuid=3387330' else: self.invite_url = invite_url self.read_headers = None self.wx_name = wx_name self.api_address = api_address self.wx_id = wx_id self.ua = generate_ua() self.cookie = None def login(self): try: login_headers = { "Upgrade-Insecure-Requests": "1", "User-Agent": self.ua, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9," "image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9" } r = requests.get(self.invite_url, headers=login_headers, allow_redirects=False).headers location_url = self.invite_url[:self.invite_url.rfind(r"/")] + r["Location"] self.cookie = r['Set-Cookie'].split(";")[0] login_headers["Cookie"] = self.cookie r = requests.get(location_url, headers=login_headers, allow_redirects=False).headers["Location"] url = Compatible_auto("jige",self.api_address).get_web_code(self.wx_id,r) requests.get(url, headers=login_headers, allow_redirects=False) requests.get("http://yyyeee670894.1gkjk8yhyw.cn/user/login?"+url.split("?")[1], headers=login_headers, allow_redirects=False) # print(login_headers['Cookie']) # self.union_url = url[:url.rfind(r"/")] self.read_headers = {'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'User-Agent': self.ua, 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,zh-TW;q=0.5', 'Cookie': self.cookie} self.withdraw_headers = {'Proxy-Connection': 'keep-alive', 'Accept': '*/*', 'X-Requested-With': 'XMLHttpRequest', 'User-Agent': self.ua, 'Content-Type': 'application/x-www-form-urlencoded', 'Origin': self.union_url, 'Referer': f'{self.union_url}/withdrawal', 'Accept-Encoding': 'gzip, deflate', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Cookie': self.cookie} url = requests.get('http://h5.abokxeh.cn/pipa_read?upuid=2802253', headers=self.read_headers, allow_redirects=False).headers['Location'] self.union_url = url[:url.rfind(r"/")] print("【{}】登录成功".format(self.wx_name)) return True except: print("【{}】登录时遇到问题".format(self.wx_name)) return False def user_info(self): try: url = f'{self.union_url}/yeipad?bqb=eom&ivb=kff&qsz=nne&upuid=3284491&vlm=ssr&xik=mzo' user_detail = requests.get(url, headers=self.read_headers) if "抱歉,出错了" in user_detail.text: print("【{}】账号被拉黑".format(self.wx_name)) return False money = re.search(r"余额:([\d.]+)元", user_detail.text) read_pages = re.search(r'([\d.]+)篇', user_detail.text).group(0).replace('篇', '') next_time = re.search(r'([\d.]+)分钟后到来

', user_detail.text) if next_time != None: next_time = next_time.group(0).replace('分钟后到来

', '') else: next_time = 0 self.money = float(money.group(1)) self.read_pages = int(read_pages) self.next_time = int(next_time) print("【{}】已有{}元,已读{}篇,下次阅读时间{}".format(self.wx_name,self.money,self.read_pages,self.next_time)) return True except: print("【{}】账号被拉黑".format(self.wx_name)) def read(self): headers = { "Connection": "keep-alive", "Accept": "*/*", "Access-Control-Request-Method": "GET", "Access-Control-Request-Headers": "x-requested-with", "Origin": self.union_url, "User-Agent": self.ua, "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "cross-site", "Sec-Fetch-Dest": "empty", "Referer": f"{self.union_url}/", "Accept-Encoding": "gzip, deflate, br", "Accept-Language": "zh-CN,zh;q=0.9" } url = requests.get(f'http://yyyeee670930.1gkjk8yhyw.cn/read_task/ggg3', headers=self.read_headers).json()['jump'].replace('read.html?','read_task/ddr?')+'&type=7&pageshow'+'&r=' url = 'https://h5.v2dx6gzdx5.cn/read_task' + url[url.rfind('/'):] read_response = requests.get(url + str(random.random()), headers=headers) if '已经被限制' in read_response.text: print('【{}】阅读被限制,继续下一个账号'.format(self.wx_name)) elif 'finish' in read_response.text: print('【{}】到达最后一篇文章'.format(self.wx_name)) return # read_response = read_response.json() # final_url = 'http://' + str(re.findall(r'^(?:http?://)?([^/]+)', url)).replace('[\'', '').replace('\']', # '') + f'/read_task/{read_response["url"]}&_t=799888' # response = requests.get(final_url, headers=headers) # if '您已完成阅读任务' in response.text: # user = str(BeautifulSoup(response.text, 'html.parser').find_all('p')[0]) # print(f"【{user.replace('

', '').replace('

', '')}】本轮阅读已经完成") # else: # print('未知错误') else: key = read_response.json() for i in range(1, 31): print(f'第{i}次') if i + self.read_pages in check_id: print(f"【{self.wx_name}】到达设置的检测文章id{i + self.read_pages},等待过检") Compatible_auto("jige",self.api_address).send_passage(key['url']) time.sleep(10) else: sleep = random.randint(6, 10) print(f"【{self.wx_name}】模拟阅读{sleep}s") time.sleep(sleep) read_response = requests.get(url + str(random.random()) + f'&jkey={key["jkey"]}', headers=headers) key = read_response.json() if '已经被限制' in read_response.text: print(f'【{self.wx_name}】阅读被限制,继续下一个账号,被限制的id为{self.read_pages + i}') check_id.append(self.read_pages + i) break elif 'finish' in read_response.text: print(f'【{self.wx_name}】到达最后一篇文章') break print(f"【{self.wx_name}】阅读成功") def withdraw(self): if self.money >= 0.3 and self.wx_id not in no_cash_list: money = round(self.money * 100, 1) data = {'channel': 'wechat', 'money': str(money)} # 微信 # data = {'channel':'alipay','money':str(money),'u_ali_account':'账号','u_ali_real_name':'名字'}#支付宝,需要url编码 res = requests.post(self.union_url + '/withdrawal/submit_withdraw', headers=self.withdraw_headers, data=data) print("【{}】提现结果:{}".format(self.wx_name,res.text)) else: print("【{}】不提现或不满足提现要求".format(self.wx_name)) def main(wx_id, api_address, wx_name): a = Yuer(wx_id, api_address, wx_name) if a.login(): pass else: return try: if a.user_info(): pass else: return except: return try: a.read() except: pass a.user_info() a.withdraw() if __name__ == "__main__": env_name = "yuanshen_api" semaphore = threading.BoundedSemaphore(value=1) threads = [] api_add = os.getenv(env_name) if api_add is None: print('无api地址,请创建环境变量yuanshen_api填入api_address') exit(0) else: api_add = "http://{}".format(api_add) sessions = requests.get(api_add+ '/getallwx').json() for session in sessions: try: t = threading.Thread(target=main, args=(session["Wxid"], api_add, session["wxname"],)) threads.append(t) t.start() time.sleep(5) except Exception as e: print("【{}】发生异常{}".format(session["wxname"], e)) for t in threads: t.join()