增加钉钉异步推送

This commit is contained in:
echo 2024-09-23 21:23:27 +08:00
parent 5d17033700
commit e76e1e626d

View File

@ -477,6 +477,30 @@ def send_notification_message(title, content):
dingding_bot(title, content)
except Exception as e:
print("发送通知消息失败")
async def aysnc_send_notification_message(title, content):
try:
timestamp = str(round(time.time() * 1000)) # 时间戳
secret_enc = DD_BOT_SECRET.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, DD_BOT_SECRET)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 签名
print('开始使用 钉钉机器人 推送消息...', end='')
url = f'https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_TOKEN}&timestamp={timestamp}&sign={sign}'
headers = {'Content-Type': 'application/json;charset=utf-8'}
data = {
'msgtype': 'text',
'text': {'content': f'{title}\n\n{content}'}
}
response = requests.post(url=url, data=json.dumps(data), headers=headers, timeout=15).json()
if not response['errcode']:
print('推送成功!')
else:
print('推送失败!')
except Exception as e:
print("钉钉消息推送失败")
if __name__ == '__main__':