QL_TimingScript/WeChatPublicNumberPushInformation.py
2024-09-14 16:11:53 +08:00

326 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding=UTF-8 -*-
# @Project QL_TimingScript
# @fileName WeChatPublicNumberPushInformation.py
# @author Echo
# @EditTime 2024/9/14
import datetime
import random
import time
import httpx
from typing import Text, Optional, List, Dict
"""
公众号推送模版:
📆今天是: {{today.DATA}}
👩‍❤️‍💋‍👨今天是我们恋爱的第{{ld.DATA}}天
☁️今天的天气状况: {{w.DATA}}
🌡️当前气温: {{c_temp.DATA}}℃
❄️最低气温: {{min_temp.DATA}}℃
☀️最高气温: {{max_temp.DATA}}℃
🎂距离{{u.DATA}}生日还有: {{b.DATA}}天
📝距离考研还剩: {{exa_day.DATA}}天
❤️‍爱情指数: {{l_i.DATA}}
💼工作指数: {{w_i.DATA}}
🪙财运指数: {{f_i.DATA}}
🌟星座运势: {{sign1.DATA}}{{sign2.DATA}}{{sign3.DATA}}{{sign4.DATA}}{{sign5.DATA}}
🌈今日彩虹屁: {{copy1.DATA}}{{copy2.DATA}}{{copy3.DATA}}{{copy4.DATA}}{{copy5.DATA}}
"""
"""
设置配置常量
"""
API_KEY = "" # 天聚数行密钥
APP_ID = "" # 微信公众号appid
APP_SECRET = "" # 微信公众号appsecret
WECHAT_USER_IDS = [] # 要推送的微信用户ID
TEMPLATE_ID = "" # 模板ID
CITY_NAME = "" # 城市
AREA = "" # 区县
EXAMINATION_DATE = "" # 考研日期
USER = "" # 对象称呼
BIRTHDAY = "" # 对象生日
STAR_SIGN = "" # 对象星座
LOVE_DATE = "" # 恋爱开始日期
# if 'tian_api_key' in os.environ:
# api_key = re.split("@|&", os.environ.get("tian_api_key"))
# else:
# api_key = []
# print("未查找到tian_api_key变量.")
def time_diff(time1: Text, time2: Text, format) -> int:
"""
计算两个时间差
:param time1: 时间1
:param time2: 时间2
:param format: 时间格式
:return: 时间差
"""
import datetime
time1 = datetime.datetime.strptime(time1, format)
time2 = datetime.datetime.strptime(time2, format)
if time2 > time1:
return (time2 - time1).days
else:
print("时间1大于时间2, 请检查")
def calculate_birthday(birthday: Text) -> int:
"""
计算生日
:param birthday: 出生日期
:return: 生日
"""
from datetime import date
today = date.today()
# 解析生日字符串
birthday_month, birthday_day = map(int, birthday.split('-'))
# 计算今年的生日日期
this_year_birthday = date(today.year, birthday_month, birthday_day)
# 如果今年的生日已经过了,就计算明年的生日
if this_year_birthday < today:
next_birthday = date(today.year + 1, birthday_month, birthday_day)
else:
next_birthday = this_year_birthday
# 计算天数差
days_left = (next_birthday - today).days
return days_left
def claculate_love_date(love_date: Text) -> int:
"""
计算恋爱日
:param love_date:
:return:
"""
from datetime import date, datetime
anniversary_date = datetime.strptime(love_date, "%Y-%m-%d").date()
today = date.today()
days_passed = (today - anniversary_date).days
return days_passed
def claculate_exam_countdown(exam_date: Text) -> int:
"""
计算考试倒计时
:param exam_date: 考试时间
:return:
"""
from datetime import date, datetime
today = date.today()
exam_date = datetime.strptime(exam_date, "%m-%d").date()
exam_date = exam_date.replace(year=today.year)
# if exam_date < today:
# exam_date = exam_date.replace(year=today.year + 1)
days_left = (exam_date - today).days
return days_left
class TianApi:
def __init__(self):
self.api_key = API_KEY
self.client = httpx.Client(base_url="https://apis.tianapi.com")
def get_rainbowFart(self):
"""
获取彩虹屁
:return:
"""
response = self.client.get(
url=f"/caihongpi/index?key={self.api_key}"
).json()
if response.get("msg") == "success":
return response["result"]["content"]
def get_horoscope(self, star_sign: Text, date: Optional[Text] = None) -> List:
"""
获取星座运势
:param star_sign: 星座名(必填)
:param date: 出生日期(可选)
:return:
"""
if date is None:
url = f"/star/index?key={self.api_key}&astro={star_sign}"
else:
url = f"/star/index?key={self.api_key}&astro={star_sign}&date={date}"
response = self.client.get(
url=url
).json()
return response["result"]["list"]
def get_weather_infos(self, city: Text) -> Dict:
"""
获取天气
:param city: 城市城市天气ID、行政代码、城市名称、IP地址
:return:
"""
response = self.client.get(
url=f"/tianqi/index?key={self.api_key}&city={city}&type=1"
).json()
return response["result"]
class WeChatPushMessage:
def __init__(self):
self.client = httpx.Client()
self.tian_api = TianApi()
@staticmethod
def get_color():
# 获取随机颜色
get_colors = lambda n: list(map(lambda i: "#" + "%06x" % random.randint(0, 0xFFFFFF), range(n)))
color_list = get_colors(100)
return random.choice(color_list)
def send_email(self, subject, contents):
"""
发送邮件
:param subject: 邮件主题
:param message: 邮件内容
:return:
"""
import yagmail
# 配置
sender_name = "微信消息推送"
sender_email = "liulong3men@163.com"
sender_password = "XUJFCMOZOXZUWMTT"
receiver_email = "1873190160@qq.com"
# 发送邮件
try:
# 创建 yagmail 客户端
yag = yagmail.SMTP({sender_email: sender_name}, password=sender_password, host="smtp.163.com", port=465)
# 发送邮件
yag.send(
to=receiver_email,
subject=subject,
contents=contents
)
print("邮件发送成功!")
except Exception as e:
print(f"邮件发送失败:{str(e)}")
@staticmethod
def split_str(str_: Text, length: int = 20):
chunks = [str_[i:i + length] for i in range(0, len(str_), length)]
result = {}
for i, chunk in enumerate(chunks, 1):
var_name = f"content_{i}"
result[var_name] = chunk
return result
def __get_access_token(self) -> Text:
appid = APP_ID
appsecret = APP_SECRET
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={appid}&secret={appsecret}"
try:
response = self.client.get(url).json()
return response["access_token"]
except KeyError:
raise KeyError("获取access_token失败检查appid和appsecret是否正确.")
def push_message(self, user):
# city = config.get("cityName") + config.get("area")
week_list = ["星期日", "星期一", "星期二", "星期三", "星期四", "星期五", "星期六"]
time_ = time.strftime('%Y年%m月%d%H:%M:%S' + week_list[datetime.date.today().isoweekday() % 7] + '',
time.localtime((int(time.mktime(time.gmtime())) + 28800))) # 获取当前时间
birthday = calculate_birthday(BIRTHDAY) # 计算生日
love_date = claculate_love_date(LOVE_DATE) # 计算恋爱日
exam_date = claculate_exam_countdown(EXAMINATION_DATE) # 计算考研剩余时间
weather_info = self.tian_api.get_weather_infos(AREA)
weather_condition = weather_info.get("weather") # 天气状况
max_temperature = weather_info.get("highest") # 最高温度
current_temperature = weather_info.get("real") # 实时温度
min_temperature = weather_info.get("lowest") # 最低温度
star_sign = STAR_SIGN # 星座
star_sign_info = self.tian_api.get_horoscope(star_sign)
composite_index = star_sign_info[0]["content"] # 综合指数
love_index = star_sign_info[1]["content"] # 恋爱指数
work_index = star_sign_info[2]["content"] # 工作指数
fortune_index = star_sign_info[3]["content"] # 财运指数
health_index = star_sign_info[4]["content"] # 健康指数
speed_sign = star_sign_info[7]["content"] # 速配星座
url = f"https://api.weixin.qq.com/cgi-bin/message/template/send?access_token={self.__get_access_token()}"
messages = {
"touser": user,
"template_id": TEMPLATE_ID,
"url": "http://www.baidu.com",
"topcolor": "#FF0000",
"data": {
"today": {"value": time_, "color": self.get_color()}, # 当前时间
"ld": {"value": love_date, "color": "#EC2417"}, # 恋爱日
# "city": {"value": city, "color": self.get_color()}, # 城市
"w": {"value": weather_condition, "color": self.get_color()}, # 天气状况
"c_temp": {"value": current_temperature, "color": self.get_color()}, # 实时温度
"min_temp": {"value": min_temperature, "color": self.get_color()}, # 最低温度
"max_temp": {"value": max_temperature, "color": self.get_color()}, # 最高温度
"u": {"value": USER, "color": self.get_color()}, # 用户
"b": {"value": birthday, "color": self.get_color()}, # 生日
"exa_day": {"value": exam_date, "color": self.get_color()}, # 考研剩余时间
# "c_i": {"value": composite_index, "color": self.get_color()}, # 综合指数
"l_i": {"value": love_index, "color": self.get_color()}, # 恋爱指数
"w_i": {"value": work_index, "color": self.get_color()}, # 工作指数
"f_i": {"value": fortune_index, "color": self.get_color()} # 财运指数
# "h_i": {"value": health_index, "color": self.get_color()}, # 健康指数
# "s_s": {"value": speed_sign, "color": self.get_color()}, # 速配星座
}
}
# tips_info = weather_info.get("tips") # 出行建议
# split_tips = self.split_str(tips_info)
# tips_fields = ["tips1", "tips2", "tips3", "tips4", "tips5"]
# for i, field in enumerate(tips_fields, 1):
# if f"content_{i}" in split_tips:
# messages["data"][field] = {"value": split_tips[f"content_{i}"], "color": self.get_color()}
today_summarize_info = star_sign_info[8]["content"] # 今日运势
split_today_summarize_info = self.split_str(today_summarize_info)
today_summarize_fields = ["sign1", "sign2", "sign3", "sign4", "sign5"]
for i, field in enumerate(today_summarize_fields, 1):
if f"content_{i}" in split_today_summarize_info:
messages["data"][field] = {"value": split_today_summarize_info[f"content_{i}"],
"color": self.get_color()}
rainbow_fart_info = self.tian_api.get_rainbowFart()
if "XXX" in rainbow_fart_info:
rainbow_fart_info = rainbow_fart_info.replace("XXX", USER)
split_rainbow_fart_info = self.split_str(rainbow_fart_info)
rainbow_fart_fields = ["copy1", "copy2", "copy3", "copy4", "copy5"]
for i, field in enumerate(rainbow_fart_fields, 1):
if f"content_{i}" in split_rainbow_fart_info:
messages["data"][field] = {"value": split_rainbow_fart_info[f"content_{i}"], "color": self.get_color()}
try:
response = self.client.post(url, json=messages).json()
if response.get("errcode") == 0 and response.get("errmsg") == "ok":
print("推送成功")
# self.send_email(subject=f"🗨️微信公众号通知 - {datetime.datetime.now().strftime("%Y/%m/%d")}",
# contents=f"⏰向【{config.get('user')}】早安信息已成功推送啦!")
else:
print("推送失败", response)
# self.send_email(subject=f"🗨️微信公众号通知 - {datetime.datetime.now().strftime("%Y/%m/%d")}",
# contents=f"❌向【{config.get('user')}】早安信息推送失败,请前往青龙面板查看日志!")
except KeyError:
raise KeyError("推送失败,请检查参数是否正确")
if __name__ == '__main__':
for user in WECHAT_USER_IDS:
wpm = WeChatPushMessage()
wpm.push_message(user)
del wpm