Browse Source

feat: add conn supervistor socket file

master
lework 5 years ago
parent
commit
481e7b236b
  1. 66
      python/supervisor_healthCheck.py

66
python/supervisor_healthCheck.py

@ -1,7 +1,7 @@
#!/usr/bin/python #!/usr/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Time : 2019-11-07 # @Time : 2019-11-22
# @Author : lework # @Author : lework
# @Desc : 针对supervisor的应用进行健康检查 # @Desc : 针对supervisor的应用进行健康检查
@ -18,35 +18,54 @@ import datetime
import platform import platform
import threading import threading
import subprocess import subprocess
from xmlrpclib import ServerProxy, Fault
from email.header import Header from email.header import Header
from email.mime.text import MIMEText from email.mime.text import MIMEText
from collections import namedtuple from collections import namedtuple
try:
import httplib
except ImportError:
import http.client as httplib
PY2 = sys.version_info[0] == 2 PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3 PY3 = sys.version_info[0] == 3
if PY3: if PY3:
import http.client as httplib
from xmlrpc.client import Transport, ServerProxy, Fault
def iterkeys(d, **kw): def iterkeys(d, **kw):
return iter(d.keys(**kw)) return iter(d.keys(**kw))
def iteritems(d, **kw): def iteritems(d, **kw):
return iter(d.items(**kw)) return iter(d.items(**kw))
else: else:
import httplib
from xmlrpclib import Transport, ServerProxy, Fault
def iterkeys(d, **kw): def iterkeys(d, **kw):
return d.iterkeys(**kw) return d.iterkeys(**kw)
def iteritems(d, **kw): def iteritems(d, **kw):
return d.iteritems(**kw) return d.iteritems(**kw)
class UnixStreamHTTPConnection(httplib.HTTPConnection):
"""
使用socket连接
"""
def connect(self):
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.sock.connect(self.host)
class UnixStreamTransport(Transport, object):
"""
自定义xmlrpclb的Transport,以便使用socket文件连接通信
"""
def __init__(self, socket_path):
self.socket_path = socket_path
super(UnixStreamTransport, self).__init__()
def make_connection(self, host):
return UnixStreamHTTPConnection(self.socket_path)
def shell(cmd): def shell(cmd):
""" """
执行系统命令 执行系统命令
@ -152,7 +171,7 @@ class HealthCheck(object):
self.mail_config = None self.mail_config = None
self.wechat_config = None self.wechat_config = None
self.supervisor_url = 'http://localhost:9001/RPC2' self.supervisor_url = '/var/run/supervisor.sock'
if 'config' in config: if 'config' in config:
self.mail_config = config['config'].get('mail') self.mail_config = config['config'].get('mail')
@ -171,6 +190,17 @@ class HealthCheck(object):
self.cumulative = False self.cumulative = False
self.max_cpu = 90 self.max_cpu = 90
def get_supervisor_conn(self):
"""
获取supervisor的连接
:return:
"""
if 'RPC2' == self.supervisor_url[-4:]:
s = ServerProxy(self.supervisor_url)
else:
s = ServerProxy('http://', transport=UnixStreamTransport(self.supervisor_url))
return s
def get_pid(self, program, kind, pid_file): def get_pid(self, program, kind, pid_file):
""" """
获取进程pid 获取进程pid
@ -184,7 +214,7 @@ class HealthCheck(object):
if kind == 'supervisor': if kind == 'supervisor':
try: try:
s = ServerProxy(self.supervisor_url) s = self.get_supervisor_conn()
info = s.supervisor.getProcessInfo(program) info = s.supervisor.getProcessInfo(program)
pid = info.get('pid') pid = info.get('pid')
err = info.get('description') err = info.get('description')
@ -288,7 +318,8 @@ class HealthCheck(object):
if check_state[program]['failure'] >= failureThreshold: if check_state[program]['failure'] >= failureThreshold:
# 失败后, 只触发一次action,或者检测错误数是2倍periodSeconds的平方数时触发(避免重启失败导致服务一直不可用) # 失败后, 只触发一次action,或者检测错误数是2倍periodSeconds的平方数时触发(避免重启失败导致服务一直不可用)
if not check_state[program]['action'] or ( if not check_state[program]['action'] or (
check_state[program]['failure'] != 0 and check_state[program]['failure'] % (periodSeconds * 2) == 0): check_state[program]['failure'] != 0 and check_state[program]['failure'] % (
periodSeconds * 2) == 0):
action_param = { action_param = {
'action_type': action_type, 'action_type': action_type,
'check_result': check_result.get('msg', ''), 'check_result': check_result.get('msg', ''),
@ -474,7 +505,7 @@ class HealthCheck(object):
self.log(program, 'Action: restart') self.log(program, 'Action: restart')
result = 'success' result = 'success'
try: try:
s = ServerProxy(self.supervisor_url) s = self.get_supervisor_conn()
info = s.supervisor.getProcessInfo(program) info = s.supervisor.getProcessInfo(program)
except Exception as e: except Exception as e:
result = 'Get %s ProcessInfo Error: %s' % (program, e) result = 'Get %s ProcessInfo Error: %s' % (program, e)
@ -681,6 +712,9 @@ class HealthCheck(object):
:return: :return:
""" """
self.log('healthCheck:', 'start') self.log('healthCheck:', 'start')
s = self.get_supervisor_conn()
info = s.supervisor.getProcessInfo('api-paibloks-show')
print(info)
threads = [] threads = []
for key, value in iteritems(self.program_config): for key, value in iteritems(self.program_config):
@ -700,7 +734,7 @@ if __name__ == '__main__':
if not os.path.exists(config_file): if not os.path.exists(config_file):
example_config = """ example_config = """
config: # 脚本配置名称,请勿更改 config: # 脚本配置名称,请勿更改
supervisordUrl: http://localhost:9001/RPC2 # supervisor的rpc接口地址 # supervisordUrl: http://localhost:9001/RPC2 # supervisor的接口地址, 默认使用本地socker文件/var/run/supervisor.sock
# mail: # stmp配置 # mail: # stmp配置
# host: 'smtp.test.com' # host: 'smtp.test.com'
# port': '465' # port': '465'
@ -785,4 +819,8 @@ cat4:
config = yaml.load(f) config = yaml.load(f)
check = HealthCheck(config) check = HealthCheck(config)
print(dir(check))
# s = check.get_supervisor_conn()
# info = s.supervisor.getProcessInfo('api-paibloks-show')
# print(info)
check.start() check.start()

Loading…
Cancel
Save