This commit is contained in:
Leon 2021-08-10 16:06:26 +08:00
parent fd10a3d759
commit fce65827d7

View File

@ -12,14 +12,13 @@ headers = {
'User-Agent': 'Dalvik/2.1.0 (Linux; U; Android 9; MI 6 MIUI/20.6.18)'
}
#获取登录code
# 获取登录 code
def get_code(location):
code_pattern = re.compile("(?<=access=).*?(?=&)")
code = code_pattern.findall(location)[0]
return code
#登录
# 登录
def login(user,password):
url1 = "https://api-user.huami.com/registrations/+86" + user + "/tokens"
headers = {
@ -62,18 +61,18 @@ def login(user,password):
return login_token,userid
#主函数
# 主函数
def main(user, passwd, step):
user = str(user)
password = str(passwd)
step = str(step)
if user == '' or password == '':
print ("用户名或密码填写有误")
return
print("用户名或密码不能为空")
return "user and passwd not empty"
if step == '':
print ("已设置为随机步数(20000-29999")
step = str(random.randint(20000,21999))
print ("已设置为随机步数(18000-25000")
step = str(random.randint(18000,25000))
login_token = 0
login_token,userid = login(user,password)
if login_token == 0:
@ -103,21 +102,18 @@ def main(user, passwd, step):
response = requests.post(url, data=data, headers=head).json()
#print(response)
result = f"[{now}] \n{user[:3]}****{user[-3:]}: \n小米运动修改步数({step}"+ response['message']
result = f"{user[:4]}****{user[-4:]}: [{now}] 修改步数({step}"+ response['message']
print(result)
# fo = open("foo.txt", "a")
# fo.write(result)
# fo.close()
return result
#获取时间戳
# 获取时间戳
def get_time():
url = 'http://api.m.taobao.com/rest/api3.do?api=mtop.common.getTimestamp'
response = requests.get(url,headers=headers).json()
t = response['data']['t']
return t
#获取app_token
# 获取app_token
def get_app_token(login_token):
url = f"https://account-cn.huami.com/v1/client/app_tokens?app_name=com.xiaomi.hm.health&dn=api-user.huami.com%2Capi-mifit.huami.com%2Capp-analytics.huami.com&login_token={login_token}"
response = requests.get(url,headers=headers).json()
@ -126,120 +122,234 @@ def get_app_token(login_token):
#print(app_token)
return app_token
##
def qywx(msg):
server_url = f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}"
re = requests.post(server_url)
jsontxt = json.loads(re.text)
access_token = jsontxt['access_token']
html = msg.replace('\n', '<br>')
url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
data ={"touser" : "@all",
"msgtype" : "mpnews",
"agentid" : "1000002",
"mpnews" : {
"articles" : [
{
"title" : "小米运动推送",
"content" : html,
"author" : "智能推送助手",
"thumb_media_id": "2GhsYxtOfHM92u0_WK0iNTjxxes7EAH4-GbAgxfc7YyZt17JEiTfHFkFZ4ob9xL7j",
"content_source_url" : "",
"digest" : msg
}
]
},
"safe": 0
}
send_msges=(bytes(json.dumps(data), 'utf-8'))
res = requests.post(url, send_msges)
respon = res.json() #当返回的数据是json串的时候直接用.json即可将respone转换成字典
##print (res.text)
if respon['errmsg'] == "ok":
print(f"推送成功\n")
else:
print(f" 推送失败:鬼知道哪错了\n")
print("推鬼知道修改成功没")
## 推送QQ
def push_qq(msg):
"""
推送消息到QQ酷推
"""
if key == '':
print("[注意] 未提供key不进行推送")
else:
server_url = f"https://push.xuthus.cc/send/{key}?"
params = {
"c": msg
}
response = requests.get(server_url, params=params)
json_data = response.json()
if json_data['reason'] == "操作成功":
print(f"推送成功")
else:
print(f" 推送失败:鬼知道哪错了")
print("QQ酷推鬼知道修改成功没")
# 推送server
def push_wx(msg):
"""
推送消息到微信
"""
if sckey == '':
# 推送 server 酱
def push_wx(_sckey, desp=""):
if _sckey == '':
print("[注意] 未提供sckey不进行推送")
else:
server_url = f"https://sc.ftqq.com/{sckey}.send"
server_url = f"https://sc.ftqq.com/{_sckey}.send"
params = {
"text": '小米运动 步数修改',
"desp": msg
"desp": desp
}
response = requests.get(server_url, params=params)
json_data = response.json()
if json_data['errno'] == 0:
print(f"[{now}] 推送成功。")
else:
print(f"[{now}] 推送失败:{json_data['errno']}({json_data['errmsg']})")
if __name__ == "__main__":
# ServerChan& QQ酷推
sckey = os.environ.get("sckey")
if str(sckey) == '0':
sckey = ''
## QQ酷推值
key = os.environ.get("key")
if str(key) == '0':
key = ''
def push_server(_sckey, desp=""):
if _sckey == '':
print("[注意] 未提供sckey不进行微信推送")
else:
server_url = f"https://sctapi.ftqq.com/{_sckey}.send"
params = {
"title": '小米运动 步数修改',
"desp": desp
}
response = requests.get(server_url, params=params)
json_data = response.json()
if json_data['code'] == 0:
print(f"[{now}] 推送成功。")
else:
print(f"[{now}] 推送失败:{json_data['code']}({json_data['message']})")
# 推送消息到 pushplus
def push_pushplus(token, content=""):
if token == '':
print("[注意] 未提供token不进行pushplus推送")
else:
server_url = "http://www.pushplus.plus/send"
params = {
"token": token,
"title": '小米运动 步数修改',
"content": content
}
response = requests.get(server_url, params=params)
json_data = response.json()
if json_data['code'] == 200:
print(f"[{now}] 推送成功。")
else:
print(f"[{now}] 推送失败:{json_data['code']}({json_data['message']})")
# 推送消息到 TG
def push_tg(token, chat_id, desp=""):
if token == '':
print("[注意] 未提供token不进行tg推送")
elif chat_id == '':
print("[注意] 未提供chat_id不进行tg推送")
else:
server_url = f"https://api.telegram.org/bot{token}/sendmessage"
params = {
"text": '小米运动 步数修改\n\n' + desp,
"chat_id": chat_id
}
response = requests.get(server_url, params=params)
json_data = response.json()
if json_data['ok']:
print(f"[{now}] 推送成功。")
else:
print(f"[{now}] 推送失败:{json_data['error_code']}({json_data['description']})")
# 企业微信推送
def wxpush(msg, usr, corpid, corpsecret, agentid=1000002):
base_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?'
req_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
corpid = corpid
corpsecret = corpsecret
agentid = agentid
if agentid == 0:
agentid = 1000002
# 获取 access_token每次的 access_token 都不一样,所以需要运行一次请求一次
def get_access_token(_base_url, _corpid, _corpsecret):
urls = _base_url + 'corpid=' + _corpid + '&corpsecret=' + _corpsecret
resp = requests.get(urls).json()
access_token = resp['access_token']
return access_token
def send_message(_msg, _usr):
data = get_message(_msg, _usr)
req_urls = req_url + get_access_token(base_url, corpid, corpsecret)
res = requests.post(url=req_urls, data=data)
ret = res.json()
if ret["errcode"] == 0:
print(f"[{now}] 企业微信推送成功")
else:
print(f"[{now}] 推送失败:{ret['errcode']} 错误信息:{ret['errmsg']}")
def get_message(_msg, _usr):
data = {
"touser": _usr,
"toparty": "@all",
"totag": "@all",
"msgtype": "text",
"agentid": agentid,
"text": {
"content": _msg
},
"safe": 0,
"enable_id_trans": 0,
"enable_duplicate_check": 0,
"duplicate_check_interval": 1800
}
data = json.dumps(data)
return data
msg = msg
usr = usr
if corpid == '':
print("[注意] 未提供corpid不进行企业微信推送")
elif corpsecret == '':
print("[注意] 未提供corpsecret不进行企业微信推送")
else:
send_message(msg, usr)
# 推送接口类 处理 pkey 并转发推送消息到推送函数
class ToPush:
push_msg: str
def __init__(self, _pkey):
self.pkey = _pkey
# 推送 server 酱接口
def to_push_wx(self):
if str(self.pkey) == '0':
self.pkey = ''
push_wx(self.pkey, self.push_msg)
# 推送消息到微信接口
def to_push_server(self):
if str(self.pkey) == '0':
self.pkey = ''
push_server(self.pkey, self.push_msg)
# 推送消息到 TG 接口
def to_push_tg(self):
try:
token, chat_id = self.pkey.split('@')
push_tg(token, chat_id, self.push_msg)
except ValueError:
print('tg推送参数有误')
# 企业微信推送接口
def to_wxpush(self):
try:
usr, corpid, corpsecret, *agentid = self.pkey.split('-')
if agentid:
wxpush(self.push_msg, usr, corpid, corpsecret, int(agentid[0]))
else:
wxpush(self.push_msg, usr, corpid, corpsecret)
except ValueError:
print('企业微信推送参数有误!')
def to_push_pushplus(self):
if self.pkey == '':
print('pushplus token错误')
else:
push_pushplus(self.pkey, self.push_msg)
@staticmethod
def no_push():
print('不推送')
if __name__ == "__main__":
# Push Mode
Pm = os.environ.get("PMODE")
pkey = os.environ.get("PKEY")
to_push = ToPush(pkey)
# 用户名(格式为 13800138000
user = os.environ.get("user")
user = os.environ.get("MI_USER")
# 登录密码
passwd = os.environ.get("pwd")
# 要修改的步数,直接输入想要修改的步数值,留空为随机步数
step = os.environ.get("step")
corpid = os.environ.get("corpid")
corpsecret = os.environ.get("corpsecret")
passwd = os.environ.get("MI_PWD")
# 要修改的步数直接输入想要修改的步数值0为随机步数
step = os.environ.get("STEP").replace('[', '').replace(']', '')
user_list = user.split('#')
passwd_list = passwd.split('#')
setp_array = step.split('-')
step_array = step.split('-')
if len(user_list) == len(passwd_list):
if user == '':
print("啥也没有,不执行,快去填写账号密码")
else:
push = ''
for line in range(0,len(user_list)):
if len(setp_array) == 2:
step = str(random.randint(int(setp_array[0]),int(setp_array[1])))
elif str(step) == '0':
step = ''
push += main(user_list[line], passwd_list[line], step) + '\n'
send.send(push)
push_msg = ''
for user, passwd in zip(user_list, passwd_list):
if len(step_array) == 2:
step = str(random.randint(int(step_array[0]), int(step_array[1])))
print(f"已设置为随机步数({step_array[0]}-{step_array[1]}")
elif str(step) == '0':
step = ''
push_msg += main(user, passwd, step) + '\n'
push = {
'wx': to_push.to_push_wx,
'nwx': to_push.to_push_server,
'tg': to_push.to_push_tg,
'qwx': to_push.to_wxpush,
'pp': to_push.to_push_pushplus,
'off': to_push.no_push
}
try:
push[Pm]()
except KeyError:
print('推送选项有误!')
exit(0)
else:
print('用户名和密码数量不对')
print('用户名和密码数量不对')