From 481e7b236b7d72a0d2e053d945c910b1d4cd5260 Mon Sep 17 00:00:00 2001 From: lework Date: Fri, 22 Nov 2019 19:18:14 +0800 Subject: [PATCH] feat: add conn supervistor socket file --- python/supervisor_healthCheck.py | 70 ++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/python/supervisor_healthCheck.py b/python/supervisor_healthCheck.py index 09145fd..e8e649f 100644 --- a/python/supervisor_healthCheck.py +++ b/python/supervisor_healthCheck.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -# @Time : 2019-11-07 +# @Time : 2019-11-22 # @Author : lework # @Desc : 针对supervisor的应用进行健康检查 @@ -18,35 +18,54 @@ import datetime import platform import threading import subprocess -from xmlrpclib import ServerProxy, Fault from email.header import Header from email.mime.text import MIMEText from collections import namedtuple -try: - import httplib -except ImportError: - import http.client as httplib - PY2 = sys.version_info[0] == 2 PY3 = sys.version_info[0] == 3 if PY3: + import http.client as httplib + from xmlrpc.client import Transport, ServerProxy, Fault + def iterkeys(d, **kw): return iter(d.keys(**kw)) - def iteritems(d, **kw): return iter(d.items(**kw)) else: + import httplib + from xmlrpclib import Transport, ServerProxy, Fault + def iterkeys(d, **kw): return d.iterkeys(**kw) - def iteritems(d, **kw): return d.iteritems(**kw) +class UnixStreamHTTPConnection(httplib.HTTPConnection): + """ + 使用socket连接 + """ + def connect(self): + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.connect(self.host) + + +class UnixStreamTransport(Transport, object): + """ + 自定义xmlrpclb的Transport,以便使用socket文件连接通信 + """ + def __init__(self, socket_path): + self.socket_path = socket_path + super(UnixStreamTransport, self).__init__() + + def make_connection(self, host): + return UnixStreamHTTPConnection(self.socket_path) + + def shell(cmd): """ 执行系统命令 @@ -152,7 +171,7 @@ class HealthCheck(object): self.mail_config = None self.wechat_config = None - self.supervisor_url = 'http://localhost:9001/RPC2' + self.supervisor_url = '/var/run/supervisor.sock' if 'config' in config: self.mail_config = config['config'].get('mail') @@ -171,6 +190,17 @@ class HealthCheck(object): self.cumulative = False self.max_cpu = 90 + def get_supervisor_conn(self): + """ + 获取supervisor的连接 + :return: + """ + if 'RPC2' == self.supervisor_url[-4:]: + s = ServerProxy(self.supervisor_url) + else: + s = ServerProxy('http://', transport=UnixStreamTransport(self.supervisor_url)) + return s + def get_pid(self, program, kind, pid_file): """ 获取进程pid @@ -184,7 +214,7 @@ class HealthCheck(object): if kind == 'supervisor': try: - s = ServerProxy(self.supervisor_url) + s = self.get_supervisor_conn() info = s.supervisor.getProcessInfo(program) pid = info.get('pid') err = info.get('description') @@ -210,7 +240,7 @@ class HealthCheck(object): err = "Can't get pid from file" else: err = "PID: pid file not set" - self.log(program, err) + self.log(program, err) if not pid: pid = 0 @@ -288,7 +318,8 @@ class HealthCheck(object): if check_state[program]['failure'] >= failureThreshold: # 失败后, 只触发一次action,或者检测错误数是2倍periodSeconds的平方数时触发(避免重启失败导致服务一直不可用) if not check_state[program]['action'] or ( - check_state[program]['failure'] != 0 and check_state[program]['failure'] % (periodSeconds * 2) == 0): + check_state[program]['failure'] != 0 and check_state[program]['failure'] % ( + periodSeconds * 2) == 0): action_param = { 'action_type': action_type, 'check_result': check_result.get('msg', ''), @@ -474,7 +505,7 @@ class HealthCheck(object): self.log(program, 'Action: restart') result = 'success' try: - s = ServerProxy(self.supervisor_url) + s = self.get_supervisor_conn() info = s.supervisor.getProcessInfo(program) except Exception as e: result = 'Get %s ProcessInfo Error: %s' % (program, e) @@ -519,7 +550,7 @@ class HealthCheck(object): exitcode, stdout, stderr = shell(cmd) if exitcode == 0: - self.log(program, "Action: exec result success") + self.log(program, "Action: exec result success") else: result = 'Failed to exec %s, exiting: %s' % (program, exitcode) self.log(program, "Action: exec result %s", result) @@ -681,6 +712,9 @@ class HealthCheck(object): :return: """ self.log('healthCheck:', 'start') + s = self.get_supervisor_conn() + info = s.supervisor.getProcessInfo('api-paibloks-show') + print(info) threads = [] for key, value in iteritems(self.program_config): @@ -700,7 +734,7 @@ if __name__ == '__main__': if not os.path.exists(config_file): example_config = """ config: # 脚本配置名称,请勿更改 - supervisordUrl: http://localhost:9001/RPC2 # supervisor的rpc接口地址 +# supervisordUrl: http://localhost:9001/RPC2 # supervisor的接口地址, 默认使用本地socker文件/var/run/supervisor.sock # mail: # stmp配置 # host: 'smtp.test.com' # port': '465' @@ -785,4 +819,8 @@ cat4: config = yaml.load(f) check = HealthCheck(config) + print(dir(check)) + # s = check.get_supervisor_conn() + # info = s.supervisor.getProcessInfo('api-paibloks-show') + # print(info) check.start()