From a0d030d14fee2f3c2e688cf98172411dd08702f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B4=9B?= Date: Fri, 23 Jul 2021 16:32:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=8E=A8=E9=80=81=E6=96=B9?= =?UTF-8?q?=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- checksendNotify.py | 429 ++++++++++++++++++++++++++++++++++++++++++--- wzyd.py | 3 +- 2 files changed, 403 insertions(+), 29 deletions(-) diff --git a/checksendNotify.py b/checksendNotify.py index 8efed24..16b7616 100644 --- a/checksendNotify.py +++ b/checksendNotify.py @@ -1,34 +1,407 @@ +#!/usr/bin/env python3 +# _*_ coding:utf-8 _*_ + +import sys +import os, re + +cur_path = os.path.abspath(os.path.dirname(__file__)) +root_path = os.path.split(cur_path)[0] +sys.path.append(root_path) import requests -import os +import json +import time +import hmac +import hashlib +import base64 +import urllib.parse +from requests.adapters import HTTPAdapter +from urllib3.util import Retry + +# 通知服务 +BARK = '' # bark服务,自行搜索;,此参数如果以http或者https开头则判定为自建bark服务;secrets可填; +SCKEY = '' # Server酱的SCKEY; secrets可填 +TG_BOT_TOKEN = '' # tg机器人的TG_BOT_TOKEN; secrets可填1407203283:AAG9rt-6RDaaX0HBLZQq0laNOh898iFYaRQ +TG_USER_ID = '' # tg机器人的TG_USER_ID; secrets可填 1434078534 +TG_API_HOST = '' # tg 代理api +TG_PROXY_IP = '' # tg机器人的TG_PROXY_IP; secrets可填 +TG_PROXY_PORT = '' # tg机器人的TG_PROXY_PORT; secrets可填 +DD_BOT_ACCESS_TOKEN = '' # 钉钉机器人的DD_BOT_ACCESS_TOKEN; secrets可填 +DD_BOT_SECRET = '' # 钉钉机器人的DD_BOT_SECRET; secrets可填 +QQ_SKEY = '' # qq机器人的QQ_SKEY; secrets可填 +QQ_MODE = '' # qq机器人的QQ_MODE; secrets可填 +QYWX_AM = '' # 企业微信 +PUSH_PLUS_TOKEN = '' # 微信推送Plus+ +GOBOT_URL = "" # go-cqhttp 例如:推送到个人QQ: http://127.0.0.1/send_private_msg 群:http://127.0.0.1/send_group_msg +GOBOT_TOKEN = "" # go-cqhttp的access_token 可不填 +GOBOT_QQ = "" # go-cqhttp的推送群或者用户 GOBOT_URL设置 /send_private_msg 则需要填入 user_id=个人QQ 相反如果是 /send_group_msg 则需要填入 group_id=QQ群 +notify_mode = [] + +message_info = '''''' + +# GitHub action运行需要填写对应的secrets +if "BARK" in os.environ and os.environ["BARK"]: + BARK = os.environ["BARK"] +if "SCKEY" in os.environ and os.environ["SCKEY"]: + SCKEY = os.environ["SCKEY"] +if "TG_BOT_TOKEN" in os.environ and os.environ["TG_BOT_TOKEN"] and "TG_USER_ID" in os.environ and os.environ[ + "TG_USER_ID"]: + TG_BOT_TOKEN = os.environ["TG_BOT_TOKEN"] + TG_USER_ID = os.environ["TG_USER_ID"] +if "TG_API_HOST" in os.environ and os.environ["TG_API_HOST"]: + TG_API_HOST = os.environ["TG_API_HOST"] +if "DD_BOT_ACCESS_TOKEN" in os.environ and os.environ["DD_BOT_ACCESS_TOKEN"] and "DD_BOT_SECRET" in os.environ and \ + os.environ["DD_BOT_SECRET"]: + DD_BOT_ACCESS_TOKEN = os.environ["DD_BOT_ACCESS_TOKEN"] + DD_BOT_SECRET = os.environ["DD_BOT_SECRET"] +if "QQ_SKEY" in os.environ and os.environ["QQ_SKEY"] and "QQ_MODE" in os.environ and os.environ["QQ_MODE"]: + QQ_SKEY = os.environ["QQ_SKEY"] + QQ_MODE = os.environ["QQ_MODE"] +# 获取pushplus+ PUSH_PLUS_TOKEN +if "PUSH_PLUS_TOKEN" in os.environ: + if len(os.environ["PUSH_PLUS_TOKEN"]) > 1: + PUSH_PLUS_TOKEN = os.environ["PUSH_PLUS_TOKEN"] + # print("已获取并使用Env环境 PUSH_PLUS_TOKEN") +# 获取企业微信应用推送 QYWX_AM +if "QYWX_AM" in os.environ: + if len(os.environ["QYWX_AM"]) > 1: + QYWX_AM = os.environ["QYWX_AM"] + # print("已获取并使用Env环境 QYWX_AM") +# 获取go-cqhttp +if "GOBOT_URL" in os.environ and os.environ["GOBOT_URL"]: + GOBOT_URL = os.environ["GOBOT_URL"] + GOBOT_TOKEN = os.environ["GOBOT_TOKEN"] + GOBOT_QQ = os.environ["GOBOT_QQ"] + +if BARK: + notify_mode.append('bark') + # print("BARK 推送打开") +if SCKEY: + notify_mode.append('sc_key') + # print("Server酱 推送打开") +if TG_BOT_TOKEN and TG_USER_ID: + notify_mode.append('telegram_bot') + # print("Telegram 推送打开") +if DD_BOT_ACCESS_TOKEN and DD_BOT_SECRET: + notify_mode.append('dingding_bot') + # print("钉钉机器人 推送打开") +if QQ_SKEY and QQ_MODE: + notify_mode.append('coolpush_bot') + # print("QQ机器人 推送打开") +if GOBOT_URL and GOBOT_QQ: + notify_mode.append('go_cqhttp') + # print("go-cqhttp机器人 推送打开") +if PUSH_PLUS_TOKEN: + notify_mode.append('pushplus_bot') + # print("微信推送Plus机器人 推送打开") +if QYWX_AM: + notify_mode.append('wecom_app') + # print("企业微信机器人 推送打开") -def message2telegram(tg_api_host, tg_proxy, tg_bot_token, tg_user_id, content): - print("Telegram 推送开始") - send_data = {"chat_id": tg_user_id, "text": content, "disable_web_page_preview": "true"} - if tg_api_host: - url = f"https://{tg_api_host}/bot{tg_bot_token}/sendMessage" +def message(str_msg): + global message_info + print(str_msg) + message_info = "{}\n{}".format(message_info, str_msg) + sys.stdout.flush() + + +def bark(title, content): + print("\n") + if not BARK: + print("bark服务的bark_token未设置!!\n取消推送") + return + print("bark服务启动") + url = None + if BARK.startswith('http'): + url = f"""{BARK}/{title}/{content}""" else: - url = f"https://api.telegram.org/bot{tg_bot_token}/sendMessage" - if tg_proxy: - proxies = { - "http": tg_proxy, - "https": tg_proxy, - } + url = f"""https://api.day.app/{BARK}/{title}/{content}""" + response = requests.get(url).json() + if response['code'] == 200: + print('推送成功!') else: + print('推送失败!') + + +# go-cqhttp +def go_cqhttp(title, content): + print("\n") + if not GOBOT_URL or not GOBOT_QQ: + print("go-cqhttp服务的GOBOT_URL或GOBOT_QQ未设置!!\n取消推送") + return + print("go-cqhttp服务启动") + url = f"""{GOBOT_URL}?access_token={GOBOT_TOKEN}&{GOBOT_QQ}&message=标题:{title}\n内容:{content}""" + response = requests.get(url).json() + if response['status'] == 'ok': + print('推送成功!') + else: + print('推送失败!') + + +def serverJ(title, content): + print("\n") + if not SCKEY: + print("server酱服务的SCKEY未设置!!\n取消推送") + return + print("serverJ服务启动") + data = { + "text": title, + "desp": content.replace("\n", "\n\n") + } + response = requests.post(f"https://sc.ftqq.com/{SCKEY}.send", data=data).json() + if response['errno'] == 0: + print('推送成功!') + else: + print('推送失败!') + + +# tg通知 +def telegram_bot(title, content): + try: + print("\n") + bot_token = TG_BOT_TOKEN + user_id = TG_USER_ID + if not bot_token or not user_id: + print("tg服务的bot_token或者user_id未设置!!\n取消推送") + return + print("tg服务启动") + if TG_API_HOST: + url = f"https://{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage" + else: + url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage" + + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + payload = {'chat_id': str(TG_USER_ID), 'text': f'{title}\n\n{content}', 'disable_web_page_preview': 'true'} proxies = None - try : - requests.post(url=url, data=send_data, proxies=proxies) - print("推送成功") - return 1 - except: - print("推送失败") - return 0 - - -def send(content): - tg_api_host = os.environ.get('TG_API_HOST') - tg_proxy = '' - tg_bot_token = os.environ.get('TG_BOT_TOKEN') - tg_user_id = os.environ.get('TG_USER_ID') - message2telegram(tg_api_host=tg_api_host, tg_proxy=tg_proxy, tg_bot_token=tg_bot_token, tg_user_id=tg_user_id, - content=content) \ No newline at end of file + if TG_PROXY_IP and TG_PROXY_PORT: + proxyStr = "http://{}:{}".format(TG_PROXY_IP, TG_PROXY_PORT) + proxies = {"http": proxyStr, "https": proxyStr} + try: + response = requests.post(url=url, headers=headers, params=payload, proxies=proxies).json() + except: + print('推送失败!') + if response['ok']: + print('推送成功!') + else: + print('推送失败!') + except Exception as e: + print(e) + + +def dingding_bot(title, content): + timestamp = str(round(time.time() * 1000)) # 时间戳 + secret_enc = DD_BOT_SECRET.encode('utf-8') + string_to_sign = '{}\n{}'.format(timestamp, DD_BOT_SECRET) + string_to_sign_enc = string_to_sign.encode('utf-8') + hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() + sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 签名 + print('开始使用 钉钉机器人 推送消息...', end='') + url = f'https://oapi.dingtalk.com/robot/send?access_token={DD_BOT_ACCESS_TOKEN}×tamp={timestamp}&sign={sign}' + headers = {'Content-Type': 'application/json;charset=utf-8'} + data = { + 'msgtype': 'text', + 'text': {'content': f'{title}\n\n{content}'} + } + response = requests.post(url=url, data=json.dumps(data), headers=headers, timeout=15).json() + if not response['errcode']: + print('推送成功!') + else: + print('推送失败!') + + +def coolpush_bot(title, content): + print("\n") + if not QQ_SKEY or not QQ_MODE: + print("qq服务的QQ_SKEY或者QQ_MODE未设置!!\n取消推送") + return + print("qq服务启动") + url = f"https://qmsg.zendee.cn/{QQ_MODE}/{QQ_SKEY}" + payload = {'msg': f"{title}\n\n{content}".encode('utf-8')} + response = requests.post(url=url, params=payload).json() + if response['code'] == 0: + print('推送成功!') + else: + print('推送失败!') + + +# push推送 +def pushplus_bot(title, content): + try: + print("\n") + if not PUSH_PLUS_TOKEN: + print("PUSHPLUS服务的token未设置!!\n取消推送") + return + print("PUSHPLUS服务启动") + url = 'http://www.pushplus.plus/send' + data = { + "token": PUSH_PLUS_TOKEN, + "title": title, + "content": content + } + body = json.dumps(data).encode(encoding='utf-8') + headers = {'Content-Type': 'application/json'} + response = requests.post(url=url, data=body, headers=headers).json() + if response['code'] == 200: + print('推送成功!') + else: + print('推送失败!') + except Exception as e: + print(e) + + +# 企业微信 APP 推送 +def wecom_app(title, content): + try: + if not QYWX_AM: + print("QYWX_AM 并未设置!!\n取消推送") + return + QYWX_AM_AY = re.split(',', QYWX_AM) + if 4 < len(QYWX_AM_AY) > 5: + print("QYWX_AM 设置错误!!\n取消推送") + return + corpid = QYWX_AM_AY[0] + corpsecret = QYWX_AM_AY[1] + touser = QYWX_AM_AY[2] + agentid = QYWX_AM_AY[3] + try: + media_id = QYWX_AM_AY[4] + except: + media_id = '' + wx = WeCom(corpid, corpsecret, agentid) + # 如果没有配置 media_id 默认就以 text 方式发送 + if not media_id: + message = title + '\n\n' + content + response = wx.send_text(message, touser) + else: + response = wx.send_mpnews(title, content, media_id, touser) + if response == 'ok': + print('推送成功!') + else: + print('推送失败!错误信息如下:\n', response) + except Exception as e: + print(e) + + +class WeCom: + def __init__(self, corpid, corpsecret, agentid): + self.CORPID = corpid + self.CORPSECRET = corpsecret + self.AGENTID = agentid + + def get_access_token(self): + url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken' + values = {'corpid': self.CORPID, + 'corpsecret': self.CORPSECRET, + } + req = requests.post(url, params=values) + data = json.loads(req.text) + return data["access_token"] + + def send_text(self, message, touser="@all"): + send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.get_access_token() + send_values = { + "touser": touser, + "msgtype": "text", + "agentid": self.AGENTID, + "text": { + "content": message + }, + "safe": "0" + } + send_msges = (bytes(json.dumps(send_values), 'utf-8')) + respone = requests.post(send_url, send_msges) + respone = respone.json() + return respone["errmsg"] + + def send_mpnews(self, title, message, media_id, touser="@all"): + send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + self.get_access_token() + send_values = { + "touser": touser, + "msgtype": "mpnews", + "agentid": self.AGENTID, + "mpnews": { + "articles": [ + { + "title": title, + "thumb_media_id": media_id, + "author": "Author", + "content_source_url": "", + "content": message.replace('\n', '
'), + "digest": message + } + ] + } + } + send_msges = (bytes(json.dumps(send_values), 'utf-8')) + respone = requests.post(send_url, send_msges) + respone = respone.json() + return respone["errmsg"] + + +def send(title, content): + """ + 使用 bark, telegram bot, dingding bot, serverJ 发送手机推送 + :param title: + :param content: + :return: + """ + content += '\n\n\n开源免费By: https://gitee.com/zhangluo0104/check.git' + for i in notify_mode: + if i == 'bark': + if BARK: + bark(title=title, content=content) + else: + print('未启用 bark') + continue + if i == 'sc_key': + if SCKEY: + serverJ(title=title, content=content) + else: + print('未启用 Server酱') + continue + elif i == 'go_cqhttp': + if GOBOT_URL and GOBOT_QQ: + go_cqhttp(title=title, content=content) + else: + print('未启用 go-cqhttp') + continue + elif i == 'dingding_bot': + if DD_BOT_ACCESS_TOKEN and DD_BOT_SECRET: + dingding_bot(title=title, content=content) + else: + print('未启用 钉钉机器人') + continue + elif i == 'telegram_bot': + if TG_BOT_TOKEN and TG_USER_ID: + telegram_bot(title=title, content=content) + else: + print('未启用 telegram机器人') + continue + elif i == 'coolpush_bot': + if QQ_SKEY and QQ_MODE: + coolpush_bot(title=title, content=content) + else: + print('未启用 QQ机器人') + continue + elif i == 'pushplus_bot': + if PUSH_PLUS_TOKEN: + pushplus_bot(title=title, content=content) + else: + print('未启用 PUSHPLUS机器人') + continue + elif i == 'wecom_app': + if QYWX_AM: + wecom_app(title=title, content=content) + else: + print('未启用企业微信应用消息推送') + continue + else: + print('此类推送方式不存在') + + +def main(): + send('title', 'content') + + +if __name__ == '__main__': + main() diff --git a/wzyd.py b/wzyd.py index 01012e1..b821107 100644 --- a/wzyd.py +++ b/wzyd.py @@ -41,5 +41,6 @@ if __name__ == "__main__": print('王者营地签到开始') text = WZYDCheckIn(check_item=os.environ.get('wzyd')).main() localtime = time.asctime(time.localtime(time.time())) - send(f'当前时间{localtime}\n结果:{text}') + result=f'当前时间{localtime}\n结果:{text}' + send('王者营地签到',result) else:print('未找到变量请填入') \ No newline at end of file