From fce65827d782b51b62ca396342e04fc5ed949fa4 Mon Sep 17 00:00:00 2001 From: Leon Date: Tue, 10 Aug 2021 16:06:26 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=9A=82=F0=9F=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Scripts/py/mimotion.py | 330 +++++++++++++++++++++++++++-------------- 1 file changed, 220 insertions(+), 110 deletions(-) diff --git a/Scripts/py/mimotion.py b/Scripts/py/mimotion.py index 9d6477e..8e61fb4 100644 --- a/Scripts/py/mimotion.py +++ b/Scripts/py/mimotion.py @@ -12,14 +12,13 @@ headers = { 'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 9; MI 6 MIUI/20.6.18)' } - -#获取登录code +# 获取登录 code def get_code(location): code_pattern = re.compile("(?<=access=).*?(?=&)") code = code_pattern.findall(location)[0] return code -#登录 +# 登录 def login(user,password): url1 = "https://api-user.huami.com/registrations/+86" + user + "/tokens" headers = { @@ -62,18 +61,18 @@ def login(user,password): return login_token,userid -#主函数 +# 主函数 def main(user, passwd, step): user = str(user) password = str(passwd) step = str(step) if user == '' or password == '': - print ("用户名或密码填写有误!") - return + print("用户名或密码不能为空!") + return "user and passwd not empty!" if step == '': - print ("已设置为随机步数(20000-29999)") - step = str(random.randint(20000,21999)) + print ("已设置为随机步数(18000-25000)") + step = str(random.randint(18000,25000)) login_token = 0 login_token,userid = login(user,password) if login_token == 0: @@ -103,21 +102,18 @@ def main(user, passwd, step): response = requests.post(url, data=data, headers=head).json() #print(response) - result = f"[{now}] \n{user[:3]}****{user[-3:]}: \n小米运动修改步数({step})"+ response['message'] + result = f"{user[:4]}****{user[-4:]}: [{now}] 修改步数({step})"+ response['message'] print(result) - # fo = open("foo.txt", "a") - # fo.write(result) - # fo.close() return result -#获取时间戳 +# 获取时间戳 def get_time(): url = 'http://api.m.taobao.com/rest/api3.do?api=mtop.common.getTimestamp' response = requests.get(url,headers=headers).json() t = response['data']['t'] return t -#获取app_token +# 获取app_token def get_app_token(login_token): url = f"https://account-cn.huami.com/v1/client/app_tokens?app_name=com.xiaomi.hm.health&dn=api-user.huami.com%2Capi-mifit.huami.com%2Capp-analytics.huami.com&login_token={login_token}" response = requests.get(url,headers=headers).json() @@ -126,120 +122,234 @@ def get_app_token(login_token): #print(app_token) return app_token -## -def qywx(msg): - server_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}" - re = requests.post(server_url) - jsontxt = json.loads(re.text) - access_token = jsontxt['access_token'] - html = msg.replace('\n', '
') - url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" - data ={"touser" : "@all", - "msgtype" : "mpnews", - "agentid" : "1000002", - "mpnews" : { - "articles" : [ - { - "title" : "小米运动推送", - "content" : html, - "author" : "智能推送助手", - "thumb_media_id": "2GhsYxtOfHM92u0_WK0iNTjxxes7EAH4-GbAgxfc7YyZt17JEiTfHFkFZ4ob9xL7j", - "content_source_url" : "", - "digest" : msg - } - ] - }, - "safe": 0 - } - send_msges=(bytes(json.dumps(data), 'utf-8')) - res = requests.post(url, send_msges) - respon = res.json() #当返回的数据是json串的时候直接用.json即可将respone转换成字典 - - ##print (res.text) - if respon['errmsg'] == "ok": - print(f"推送成功\n") - else: - print(f" 推送失败:鬼知道哪错了\n") - - print("推鬼知道修改成功没") - -## 推送QQ -def push_qq(msg): - """ - 推送消息到QQ酷推 - """ - if key == '': - print("[注意] 未提供key,不进行推送!") - else: - server_url = f"https://push.xuthus.cc/send/{key}?" - params = { - "c": msg - } - - response = requests.get(server_url, params=params) - json_data = response.json() - if json_data['reason'] == "操作成功": - print(f"推送成功") - else: - print(f" 推送失败:鬼知道哪错了") - - print("QQ酷推鬼知道修改成功没") -# 推送server -def push_wx(msg): - """ - 推送消息到微信 - """ - if sckey == '': +# 推送 server 酱 +def push_wx(_sckey, desp=""): + if _sckey == '': print("[注意] 未提供sckey,不进行推送!") else: - server_url = f"https://sc.ftqq.com/{sckey}.send" + server_url = f"https://sc.ftqq.com/{_sckey}.send" params = { "text": '小米运动 步数修改', - "desp": msg + "desp": desp } - + response = requests.get(server_url, params=params) json_data = response.json() - + if json_data['errno'] == 0: print(f"[{now}] 推送成功。") else: print(f"[{now}] 推送失败:{json_data['errno']}({json_data['errmsg']})") -if __name__ == "__main__": - # ServerChan& QQ酷推 - sckey = os.environ.get("sckey") - if str(sckey) == '0': - sckey = '' -## QQ酷推值 - key = os.environ.get("key") - if str(key) == '0': - key = '' + +def push_server(_sckey, desp=""): + if _sckey == '': + print("[注意] 未提供sckey,不进行微信推送!") + else: + server_url = f"https://sctapi.ftqq.com/{_sckey}.send" + params = { + "title": '小米运动 步数修改', + "desp": desp + } + + response = requests.get(server_url, params=params) + json_data = response.json() + + if json_data['code'] == 0: + print(f"[{now}] 推送成功。") + else: + print(f"[{now}] 推送失败:{json_data['code']}({json_data['message']})") + + +# 推送消息到 pushplus +def push_pushplus(token, content=""): + if token == '': + print("[注意] 未提供token,不进行pushplus推送!") + else: + server_url = "http://www.pushplus.plus/send" + params = { + "token": token, + "title": '小米运动 步数修改', + "content": content + } + + response = requests.get(server_url, params=params) + json_data = response.json() + + if json_data['code'] == 200: + print(f"[{now}] 推送成功。") + else: + print(f"[{now}] 推送失败:{json_data['code']}({json_data['message']})") + + +# 推送消息到 TG +def push_tg(token, chat_id, desp=""): + if token == '': + print("[注意] 未提供token,不进行tg推送!") + elif chat_id == '': + print("[注意] 未提供chat_id,不进行tg推送!") + else: + server_url = f"https://api.telegram.org/bot{token}/sendmessage" + params = { + "text": '小米运动 步数修改\n\n' + desp, + "chat_id": chat_id + } + + response = requests.get(server_url, params=params) + json_data = response.json() + + if json_data['ok']: + print(f"[{now}] 推送成功。") + else: + print(f"[{now}] 推送失败:{json_data['error_code']}({json_data['description']})") + + +# 企业微信推送 +def wxpush(msg, usr, corpid, corpsecret, agentid=1000002): + base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?' + req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + corpid = corpid + corpsecret = corpsecret + agentid = agentid + + if agentid == 0: + agentid = 1000002 + + # 获取 access_token,每次的 access_token 都不一样,所以需要运行一次请求一次 + def get_access_token(_base_url, _corpid, _corpsecret): + urls = _base_url + 'corpid=' + _corpid + '&corpsecret=' + _corpsecret + resp = requests.get(urls).json() + access_token = resp['access_token'] + return access_token + + def send_message(_msg, _usr): + data = get_message(_msg, _usr) + req_urls = req_url + get_access_token(base_url, corpid, corpsecret) + res = requests.post(url=req_urls, data=data) + ret = res.json() + if ret["errcode"] == 0: + print(f"[{now}] 企业微信推送成功") + else: + print(f"[{now}] 推送失败:{ret['errcode']} 错误信息:{ret['errmsg']}") + + def get_message(_msg, _usr): + data = { + "touser": _usr, + "toparty": "@all", + "totag": "@all", + "msgtype": "text", + "agentid": agentid, + "text": { + "content": _msg + }, + "safe": 0, + "enable_id_trans": 0, + "enable_duplicate_check": 0, + "duplicate_check_interval": 1800 + } + data = json.dumps(data) + return data + + msg = msg + usr = usr + if corpid == '': + print("[注意] 未提供corpid,不进行企业微信推送!") + elif corpsecret == '': + print("[注意] 未提供corpsecret,不进行企业微信推送!") + else: + send_message(msg, usr) + + +# 推送接口类 处理 pkey 并转发推送消息到推送函数 +class ToPush: + push_msg: str + + def __init__(self, _pkey): + self.pkey = _pkey + + # 推送 server 酱接口 + def to_push_wx(self): + if str(self.pkey) == '0': + self.pkey = '' + push_wx(self.pkey, self.push_msg) + + # 推送消息到微信接口 + def to_push_server(self): + if str(self.pkey) == '0': + self.pkey = '' + push_server(self.pkey, self.push_msg) + + # 推送消息到 TG 接口 + def to_push_tg(self): + try: + token, chat_id = self.pkey.split('@') + push_tg(token, chat_id, self.push_msg) + except ValueError: + print('tg推送参数有误!') + + # 企业微信推送接口 + def to_wxpush(self): + try: + usr, corpid, corpsecret, *agentid = self.pkey.split('-') + if agentid: + wxpush(self.push_msg, usr, corpid, corpsecret, int(agentid[0])) + else: + wxpush(self.push_msg, usr, corpid, corpsecret) + except ValueError: + print('企业微信推送参数有误!') + + def to_push_pushplus(self): + if self.pkey == '': + print('pushplus token错误') + else: + push_pushplus(self.pkey, self.push_msg) + + @staticmethod + def no_push(): + print('不推送') + + +if __name__ == "__main__": + # Push Mode + Pm = os.environ.get("PMODE") + pkey = os.environ.get("PKEY") + + to_push = ToPush(pkey) + # 用户名(格式为 13800138000) - user = os.environ.get("user") + user = os.environ.get("MI_USER") # 登录密码 - passwd = os.environ.get("pwd") - # 要修改的步数,直接输入想要修改的步数值,留空为随机步数 - step = os.environ.get("step") - corpid = os.environ.get("corpid") - corpsecret = os.environ.get("corpsecret") + passwd = os.environ.get("MI_PWD") + # 要修改的步数,直接输入想要修改的步数值,0为随机步数 + step = os.environ.get("STEP").replace('[', '').replace(']', '') user_list = user.split('#') passwd_list = passwd.split('#') - setp_array = step.split('-') + step_array = step.split('-') if len(user_list) == len(passwd_list): - if user == '': - print("啥也没有,不执行,快去填写账号密码") - else: - push = '' - for line in range(0,len(user_list)): - if len(setp_array) == 2: - step = str(random.randint(int(setp_array[0]),int(setp_array[1]))) - elif str(step) == '0': - step = '' - push += main(user_list[line], passwd_list[line], step) + '\n' - send.send(push) + push_msg = '' + for user, passwd in zip(user_list, passwd_list): + if len(step_array) == 2: + step = str(random.randint(int(step_array[0]), int(step_array[1]))) + print(f"已设置为随机步数({step_array[0]}-{step_array[1]})") + elif str(step) == '0': + step = '' + push_msg += main(user, passwd, step) + '\n' + + push = { + 'wx': to_push.to_push_wx, + 'nwx': to_push.to_push_server, + 'tg': to_push.to_push_tg, + 'qwx': to_push.to_wxpush, + 'pp': to_push.to_push_pushplus, + 'off': to_push.no_push + } + try: + push[Pm]() + except KeyError: + print('推送选项有误!') + exit(0) else: - print('用户名和密码数量不对') + print('用户名和密码数量不对') \ No newline at end of file