From 2cbfcd54c6f569d9b856ca5fbb18bdf37001ba15 Mon Sep 17 00:00:00 2001 From: lework Date: Tue, 12 Nov 2019 17:16:59 +0800 Subject: [PATCH] add exec action --- python/supervisor_healthCheck.py | 231 ++++++++++++++++++++++--------- 1 file changed, 164 insertions(+), 67 deletions(-) diff --git a/python/supervisor_healthCheck.py b/python/supervisor_healthCheck.py index d2c0cad..a8099f0 100644 --- a/python/supervisor_healthCheck.py +++ b/python/supervisor_healthCheck.py @@ -17,6 +17,7 @@ import smtplib import datetime import platform import threading +import subprocess from xmlrpclib import ServerProxy, Fault from email.header import Header from email.mime.text import MIMEText @@ -27,7 +28,6 @@ try: except ImportError: import http.client as httplib - PY2 = sys.version_info[0] == 2 PY3 = sys.version_info[0] == 3 @@ -51,10 +51,18 @@ def shell(cmd): """ 执行系统命令 :param cmd: - :return: + :return: (exitcode, stdout, stderr) """ - with os.popen(cmd) as f: - return f.read() + # with os.popen(cmd) as f: + # return f.read() + env_to_pass = dict(os.environ) + proc = subprocess.Popen(cmd, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env_to_pass) + proc.wait() + return (proc.returncode,) + proc.communicate() def get_proc_cpu(pid): @@ -65,12 +73,12 @@ def get_proc_cpu(pid): """ pscommand = 'ps -opcpu= -p %s' - data = shell(pscommand % pid) + _, data, _ = shell(pscommand % pid) if not data: # 未获取到数据值,或者没有此pid信息 return None try: - cpu_utilization = data.lstrip().rstrip() + cpu_utilization = data.strip() cpu_utilization = float(cpu_utilization) except ValueError: # 获取的结果不包含数据,或者无法识别cpu_utilization @@ -101,7 +109,7 @@ def get_proc_rss(pid, cumulative=False): if cumulative: # 统计进程的子进程rss - data = shell(pstreecommand) + _, data, _ = shell(pstreecommand) data = data.strip() procs = [] @@ -120,12 +128,12 @@ def get_proc_rss(pid, cumulative=False): return None else: - data = shell(pscommand % pid) + _, data, _ = shell(pscommand % pid) if not data: # 未获取到数据值,或者没有此pid信息 return None try: - rss = data.lstrip().rstrip() + rss = data.strip() rss = int(rss) except ValueError: # 获取的结果不包含数据,或者无法识别rss @@ -149,7 +157,7 @@ class HealthCheck(object): if 'config' in config: self.mail_config = config['config'].get('mail') self.wechat_config = config['config'].get('wechat') - self.supervisor_url = config['config'].get('supervistor_url', self.supervisor_url) + self.supervisor_url = config['config'].get('supervistorUrl', self.supervisor_url) config.pop('config') self.program_config = config @@ -163,6 +171,51 @@ class HealthCheck(object): self.cumulative = False self.max_cpu = 90 + def get_pid(self, program, kind, pid_file): + """ + 获取进程pid + :param program: + :param kind: + :param args: + :return: + """ + pid = 0 + err = '' + + if kind == 'supervisor': + try: + s = ServerProxy(self.supervisor_url) + info = s.supervisor.getProcessInfo(program) + pid = info.get('pid') + err = info.get('description') + except Exception as e: + self.log(program, "PID: Can't get pid from supervisor %s ", e) + elif kind == 'name': + pscommand = "ps -A -o pid,cmd |grep '[%s]%s' | awk '{print $1}' | head -1" + exitcode, stdout, stderr = shell(pscommand % (program[0], program[1:])) + if exitcode == 0: + pid = stdout.strip() + else: + self.log(program, "PID: Can't get pid from name %s ", stderr) + pid = 0 + err = stderr + + elif kind == 'file': + if pid_file: + try: + with open(pid_file) as f: + pid = f.read().strip() + except Exception as e: + self.log(program, "PID: Can't get pid from file %s ", e) + err = "Can't get pid from file" + else: + err = "PID: pid file not set" + self.log(program, err) + if not pid: + pid = 0 + + return pid, err + def log(self, program, msg, *args): """ 写信息到 STDERR. @@ -189,6 +242,7 @@ class HealthCheck(object): successThreshold = config.get('successThreshold', self.successThreshold) initialDelaySeconds = config.get('initialDelaySeconds', self.initialDelaySeconds) action_type = config.get('action', 'restart') + action_exec_cmd = config.get('execCmd') check_type = config.get('type', 'HTTP').lower() check_method = self.http_check @@ -234,9 +288,13 @@ class HealthCheck(object): if check_state[program]['failure'] >= failureThreshold: # 失败后, 只触发一次action,或者检测错误数是2倍periodSeconds的平方数时触发(避免重启失败导致服务一直不可用) if not check_state[program]['action'] or ( - check_state[program]['failure'] != 0 and check_state[program][ - 'failure'] % (periodSeconds * 2) == 0): - self.action(program, action_type, check_result.get('msg', '')) + check_state[program]['failure'] != 0 and check_state[program]['failure'] % (periodSeconds * 2) == 0): + action_param = { + 'action_type': action_type, + 'check_result': check_result.get('msg', ''), + 'action_exec_cmd': action_exec_cmd + } + self.action(program, **action_param) check_state[program]['action'] = True # 间隔时间清0 @@ -287,7 +345,7 @@ class HealthCheck(object): self.log(program, 'HTTP: config_json not loads: %s , %s', json, e) check_info = '%s %s %s %s %s %s' % (config_host, config_port, config_path, config_method, - config_body, headers) + config_body, headers) try: httpClient = httplib.HTTPConnection(config_host, config_port, timeout=config_timeoutSeconds) @@ -302,8 +360,8 @@ class HealthCheck(object): if res.status != httplib.OK: return {'status': 'failure', 'msg': '[http_check] return code %s' % res.status, 'info': check_info} - - return {'status': 'success','info': check_info} + + return {'status': 'success', 'info': check_info} def tcp_check(self, config): """ @@ -324,7 +382,7 @@ class HealthCheck(object): except Exception as e: self.log(program, 'TCP: conn error, %s', e) return {'status': 'failure', 'msg': '[tcp_check] %s' % e, 'info': check_info} - return {'status': 'success','info': check_info} + return {'status': 'success', 'info': check_info} def mem_check(self, config): """ @@ -333,27 +391,25 @@ class HealthCheck(object): :return: dict """ program = config.get('program') - max_rss = config.get('max_rss', self.max_rss) + max_rss = config.get('maxRss', self.max_rss) cumulative = config.get('cumulative', self.cumulative) - check_info = 'max_rss:%sMB cumulative:%s' % (max_rss,cumulative) - - try: - s = ServerProxy(self.supervisor_url) - info = s.supervisor.getProcessInfo(program) - pid = info.get('pid') - if pid == 0: - self.log(program, 'MEM: check error, program not starting') - return {'status': 'failure', - 'msg': '[mem_check] program not starting, message: %s' % (info.get('description')),'info': check_info} - now_rss = get_proc_rss(pid, cumulative) - check_info = '%s now_rss:%sMB' % (check_info, now_rss) - if now_rss >= int(max_rss): - return {'status': 'failure', 'msg': '[mem_check] max_rss(%sMB) now_rss(%sMB)' % (max_rss, now_rss), 'info': check_info} - except Exception as e: - self.log(program, 'MEM: check error, %s', e) - return {'status': 'failure', 'msg': '[mem_check] %s' % e,'info': check_info} - - return {'status': 'success','info': check_info} + pid_get = config.get('pidGet', 'supervisor') + pid_file = config.get('pidFile', ) + check_info = 'max_rss:%sMB cumulative:%s' % (max_rss, cumulative) + + pid, err = self.get_pid(program, pid_get, pid_file) + if pid == 0: + self.log(program, 'MEM: check error, program not starting') + return {'status': 'failure', + 'msg': '[mem_check] program not starting, message: %s' % err, + 'info': check_info} + now_rss = get_proc_rss(pid, cumulative) + check_info = '%s now_rss:%sMB pid:%s' % (check_info, now_rss, pid) + if now_rss >= int(max_rss): + return {'status': 'failure', 'msg': '[mem_check] max_rss(%sMB) now_rss(%sMB)' % (max_rss, now_rss), + 'info': check_info} + + return {'status': 'success', 'info': check_info} def cpu_check(self, config): """ @@ -362,28 +418,27 @@ class HealthCheck(object): :return: dict """ program = config.get('program') - max_cpu = config.get('max_cpu', self.max_cpu) + max_cpu = config.get('maxCpu', self.max_cpu) + pid_get = config.get('pidGet', 'supervisor') + pid_file = config.get('pidFile', ) check_info = 'max_cpu:{cpu}%'.format(cpu=max_cpu) - try: - s = ServerProxy(self.supervisor_url) - info = s.supervisor.getProcessInfo(program) - pid = info.get('pid') - if pid == 0: - self.log(program, 'CPU: check error, program not starting') - return {'status': 'failure', - 'msg': '[cpu_check] program not starting, message: %s' % (info.get('description')),'info': check_info} - now_cpu = get_proc_cpu(pid) - check_info = '{info} now_cpu:{now}%'.format(info=check_info, now=now_cpu) - if now_cpu >= int(max_cpu): - return {'status': 'failure', 'msg': '[cpu_check] max_cpu({max_cpu}%) now_cpu({now}%)'.format(max_cpu=max_cpu, now=now_cpu),'info': check_info} - except Exception as e: - self.log(program, 'CPU: check error, %s', e) - return {'status': 'failure', 'msg': '[cpu_check] %s' % e,'info': check_info} - - return {'status': 'success','info': check_info} - - def action(self, program, action_type, error): + pid, err = self.get_pid(program, pid_get, pid_file) + if pid == 0: + self.log(program, 'CPU: check error, program not starting') + return {'status': 'failure', + 'msg': '[cpu_check] program not starting, message: %s' % err, + 'info': check_info} + now_cpu = get_proc_cpu(pid) + check_info = '{info} now_cpu:{now}% pid:{pid}'.format(info=check_info, now=now_cpu, pid=pid) + if now_cpu >= int(max_cpu): + return {'status': 'failure', + 'msg': '[cpu_check] max_cpu({max_cpu}%) now_cpu({now}%)'.format(max_cpu=max_cpu, now=now_cpu), + 'info': check_info} + + return {'status': 'success', 'info': check_info} + + def action(self, program, **args): """ 执行动作 :param program: @@ -391,15 +446,24 @@ class HealthCheck(object): :param error: :return: """ + action_type = args.get('action_type') + check_result = args.get('check_result') + action_exec_cmd = args.get('action_exec_cmd') + self.log(program, 'Action: %s', action_type) action_list = action_type.split(',') + if 'restart' in action_list: restart_result = self.action_supervistor_restart(program) - error += '\r\n Restart:%s' % restart_result + check_result += '\r\n Restart:%s' % restart_result + elif 'exec' in action_list: + exec_result = self.action_exec(program, action_exec_cmd) + check_result += '\r\n Exec:%s' % exec_result + if 'email' in action_list and self.mail_config: - self.action_email(program, action_type, error) + self.action_email(program, action_type, check_result) if 'wechat' in action_list and self.wechat_config: - self.action_wechat(program, action_type, error) + self.action_wechat(program, action_type, check_result) def action_supervistor_restart(self, program): """ @@ -442,6 +506,26 @@ class HealthCheck(object): return result + def action_exec(self, program, cmd): + """ + 执行系统命令 + :param program: + :param exec: + :return: + """ + self.log(program, 'Action: exec') + result = 'success' + + exitcode, stdout, stderr = shell(cmd) + + if exitcode == 0: + self.log(program, "Action: exec result success") + else: + result = 'Failed to exec %s, exiting: %s' % (program, exitcode) + self.log(program, "Action: exec result %s", result) + + return result + def action_email(self, program, action_type, msg): """ 发送email @@ -616,7 +700,7 @@ if __name__ == '__main__': if not os.path.exists(config_file): example_config = """ config: # 脚本配置名称,请勿更改 - supervisord_url: http://localhost:9001/RPC2 # supervisor的rpc接口地址 + supervisordUrl: http://localhost:9001/RPC2 # supervisor的rpc接口地址 # mail: # stmp配置 # host: 'smtp.test.com' # port': '465' @@ -631,25 +715,34 @@ config: # 脚本配置名称,请勿更 # toparty: # totag: +# 内存方式监控 cat1: # supervisor中配置的program名称 type: mem # 检查类型: http,tcp,mem,cpu 默认: http - max_rss: 1024 # 单位MB, 默认: 1024 + maxRss: 1024 # 内存阈值, 超过则为检测失败. 单位MB, 默认: 1024 cumulative: True # 是否统计子进程的内存, 默认: False + pidGet: supervistor # 获取pid的方式: supervistor,name,file, 选择name时,按program名称搜索pid,选择file时,需指定pidFile 默认: supervistor + pidFile: /var/run/t.pid # 指定pid文件的路径, 只在pidGet为file的时候有用 periodSeconds: 10 # 检查的频率(以秒为单位), 默认: 5 initialDelaySeconds: 10 # 首次检查等待的时间(以秒为单位), 默认: 1 failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 - action: restart,email # 触发的动作: restart,email,wechat 默认: restart + action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart + execCmd: command # action exec 的执行命令 +# cpu方式监控 cat2: # supervisor中配置的program名称 type: cpu # 检查类型: http,tcp,mem,cpu 默认: http - max_cpu: 80 # cpu使用百分比,单位% 默认: 90% + maxCpu: 80 # CPU阈值, 超过则为检测失败. 单位% 默认: 90% + pidGet: supervistor # 获取pid的方式: supervistor,name,file, 选择name时,按program名称搜索pid,选择file时,需指定pidFile 默认: supervistor + pidFile: /var/run/t.pid # 指定pid文件的路径, 只在pidGet为file的时候有用 periodSeconds: 10 # 检查的频率(以秒为单位), 默认: 5 initialDelaySeconds: 10 # 首次检查等待的时间(以秒为单位), 默认: 1 failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 - action: restart,wechat # 触发的动作: restart,email,wechat 默认: restart + action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart + execCmd: command # action exec 的执行命令 +# HTTP方式监控 cat3: type: HTTP mode: POST # http动作:POST,GET 默认: GET @@ -665,7 +758,10 @@ cat3: timeoutSeconds: 5 # 检查超时的秒数, 默认: 3 failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 - action: restart,email # 触发的动作: restart,email,wechat 默认: restart + action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart + execCmd: command # action exec 的执行命令 + +# TCP方式监控 cat4: type: TCP host: 127.0.0.1 # 主机地址, 默认: localhost @@ -675,7 +771,8 @@ cat4: timeoutSeconds: 5 # 检查超时的秒数, 默认: 3 failureThreshold: 3 # 检查成功后,最少连续检查失败多少次才被认定为失败, 默认: 3 successThreshold: 2 # 失败后检查成功的最小连续成功次数, 默认:1 - action: restart,email # 触发的动作: restart,email,wechat 默认: restart + action: restart,email # 触发的动作: restart,exec,email,wechat (restart和exec互斥,同时设置时restart生效) 默认: restart + execCmd: command # action exec 的执行命令 """ with open(config_file, 'w') as f: f.write(example_config)