Browse Source

update

master
lework 5 years ago
parent
commit
9f62189ced
  1. 45
      python/supervisor_exporter.py
  2. 197
      python/supervisor_healthCheck.py

45
python/supervisor_exporter.py

@ -16,16 +16,30 @@ @@ -16,16 +16,30 @@
# stdout_logfile_backups=3
# buffer_size=10
from xmlrpclib import ServerProxy
from prometheus_client import Gauge, Counter, CollectorRegistry ,generate_latest, start_http_server
import sys
from time import sleep
from supervisor.xmlrpc import SupervisorTransport
from prometheus_client import Gauge, Counter, CollectorRegistry ,generate_latest, start_http_server
try:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
except ImportError:
# Python 3
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
if PY3:
from xmlrpc.client import Transport, ServerProxy, Fault
from http.server import BaseHTTPRequestHandler, HTTPServer
else:
from xmlrpclib import Transport, ServerProxy, Fault
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
def get_supervisord_conn(supervisord_url, supervisord_user, supervisord_pass):
"""
获取supervisor的连接
:return:
"""
transport = SupervisorTransport(supervisord_user, supervisord_pass, supervisord_url)
s = ServerProxy('http://127.0.0.1', transport=transport)
return s
def is_runing(state):
state_info = {
@ -47,7 +61,7 @@ def get_metrics(): @@ -47,7 +61,7 @@ def get_metrics():
collect_reg = CollectorRegistry(auto_describe=True)
try:
s = ServerProxy(supervisord_url)
s = get_supervisord_conn(supervisord_url, supervisord_user, supervisord_pass)
data = s.supervisor.getAllProcessInfo()
except Exception as e:
print("unable to call supervisord: %s" % e)
@ -96,29 +110,32 @@ class myHandler(BaseHTTPRequestHandler): @@ -96,29 +110,32 @@ class myHandler(BaseHTTPRequestHandler):
self.end_headers()
data=""
if self.path=="/":
data="hello, supervistor."
data=b"hello, supervistor.\r\n\r\n/metrics"
elif self.path=="/metrics":
data=generate_latest(get_metrics())
else:
data="not found"
data=b"not found"
# Send the html message
self.wfile.write(data)
return
if __name__ == '__main__':
try:
supervisord_url = "http://localhost:9001/RPC2"
supervisord_url = "unix:///var/run/supervisor.sock"
supervisord_user = ""
supervisord_pass = ""
PORT_NUMBER = 8000
PORT_NUMBER = 8081
#Create a web server and define the handler to manage the
#incoming request
server = HTTPServer(('', PORT_NUMBER), myHandler)
print 'Started httpserver on port ' , PORT_NUMBER
print('Started httpserver on port',PORT_NUMBER)
#Wait forever for incoming htto requests
server.serve_forever()
except KeyboardInterrupt:
print '^C received, shutting down the web server'
print('^C received, shutting down the web server')
server.socket.close()

197
python/supervisor_healthCheck.py

@ -1,7 +1,7 @@ @@ -1,7 +1,7 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time : 2019-11-22
# @Time : 2019-11-25
# @Author : lework
# @Desc : 针对supervisor的应用进行健康检查
@ -13,6 +13,7 @@ import json @@ -13,6 +13,7 @@ import json
import yaml
import base64
import socket
import signal
import smtplib
import datetime
import platform
@ -21,51 +22,34 @@ import subprocess @@ -21,51 +22,34 @@ import subprocess
from email.header import Header
from email.mime.text import MIMEText
from collections import namedtuple
from supervisor.xmlrpc import SupervisorTransport
PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
if PY3:
import http.client as httplib
from xmlrpc.client import Transport, ServerProxy, Fault
def iterkeys(d, **kw):
return iter(d.keys(**kw))
def iteritems(d, **kw):
return iter(d.items(**kw))
else:
import httplib
from xmlrpclib import Transport, ServerProxy, Fault
def iterkeys(d, **kw):
return d.iterkeys(**kw)
def iteritems(d, **kw):
return d.iteritems(**kw)
class UnixStreamHTTPConnection(httplib.HTTPConnection):
"""
使用socket连接
"""
def connect(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(self.host)
class UnixStreamTransport(Transport, object):
"""
自定义xmlrpclb的Transport,以便使用socket文件连接通信
"""
def __init__(self, socket_path):
self.socket_path = socket_path
super(UnixStreamTransport, self).__init__()
def make_connection(self, host):
return UnixStreamHTTPConnection(self.socket_path)
def shell(cmd):
"""
执行系统命令
@ -171,12 +155,14 @@ class HealthCheck(object): @@ -171,12 +155,14 @@ class HealthCheck(object):
self.mail_config = None
self.wechat_config = None
self.supervisor_url = '/var/run/supervisor.sock'
self.supervisord_url = 'unix:///var/run/supervisor.sock'
if 'config' in config:
self.mail_config = config['config'].get('mail')
self.wechat_config = config['config'].get('wechat')
self.supervisor_url = config['config'].get('supervistorUrl', self.supervisor_url)
self.supervisord_url = config['config'].get('supervisordUrl', self.supervisord_url)
self.supervisord_user = config['config'].get('supervisordUser', None)
self.supervisord_pass = config['config'].get('supervisordPass', None)
config.pop('config')
self.program_config = config
@ -185,20 +171,20 @@ class HealthCheck(object): @@ -185,20 +171,20 @@ class HealthCheck(object):
self.failureThreshold = 3
self.successThreshold = 1
self.initialDelaySeconds = 1
self.sendResolved = False
self.max_rss = 1024
self.cumulative = False
self.max_cpu = 90
def get_supervisor_conn(self):
def get_supervisord_conn(self):
"""
获取supervisor的连接
:return:
"""
if 'RPC2' == self.supervisor_url[-4:]:
s = ServerProxy(self.supervisor_url)
else:
s = ServerProxy('http://', transport=UnixStreamTransport(self.supervisor_url))
transport = SupervisorTransport(self.supervisord_user, self.supervisord_pass, self.supervisord_url)
s = ServerProxy('http://127.0.0.1', transport=transport)
return s
def get_pid(self, program, kind, pid_file):
@ -206,7 +192,7 @@ class HealthCheck(object): @@ -206,7 +192,7 @@ class HealthCheck(object):
获取进程pid
:param program:
:param kind:
:param args:
:param pid_file:
:return:
"""
pid = 0
@ -214,7 +200,7 @@ class HealthCheck(object): @@ -214,7 +200,7 @@ class HealthCheck(object):
if kind == 'supervisor':
try:
s = self.get_supervisor_conn()
s = self.get_supervisord_conn()
info = s.supervisor.getProcessInfo(program)
pid = info.get('pid')
err = info.get('description')
@ -249,7 +235,10 @@ class HealthCheck(object): @@ -249,7 +235,10 @@ class HealthCheck(object):
def log(self, program, msg, *args):
"""
写信息到 STDERR.
:param str msg: string message.
:param program:
:param msg:
:param args:
:return:
"""
curr_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@ -271,6 +260,7 @@ class HealthCheck(object): @@ -271,6 +260,7 @@ class HealthCheck(object):
failureThreshold = config.get('failureThreshold', self.failureThreshold)
successThreshold = config.get('successThreshold', self.successThreshold)
initialDelaySeconds = config.get('initialDelaySeconds', self.initialDelaySeconds)
sendResolved = config.get('sendResolved', self.sendResolved)
action_type = config.get('action', 'restart')
action_exec_cmd = config.get('execCmd')
@ -298,7 +288,7 @@ class HealthCheck(object): @@ -298,7 +288,7 @@ class HealthCheck(object):
# self.log(program, '%s check state: %s', check_type, json.dumps(check_state[program]))
if check_state[program]['periodSeconds'] % periodSeconds == 0:
check_result = check_method(config)
check_status = check_result.get('status', 'unknow')
check_status = check_result.get('status', None)
check_info = check_result.get('info', '')
self.log(program, '%s check: info(%s) state(%s)', check_type.upper(), check_info, check_status)
@ -309,6 +299,18 @@ class HealthCheck(object): @@ -309,6 +299,18 @@ class HealthCheck(object):
# 先判断成功次数
if check_state[program]['success'] >= successThreshold:
if sendResolved and check_state[program]['failure'] > 0:
# 只保留通知action
notice_action = ['email', 'wechat']
send_action = ','.join(list(set(action_type.split(',')) & set(notice_action)))
self.log(program, 'Use %s send resolved.', send_action)
action_param = {
'check_status': check_status,
'action_type': send_action,
'msg': check_result.get('msg', '')
}
self.action(program, **action_param)
# 成功后,将项目状态初始化
check_state[program]['failure'] = 0
check_state[program]['success'] = 0
@ -316,13 +318,14 @@ class HealthCheck(object): @@ -316,13 +318,14 @@ class HealthCheck(object):
# 再判断失败次数
if check_state[program]['failure'] >= failureThreshold:
# 失败后, 只触发一次action,或者检测错误数是2倍periodSeconds的平方数时触发(避免重启失败导致服务一直不可用)
# 失败后, 只触发一次action,或者检测错误数可以整除2倍periodSeconds与initialDelaySeconds时触发(避免重启失败导致服务一直不可用)
if not check_state[program]['action'] or (
check_state[program]['failure'] != 0 and check_state[program]['failure'] % (
periodSeconds * 2) == 0):
(periodSeconds + initialDelaySeconds) * 2) == 0):
action_param = {
'action_type': action_type,
'check_result': check_result.get('msg', ''),
'check_status': check_status,
'msg': check_result.get('msg', ''),
'action_exec_cmd': action_exec_cmd
}
self.action(program, **action_param)
@ -392,7 +395,7 @@ class HealthCheck(object): @@ -392,7 +395,7 @@ class HealthCheck(object):
if res.status != httplib.OK:
return {'status': 'failure', 'msg': '[http_check] return code %s' % res.status, 'info': check_info}
return {'status': 'success', 'info': check_info}
return {'status': 'success', 'msg': '[http_check] return code %s' % res.status, 'info': check_info}
def tcp_check(self, config):
"""
@ -413,7 +416,7 @@ class HealthCheck(object): @@ -413,7 +416,7 @@ class HealthCheck(object):
except Exception as e:
self.log(program, 'TCP: conn error, %s', e)
return {'status': 'failure', 'msg': '[tcp_check] %s' % e, 'info': check_info}
return {'status': 'success', 'info': check_info}
return {'status': 'success', 'msg': '[tcp_check] connection succeeded', 'info': check_info}
def mem_check(self, config):
"""
@ -440,7 +443,8 @@ class HealthCheck(object): @@ -440,7 +443,8 @@ class HealthCheck(object):
return {'status': 'failure', 'msg': '[mem_check] max_rss(%sMB) now_rss(%sMB)' % (max_rss, now_rss),
'info': check_info}
return {'status': 'success', 'info': check_info}
return {'status': 'success', 'msg': '[mem_check] max_rss(%sMB) now_rss(%sMB)' % (max_rss, now_rss),
'info': check_info}
def cpu_check(self, config):
"""
@ -467,34 +471,36 @@ class HealthCheck(object): @@ -467,34 +471,36 @@ class HealthCheck(object):
'msg': '[cpu_check] max_cpu({max_cpu}%) now_cpu({now}%)'.format(max_cpu=max_cpu, now=now_cpu),
'info': check_info}
return {'status': 'success', 'info': check_info}
return {'status': 'success',
'msg': '[cpu_check] max_cpu({max_cpu}%) now_cpu({now}%)'.format(max_cpu=max_cpu, now=now_cpu),
'info': check_info}
def action(self, program, **args):
"""
执行动作
:param program:
:param action_type:
:param error:
:return:
:param args:
:return: None
"""
action_type = args.get('action_type')
check_result = args.get('check_result')
msg = args.get('msg')
action_exec_cmd = args.get('action_exec_cmd')
check_status = args.get('check_status')
self.log(program, 'Action: %s', action_type)
action_list = action_type.split(',')
if 'restart' in action_list:
restart_result = self.action_supervistor_restart(program)
check_result += '\r\n Restart:%s' % restart_result
msg += '\r\n Restart:%s' % restart_result
elif 'exec' in action_list:
exec_result = self.action_exec(program, action_exec_cmd)
check_result += '\r\n Exec:%s' % exec_result
msg += '\r\n Exec:%s' % exec_result
if 'email' in action_list and self.mail_config:
self.action_email(program, action_type, check_result)
self.action_email(program, action_type, msg, check_status)
if 'wechat' in action_list and self.wechat_config:
self.action_wechat(program, action_type, check_result)
self.action_wechat(program, action_type, msg, check_status)
def action_supervistor_restart(self, program):
"""
@ -505,7 +511,7 @@ class HealthCheck(object): @@ -505,7 +511,7 @@ class HealthCheck(object):
self.log(program, 'Action: restart')
result = 'success'
try:
s = self.get_supervisor_conn()
s = self.get_supervisord_conn()
info = s.supervisor.getProcessInfo(program)
except Exception as e:
result = 'Get %s ProcessInfo Error: %s' % (program, e)
@ -541,7 +547,7 @@ class HealthCheck(object): @@ -541,7 +547,7 @@ class HealthCheck(object):
"""
执行系统命令
:param program:
:param exec:
:param cmd:
:return:
"""
self.log(program, 'Action: exec')
@ -557,27 +563,34 @@ class HealthCheck(object): @@ -557,27 +563,34 @@ class HealthCheck(object):
return result
def action_email(self, program, action_type, msg):
def action_email(self, program, action_type, msg, check_status):
"""
发送email
:param subject: str
:param content: str
:return: bool
:param program:
:param action_type:
:param msg:
:param check_status:
:return:
"""
self.log(program, 'Action: email')
ip = ""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
except Exception as e:
self.log(program, 'Action: email get ip error %s' % e)
finally:
s.close()
hostname = platform.node().split('.')[0]
system_platform = platform.platform()
subject = "[Supervisor] %s health check Faild" % program
if check_status == 'success':
subject = "[Supervisor] %s Health check successful" % program
else:
subject = "[Supervisor] %s Health check failed" % program
curr_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
content = """
DateTime: {curr_dt}
@ -588,8 +601,7 @@ class HealthCheck(object): @@ -588,8 +601,7 @@ class HealthCheck(object):
Action: {action}
Msg: {msg}
""".format(curr_dt=curr_dt, program=program, ip=ip, hostname=hostname, system_platform=system_platform,
action=action_type,
msg=msg)
action=action_type, msg=msg)
mail_port = self.mail_config.get('port', '')
mail_host = self.mail_config.get('host', '')
mail_user = self.mail_config.get('user', '')
@ -612,12 +624,13 @@ class HealthCheck(object): @@ -612,12 +624,13 @@ class HealthCheck(object):
self.log(program, 'Action: email send success.')
return True
def action_wechat(self, program, action_type, msg):
def action_wechat(self, program, action_type, msg, check_status):
"""
微信通知
:param program:
:param action_type:
:param msg:
:param check_status:
:return:
"""
self.log(program, 'Action: wechat')
@ -651,10 +664,12 @@ class HealthCheck(object): @@ -651,10 +664,12 @@ class HealthCheck(object):
send_url = '/cgi-bin/message/send?access_token={token}'.format(token=token)
ip = ""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
except Exception as e:
self.log(program, 'Action: wechat get ip error %s' % e)
finally:
s.close()
@ -662,17 +677,22 @@ class HealthCheck(object): @@ -662,17 +677,22 @@ class HealthCheck(object):
system_platform = platform.platform()
curr_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
title = "<font color=\"warning\">[Supervisor] %s health check Faild</font>" % program
content = title \
+ "\n> **详情信息**" \
+ "\n> DataTime: " + curr_dt \
+ "\n> Program: <font color=\"warning\">%s</font>" % program \
+ "\n> IP: " + ip \
+ "\n> Hostname: " + hostname \
+ "\n> Platfrom: " + system_platform \
+ "\n> Action: " + action_type \
+ "\n> Msg: " + str(msg)
if check_status == 'success':
title = "<font color=\"info\">[Supervisor] %s Health check successful</font>" % program
else:
title = "<font color=\"warning\">[Supervisor] %s Health check failed</font>" % program
content = "{title}\n \
> **详情信息**\n \
> DataTime: {curr_dt}\n \
> Program: <font color=\"warning\">{program}</font>\n \
> IP: {ip}\n \
> Hostname: {hostname}\n \
> Platfrom: {platfrom}\n \
> Action: {action}\n \
> Msg: {msg}".format(title=title, curr_dt=curr_dt, program=program, ip=ip, hostname=hostname,
platfrom=system_platform, action=action_type, msg=msg)
data = {
"msgtype": 'markdown',
@ -712,9 +732,6 @@ class HealthCheck(object): @@ -712,9 +732,6 @@ class HealthCheck(object):
:return:
"""
self.log('healthCheck:', 'start')
s = self.get_supervisor_conn()
info = s.supervisor.getProcessInfo('api-paibloks-show')
print(info)
threads = []
for key, value in iteritems(self.program_config):
@ -723,18 +740,32 @@ class HealthCheck(object): @@ -723,18 +740,32 @@ class HealthCheck(object):
t = threading.Thread(target=self.check, args=(item,))
threads.append(t)
for t in threads:
t.setDaemon(True)
t.start()
for t in threads:
t.join()
while True:
pass
if __name__ == '__main__':
# 信号处理
def sig_handler(signum, frame):
print("Exit check!")
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGQUIT, sig_handler)
# 获取当前目录下的配置文件,没有的话就生成个模板
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.yaml')
if not os.path.exists(config_file):
example_config = """
config: # 脚本配置名称,请勿更改
# supervisordUrl: http://localhost:9001/RPC2 # supervisor的接口地址, 默认使用本地socker文件/var/run/supervisor.sock
# supervisordUrl: http://localhost:9001/RPC2 # supervisor的接口地址, 默认使用本地socket文件unix:///var/run/supervisor.sock
# supervisordUser: user # supervisor中设置的username, 没有设置可不填
# supervisordPass: pass # supervisor中设置的password, 没有设置可不填
# mail: # stmp配置
# host: 'smtp.test.com'
# port': '465'
@ -762,6 +793,7 @@ cat1: # supervisor中配置的program名称 @@ -762,6 +793,7 @@ cat1: # supervisor中配置的program名称
successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1
action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart
execCmd: command # action exec 的执行命令
sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False
# cpu方式监控
cat2: # supervisor中配置的program名称
@ -775,6 +807,7 @@ cat2: # supervisor中配置的program名称 @@ -775,6 +807,7 @@ cat2: # supervisor中配置的program名称
successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1
action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart
execCmd: command # action exec 的执行命令
sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False
# HTTP方式监控
cat3:
@ -794,6 +827,7 @@ cat3: @@ -794,6 +827,7 @@ cat3:
successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1
action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart
execCmd: command # action exec 的执行命令
sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False
# TCP方式监控
cat4:
@ -807,6 +841,7 @@ cat4: @@ -807,6 +841,7 @@ cat4:
successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1
action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart
execCmd: command # action exec 的执行命令
sendResolved: True # 是否发送恢复通知,仅用作于email,wechat. 默认: False
"""
with open(config_file, 'w') as f:
f.write(example_config)
@ -816,11 +851,7 @@ cat4: @@ -816,11 +851,7 @@ cat4:
sys.exit(0)
with open(config_file) as f:
config = yaml.load(f)
config = yaml.safe_load(f)
check = HealthCheck(config)
print(dir(check))
# s = check.get_supervisor_conn()
# info = s.supervisor.getProcessInfo('api-paibloks-show')
# print(info)
check.start()

Loading…
Cancel
Save