From 9f62189cedbda5066f510ac1c071a27a3324442f Mon Sep 17 00:00:00 2001 From: lework Date: Tue, 26 Nov 2019 16:41:44 +0800 Subject: [PATCH] update --- python/supervisor_exporter.py | 67 ++++++---- python/supervisor_healthCheck.py | 219 ++++++++++++++++++------------- 2 files changed, 167 insertions(+), 119 deletions(-) diff --git a/python/supervisor_exporter.py b/python/supervisor_exporter.py index 89db058..1266d83 100644 --- a/python/supervisor_exporter.py +++ b/python/supervisor_exporter.py @@ -16,27 +16,41 @@ # stdout_logfile_backups=3 # buffer_size=10 - -from xmlrpclib import ServerProxy -from prometheus_client import Gauge, Counter, CollectorRegistry ,generate_latest, start_http_server +import sys from time import sleep +from supervisor.xmlrpc import SupervisorTransport +from prometheus_client import Gauge, Counter, CollectorRegistry ,generate_latest, start_http_server -try: - from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer -except ImportError: - # Python 3 +PY2 = sys.version_info[0] == 2 +PY3 = sys.version_info[0] == 3 + +if PY3: + from xmlrpc.client import Transport, ServerProxy, Fault from http.server import BaseHTTPRequestHandler, HTTPServer +else: + from xmlrpclib import Transport, ServerProxy, Fault + from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer + + +def get_supervisord_conn(supervisord_url, supervisord_user, supervisord_pass): + """ + 获取supervisor的连接 + :return: + """ + transport = SupervisorTransport(supervisord_user, supervisord_pass, supervisord_url) + s = ServerProxy('http://127.0.0.1', transport=transport) + return s def is_runing(state): state_info = { - # 'STOPPED': 0, - 'STARTING': 10, - 'RUNNING': 20 - # 'BACKOFF': 30, - # 'STOPPING': 40 - # 'EXITED': 100, - # 'FATAL': 200, - # 'UNKNOWN': 1000 + # 'STOPPED': 0, + 'STARTING': 10, + 'RUNNING': 20 + # 'BACKOFF': 30, + # 'STOPPING': 40 + # 'EXITED': 100, + # 'FATAL': 200, + # 'UNKNOWN': 1000 } if state in state_info.values(): return True @@ -47,7 +61,7 @@ def get_metrics(): collect_reg = CollectorRegistry(auto_describe=True) try: - s = ServerProxy(supervisord_url) + s = get_supervisord_conn(supervisord_url, supervisord_user, supervisord_pass) data = s.supervisor.getAllProcessInfo() except Exception as e: print("unable to call supervisord: %s" % e) @@ -82,11 +96,11 @@ def get_metrics(): if is_runing(state): metric_up.labels(*labels).set(1) - metric_start_time_seconds.labels(*labels).inc(start) + metric_start_time_seconds.labels(*labels).inc(start) else: metric_up.labels(*labels).set(0) - return collect_reg + return collect_reg class myHandler(BaseHTTPRequestHandler): @@ -96,29 +110,32 @@ class myHandler(BaseHTTPRequestHandler): self.end_headers() data="" if self.path=="/": - data="hello, supervistor." - elif self.path=="/metrics": + data=b"hello, supervistor.\r\n\r\n/metrics" + elif self.path=="/metrics": data=generate_latest(get_metrics()) else: - data="not found" + data=b"not found" # Send the html message self.wfile.write(data) + return if __name__ == '__main__': try: - supervisord_url = "http://localhost:9001/RPC2" + supervisord_url = "unix:///var/run/supervisor.sock" + supervisord_user = "" + supervisord_pass = "" - PORT_NUMBER = 8000 + PORT_NUMBER = 8081 #Create a web server and define the handler to manage the #incoming request server = HTTPServer(('', PORT_NUMBER), myHandler) - print 'Started httpserver on port ' , PORT_NUMBER + print('Started httpserver on port',PORT_NUMBER) #Wait forever for incoming htto requests server.serve_forever() except KeyboardInterrupt: - print '^C received, shutting down the web server' + print('^C received, shutting down the web server') server.socket.close() diff --git a/python/supervisor_healthCheck.py b/python/supervisor_healthCheck.py index e8e649f..34c3b70 100644 --- a/python/supervisor_healthCheck.py +++ b/python/supervisor_healthCheck.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -# @Time : 2019-11-22 +# @Time : 2019-11-25 # @Author : lework # @Desc : 针对supervisor的应用进行健康检查 @@ -13,6 +13,7 @@ import json import yaml import base64 import socket +import signal import smtplib import datetime import platform @@ -21,51 +22,34 @@ import subprocess from email.header import Header from email.mime.text import MIMEText from collections import namedtuple +from supervisor.xmlrpc import SupervisorTransport -PY2 = sys.version_info[0] == 2 PY3 = sys.version_info[0] == 3 if PY3: import http.client as httplib from xmlrpc.client import Transport, ServerProxy, Fault + def iterkeys(d, **kw): return iter(d.keys(**kw)) + def iteritems(d, **kw): return iter(d.items(**kw)) else: import httplib from xmlrpclib import Transport, ServerProxy, Fault + def iterkeys(d, **kw): return d.iterkeys(**kw) + def iteritems(d, **kw): return d.iteritems(**kw) -class UnixStreamHTTPConnection(httplib.HTTPConnection): - """ - 使用socket连接 - """ - def connect(self): - self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.sock.connect(self.host) - - -class UnixStreamTransport(Transport, object): - """ - 自定义xmlrpclb的Transport,以便使用socket文件连接通信 - """ - def __init__(self, socket_path): - self.socket_path = socket_path - super(UnixStreamTransport, self).__init__() - - def make_connection(self, host): - return UnixStreamHTTPConnection(self.socket_path) - - def shell(cmd): """ 执行系统命令 @@ -171,12 +155,14 @@ class HealthCheck(object): self.mail_config = None self.wechat_config = None - self.supervisor_url = '/var/run/supervisor.sock' + self.supervisord_url = 'unix:///var/run/supervisor.sock' if 'config' in config: self.mail_config = config['config'].get('mail') self.wechat_config = config['config'].get('wechat') - self.supervisor_url = config['config'].get('supervistorUrl', self.supervisor_url) + self.supervisord_url = config['config'].get('supervisordUrl', self.supervisord_url) + self.supervisord_user = config['config'].get('supervisordUser', None) + self.supervisord_pass = config['config'].get('supervisordPass', None) config.pop('config') self.program_config = config @@ -185,20 +171,20 @@ class HealthCheck(object): self.failureThreshold = 3 self.successThreshold = 1 self.initialDelaySeconds = 1 + self.sendResolved = False self.max_rss = 1024 self.cumulative = False self.max_cpu = 90 - def get_supervisor_conn(self): + def get_supervisord_conn(self): """ 获取supervisor的连接 :return: """ - if 'RPC2' == self.supervisor_url[-4:]: - s = ServerProxy(self.supervisor_url) - else: - s = ServerProxy('http://', transport=UnixStreamTransport(self.supervisor_url)) + transport = SupervisorTransport(self.supervisord_user, self.supervisord_pass, self.supervisord_url) + s = ServerProxy('http://127.0.0.1', transport=transport) + return s def get_pid(self, program, kind, pid_file): @@ -206,7 +192,7 @@ class HealthCheck(object): 获取进程pid :param program: :param kind: - :param args: + :param pid_file: :return: """ pid = 0 @@ -214,7 +200,7 @@ class HealthCheck(object): if kind == 'supervisor': try: - s = self.get_supervisor_conn() + s = self.get_supervisord_conn() info = s.supervisor.getProcessInfo(program) pid = info.get('pid') err = info.get('description') @@ -249,7 +235,10 @@ class HealthCheck(object): def log(self, program, msg, *args): """ 写信息到 STDERR. - :param str msg: string message. + :param program: + :param msg: + :param args: + :return: """ curr_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') @@ -271,6 +260,7 @@ class HealthCheck(object): failureThreshold = config.get('failureThreshold', self.failureThreshold) successThreshold = config.get('successThreshold', self.successThreshold) initialDelaySeconds = config.get('initialDelaySeconds', self.initialDelaySeconds) + sendResolved = config.get('sendResolved', self.sendResolved) action_type = config.get('action', 'restart') action_exec_cmd = config.get('execCmd') @@ -298,7 +288,7 @@ class HealthCheck(object): # self.log(program, '%s check state: %s', check_type, json.dumps(check_state[program])) if check_state[program]['periodSeconds'] % periodSeconds == 0: check_result = check_method(config) - check_status = check_result.get('status', 'unknow') + check_status = check_result.get('status', None) check_info = check_result.get('info', '') self.log(program, '%s check: info(%s) state(%s)', check_type.upper(), check_info, check_status) @@ -309,6 +299,18 @@ class HealthCheck(object): # 先判断成功次数 if check_state[program]['success'] >= successThreshold: + if sendResolved and check_state[program]['failure'] > 0: + # 只保留通知action + notice_action = ['email', 'wechat'] + send_action = ','.join(list(set(action_type.split(',')) & set(notice_action))) + self.log(program, 'Use %s send resolved.', send_action) + action_param = { + 'check_status': check_status, + 'action_type': send_action, + 'msg': check_result.get('msg', '') + } + self.action(program, **action_param) + # 成功后,将项目状态初始化 check_state[program]['failure'] = 0 check_state[program]['success'] = 0 @@ -316,13 +318,14 @@ class HealthCheck(object): # 再判断失败次数 if check_state[program]['failure'] >= failureThreshold: - # 失败后, 只触发一次action,或者检测错误数是2倍periodSeconds的平方数时触发(避免重启失败导致服务一直不可用) + # 失败后, 只触发一次action,或者检测错误数可以整除2倍periodSeconds与initialDelaySeconds时触发(避免重启失败导致服务一直不可用) if not check_state[program]['action'] or ( check_state[program]['failure'] != 0 and check_state[program]['failure'] % ( - periodSeconds * 2) == 0): + (periodSeconds + initialDelaySeconds) * 2) == 0): action_param = { 'action_type': action_type, - 'check_result': check_result.get('msg', ''), + 'check_status': check_status, + 'msg': check_result.get('msg', ''), 'action_exec_cmd': action_exec_cmd } self.action(program, **action_param) @@ -392,7 +395,7 @@ class HealthCheck(object): if res.status != httplib.OK: return {'status': 'failure', 'msg': '[http_check] return code %s' % res.status, 'info': check_info} - return {'status': 'success', 'info': check_info} + return {'status': 'success', 'msg': '[http_check] return code %s' % res.status, 'info': check_info} def tcp_check(self, config): """ @@ -413,7 +416,7 @@ class HealthCheck(object): except Exception as e: self.log(program, 'TCP: conn error, %s', e) return {'status': 'failure', 'msg': '[tcp_check] %s' % e, 'info': check_info} - return {'status': 'success', 'info': check_info} + return {'status': 'success', 'msg': '[tcp_check] connection succeeded', 'info': check_info} def mem_check(self, config): """ @@ -440,7 +443,8 @@ class HealthCheck(object): return {'status': 'failure', 'msg': '[mem_check] max_rss(%sMB) now_rss(%sMB)' % (max_rss, now_rss), 'info': check_info} - return {'status': 'success', 'info': check_info} + return {'status': 'success', 'msg': '[mem_check] max_rss(%sMB) now_rss(%sMB)' % (max_rss, now_rss), + 'info': check_info} def cpu_check(self, config): """ @@ -467,34 +471,36 @@ class HealthCheck(object): 'msg': '[cpu_check] max_cpu({max_cpu}%) now_cpu({now}%)'.format(max_cpu=max_cpu, now=now_cpu), 'info': check_info} - return {'status': 'success', 'info': check_info} + return {'status': 'success', + 'msg': '[cpu_check] max_cpu({max_cpu}%) now_cpu({now}%)'.format(max_cpu=max_cpu, now=now_cpu), + 'info': check_info} def action(self, program, **args): """ 执行动作 :param program: - :param action_type: - :param error: - :return: + :param args: + :return: None """ action_type = args.get('action_type') - check_result = args.get('check_result') + msg = args.get('msg') action_exec_cmd = args.get('action_exec_cmd') + check_status = args.get('check_status') self.log(program, 'Action: %s', action_type) action_list = action_type.split(',') if 'restart' in action_list: restart_result = self.action_supervistor_restart(program) - check_result += '\r\n Restart:%s' % restart_result + msg += '\r\n Restart:%s' % restart_result elif 'exec' in action_list: exec_result = self.action_exec(program, action_exec_cmd) - check_result += '\r\n Exec:%s' % exec_result + msg += '\r\n Exec:%s' % exec_result if 'email' in action_list and self.mail_config: - self.action_email(program, action_type, check_result) + self.action_email(program, action_type, msg, check_status) if 'wechat' in action_list and self.wechat_config: - self.action_wechat(program, action_type, check_result) + self.action_wechat(program, action_type, msg, check_status) def action_supervistor_restart(self, program): """ @@ -505,7 +511,7 @@ class HealthCheck(object): self.log(program, 'Action: restart') result = 'success' try: - s = self.get_supervisor_conn() + s = self.get_supervisord_conn() info = s.supervisor.getProcessInfo(program) except Exception as e: result = 'Get %s ProcessInfo Error: %s' % (program, e) @@ -541,7 +547,7 @@ class HealthCheck(object): """ 执行系统命令 :param program: - :param exec: + :param cmd: :return: """ self.log(program, 'Action: exec') @@ -557,39 +563,45 @@ class HealthCheck(object): return result - def action_email(self, program, action_type, msg): + def action_email(self, program, action_type, msg, check_status): """ 发送email - :param subject: str - :param content: str - :return: bool + :param program: + :param action_type: + :param msg: + :param check_status: + :return: """ self.log(program, 'Action: email') ip = "" + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] + except Exception as e: + self.log(program, 'Action: email get ip error %s' % e) finally: s.close() hostname = platform.node().split('.')[0] system_platform = platform.platform() - subject = "[Supervisor] %s health check Faild" % program + if check_status == 'success': + subject = "[Supervisor] %s Health check successful" % program + else: + subject = "[Supervisor] %s Health check failed" % program curr_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') content = """ - DateTime: {curr_dt} - Program: {program} - IP: {ip} - Hostname: {hostname} - Platfrom: {system_platform} - Action: {action} - Msg: {msg} + DateTime: {curr_dt} + Program: {program} + IP: {ip} + Hostname: {hostname} + Platfrom: {system_platform} + Action: {action} + Msg: {msg} """.format(curr_dt=curr_dt, program=program, ip=ip, hostname=hostname, system_platform=system_platform, - action=action_type, - msg=msg) + action=action_type, msg=msg) mail_port = self.mail_config.get('port', '') mail_host = self.mail_config.get('host', '') mail_user = self.mail_config.get('user', '') @@ -612,12 +624,13 @@ class HealthCheck(object): self.log(program, 'Action: email send success.') return True - def action_wechat(self, program, action_type, msg): + def action_wechat(self, program, action_type, msg, check_status): """ 微信通知 :param program: :param action_type: :param msg: + :param check_status: :return: """ self.log(program, 'Action: wechat') @@ -651,10 +664,12 @@ class HealthCheck(object): send_url = '/cgi-bin/message/send?access_token={token}'.format(token=token) ip = "" + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] + except Exception as e: + self.log(program, 'Action: wechat get ip error %s' % e) finally: s.close() @@ -662,17 +677,22 @@ class HealthCheck(object): system_platform = platform.platform() curr_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') - title = "[Supervisor] %s health check Faild" % program - - content = title \ - + "\n> **详情信息**" \ - + "\n> DataTime: " + curr_dt \ - + "\n> Program: %s" % program \ - + "\n> IP: " + ip \ - + "\n> Hostname: " + hostname \ - + "\n> Platfrom: " + system_platform \ - + "\n> Action: " + action_type \ - + "\n> Msg: " + str(msg) + + if check_status == 'success': + title = "[Supervisor] %s Health check successful" % program + else: + title = "[Supervisor] %s Health check failed" % program + + content = "{title}\n \ + > **详情信息**\n \ + > DataTime: {curr_dt}\n \ + > Program: {program}\n \ + > IP: {ip}\n \ + > Hostname: {hostname}\n \ + > Platfrom: {platfrom}\n \ + > Action: {action}\n \ + > Msg: {msg}".format(title=title, curr_dt=curr_dt, program=program, ip=ip, hostname=hostname, + platfrom=system_platform, action=action_type, msg=msg) data = { "msgtype": 'markdown', @@ -712,9 +732,6 @@ class HealthCheck(object): :return: """ self.log('healthCheck:', 'start') - s = self.get_supervisor_conn() - info = s.supervisor.getProcessInfo('api-paibloks-show') - print(info) threads = [] for key, value in iteritems(self.program_config): @@ -723,18 +740,32 @@ class HealthCheck(object): t = threading.Thread(target=self.check, args=(item,)) threads.append(t) for t in threads: + t.setDaemon(True) t.start() - for t in threads: - t.join() + + while True: + pass if __name__ == '__main__': + + # 信号处理 + def sig_handler(signum, frame): + print("Exit check!") + sys.exit(0) + + signal.signal(signal.SIGINT, sig_handler) + signal.signal(signal.SIGTERM, sig_handler) + signal.signal(signal.SIGQUIT, sig_handler) + # 获取当前目录下的配置文件,没有的话就生成个模板 config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.yaml') if not os.path.exists(config_file): example_config = """ config: # 脚本配置名称,请勿更改 -# supervisordUrl: http://localhost:9001/RPC2 # supervisor的接口地址, 默认使用本地socker文件/var/run/supervisor.sock +# supervisordUrl: http://localhost:9001/RPC2 # supervisor的接口地址, 默认使用本地socket文件unix:///var/run/supervisor.sock +# supervisordUser: user # supervisor中设置的username, 没有设置可不填 +# supervisordPass: pass # supervisor中设置的password, 没有设置可不填 # mail: # stmp配置 # host: 'smtp.test.com' # port': '465' @@ -761,7 +792,8 @@ cat1: # supervisor中配置的program名称 failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart - execCmd: command # action exec 的执行命令 + execCmd: command # action exec 的执行命令 + sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False # cpu方式监控 cat2: # supervisor中配置的program名称 @@ -774,7 +806,8 @@ cat2: # supervisor中配置的program名称 failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart - execCmd: command # action exec 的执行命令 + execCmd: command # action exec 的执行命令 + sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False # HTTP方式监控 cat3: @@ -793,7 +826,8 @@ cat3: failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart - execCmd: command # action exec 的执行命令 + execCmd: command # action exec 的执行命令 + sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False # TCP方式监控 cat4: @@ -806,7 +840,8 @@ cat4: failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart - execCmd: command # action exec 的执行命令 + execCmd: command # action exec 的执行命令 + sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False """ with open(config_file, 'w') as f: f.write(example_config) @@ -816,11 +851,7 @@ cat4: sys.exit(0) with open(config_file) as f: - config = yaml.load(f) + config = yaml.safe_load(f) check = HealthCheck(config) - print(dir(check)) - # s = check.get_supervisor_conn() - # info = s.supervisor.getProcessInfo('api-paibloks-show') - # print(info) check.start()