From 0d47dadde3e466296b9694824d5134f1afea28c4 Mon Sep 17 00:00:00 2001
From: lework
Date: Thu, 17 Oct 2019 10:01:37 +0800
Subject: [PATCH] add supervisor

---
 python/facts_os_check/ansible.py  |   1 +
 python/supervisor_event_exited.py | 151 ++++++++++++++++++++++++++++++
 python/supervisor_exporter.py     | 124 ++++++++++++++++++++++++
 3 files changed, 276 insertions(+)
 create mode 100644 python/supervisor_event_exited.py
 create mode 100644 python/supervisor_exporter.py

diff --git a/python/facts_os_check/ansible.py b/python/facts_os_check/ansible.py
index b880686..2a0bd79 100644
--- a/python/facts_os_check/ansible.py
+++ b/python/facts_os_check/ansible.py
@@ -3,6 +3,7 @@
 # @Time : 2019-10-09
 # @Author : lework
+# @Desc : Generate a Linux resource report from Ansible facts data
 
 import datetime
 
diff --git a/python/supervisor_event_exited.py b/python/supervisor_event_exited.py
new file mode 100644
index 0000000..6ded4da
--- /dev/null
+++ b/python/supervisor_event_exited.py
@@ -0,0 +1,151 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+#
+# @Time : 2019-10-16
+# @Author : lework
+# @Desc : An event listener that subscribes to PROCESS_STATE_CHANGE events and sends an email when a supervisor-managed process unexpectedly transitions to the EXITED state.
+
+# [eventlistener:supervisor_event_exited]
+# process_name=%(program_name)s
+# command=/usr/bin/python /data/scripts/supervisor_event_exited.py
+# autostart=true
+# autorestart=true
+# events=PROCESS_STATE
+# log_stdout=true
+# log_stderr=true
+# stdout_logfile=/var/log/supervisor/supervisor_event_exited-stdout.log
+# stdout_logfile_maxbytes=50MB
+# stdout_logfile_backups=3
+# buffer_size=10
+# stderr_logfile=/var/log/supervisor/supervisor_event_exited-stderr.log
+# stderr_logfile_maxbytes=50MB
+# stderr_logfile_backups=3
+
+
+import os
+import smtplib
+import socket
+import sys
+from supervisor import childutils
+from email.header import Header
+from email.mime.text import MIMEText
+
+
+class CrashMail:
+    def __init__(self, mail_config, programs):
+        self.mail_config = mail_config
+        self.programs = programs
+        self.stdin = sys.stdin
+        self.stdout = sys.stdout
+        self.stderr = sys.stderr
+        self.time = ''
+
+    def write_stderr(self, s):
+        s = s + '\n'
+        if self.time:
+            s = '[%s] %s' % (self.time, s)
+        self.stderr.write(s)
+        self.stderr.flush()
+
+    def runforever(self):
+        # Loop forever; after one event is handled, keep waiting for the next one.
+        while 1:
+            # Use self.stdin, self.stdout, self.stderr instead of sys.*
+            headers, payload = childutils.listener.wait(self.stdin, self.stdout)
+
+            self.time = childutils.get_asctime()
+            self.write_stderr('[headers] %s' % str(headers))
+            self.write_stderr('[payload] %s' % str(payload))
+
+            # Ignore any event that is not PROCESS_STATE_EXITED: just write "RESULT\nOK" to stdout.
+            if headers['eventname'] != 'PROCESS_STATE_EXITED':
+                childutils.listener.ok(self.stdout)
+                continue
+
+            # Parse the payload; only pheaders is used here ('\n' is appended so eventdata() can split off an empty body).
+            # pdata is only present for event types such as PROCESS_LOG_STDERR and PROCESS_COMMUNICATION_STDOUT.
+            pheaders, pdata = childutils.eventdata(payload + '\n')
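+
+            # For reference (illustrative values, not produced by this script), pheaders for a
+            # PROCESS_STATE_EXITED event looks roughly like:
+            #   {'processname': 'cat', 'groupname': 'cat', 'from_state': 'RUNNING',
+            #    'expected': '0', 'pid': '2766'}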
+
+            # If programs is set, only handle the programs listed in it; otherwise handle all of them.
+            if len(self.programs) != 0 and pheaders['groupname'] not in self.programs:
+                childutils.listener.ok(self.stdout)
+                continue
+
+            # Skip "expected" exits and only handle unexpected ones.
+            # expected is 1 when the program exits with one of the codes in its exitcodes setting; otherwise it is 0.
+            if int(pheaders['expected']):
+                childutils.listener.ok(self.stdout)
+                continue
+
+            # Get the hostname and IP address of this machine.
+            hostname = socket.gethostname()
+            ip = socket.gethostbyname(hostname)
+
+            # Build the alert message.
+            msg = "Host: %s(%s)\nProcess: %s\nPID: %s\nEXITED unexpectedly from state: %s" % \
+                  (hostname, ip, pheaders['processname'], pheaders['pid'], pheaders['from_state'])
+
+            subject = '[Supervisord] %s crashed at %s' % (pheaders['processname'], self.time)
+
+            self.write_stderr('[INFO] unexpected exit, mailing')
+
+            # Send the alert mail.
+            self.send_mail(subject, msg)
+
+            # Write "RESULT\nOK" to stdout and move on to the next event.
+            childutils.listener.ok(self.stdout)
+
+    def send_mail(self, subject, content):
+        """
+        :param subject: str
+        :param content: str
+        :return: bool
+        """
+
+        mail_port = self.mail_config.get('mail_port', '')
+        mail_host = self.mail_config.get('mail_host', '')
+        mail_user = self.mail_config.get('mail_user', '')
+        mail_pass = self.mail_config.get('mail_pass', '')
+        to_list = self.mail_config.get('to_list', [])
+
+        msg = MIMEText(content, _subtype='plain', _charset='utf-8')
+        msg['Subject'] = Header(subject, 'utf-8')
+        msg['From'] = mail_user
+        msg['To'] = ",".join(to_list)
+        try:
+            s = smtplib.SMTP_SSL(mail_host, mail_port)
+            s.login(mail_user, mail_pass)
+            s.sendmail(mail_user, to_list, msg.as_string())
+            s.quit()
+            self.write_stderr('[mail] ok')
+            return True
+        except Exception as e:
+            self.write_stderr('[mail] error\n\n%s\n' % e)
+            return False
+
+
+def main():
+    # The listener must be managed by supervisor; it cannot be run directly.
+    if 'SUPERVISOR_SERVER_URL' not in os.environ:
+        sys.stderr.write('crashmail must be run as a supervisor event '
+                         'listener\n')
+        sys.stderr.flush()
+        return
+
+    # SMTP settings.
+    mail_config = {
+        'mail_host': 'smtp.lework.com',
+        'mail_port': '465',
+        'mail_user': 'ops@lework.com',
+        'mail_pass': '123123123',
+        'to_list': ['lework@lework.com']
+    }
+
+    # Programs to watch; leave empty to watch all of them.
+    programs = []
+    prog = CrashMail(mail_config, programs)
+    prog.runforever()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/python/supervisor_exporter.py b/python/supervisor_exporter.py
new file mode 100644
index 0000000..89db058
--- /dev/null
+++ b/python/supervisor_exporter.py
@@ -0,0 +1,124 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+# @Time : 2019-10-15
+# @Author : lework
+# @Desc : Collect supervisor process state information and expose it to Prometheus.
+
+# [program:supervisor_exporter]
+# process_name=%(program_name)s
+# command=/usr/bin/python /root/scripts/supervisor_exporter.py
+# autostart=true
+# autorestart=true
+# redirect_stderr=true
+# stdout_logfile=/var/log/supervisor/supervisor_exporter.log
+# stdout_logfile_maxbytes=50MB
+# stdout_logfile_backups=3
+# buffer_size=10
+
+
+from prometheus_client import Gauge, Counter, CollectorRegistry, generate_latest
+
+try:
+    from xmlrpclib import ServerProxy
+except ImportError:
+    # Python 3
+    from xmlrpc.client import ServerProxy
+
+try:
+    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+except ImportError:
+    # Python 3
+    from http.server import BaseHTTPRequestHandler, HTTPServer
+
+
+def is_running(state):
+    state_info = {
+        # 'STOPPED': 0,
+        'STARTING': 10,
+        'RUNNING': 20
+        # 'BACKOFF': 30,
+        # 'STOPPING': 40,
+        # 'EXITED': 100,
+        # 'FATAL': 200,
+        # 'UNKNOWN': 1000
+    }
+    if state in state_info.values():
+        return True
+    return False
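+
+# For reference (illustrative values, not produced by this script), each entry returned by
+# supervisor's XML-RPC getAllProcessInfo() is a dict roughly like:
+#   {'name': 'web', 'group': 'web', 'state': 20, 'statename': 'RUNNING',
+#    'start': 1571277600, 'stop': 0, 'now': 1571281200, 'pid': 2766, 'exitstatus': 0,
+#    'spawnerr': '', 'description': 'pid 2766, uptime 1:00:00',
+#    'logfile': '...', 'stdout_logfile': '...', 'stderr_logfile': '...'}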
+
+
+def get_metrics():
+    collect_reg = CollectorRegistry(auto_describe=True)
+
+    try:
+        s = ServerProxy(supervisord_url)
+        data = s.supervisor.getAllProcessInfo()
+    except Exception as e:
+        print("unable to call supervisord: %s" % e)
+        return collect_reg
+
+    labels = ('name', 'group')
+
+    metric_state = Gauge('state', "Process State", labelnames=labels, subsystem='supervisord', registry=collect_reg)
+    metric_exit_status = Gauge('exit_status', "Process Exit Status", labelnames=labels, subsystem='supervisord', registry=collect_reg)
+    metric_up = Gauge('up', "Process Up", labelnames=labels, subsystem='supervisord', registry=collect_reg)
+    metric_start_time_seconds = Counter('start_time_seconds', "Process start time", labelnames=labels, subsystem='supervisord', registry=collect_reg)
+
+    for item in data:
+        now = item.get('now', '')
+        group = item.get('group', '')
+        description = item.get('description', '')
+        stderr_logfile = item.get('stderr_logfile', '')
+        stop = item.get('stop', '')
+        statename = item.get('statename', '')
+        start = item.get('start', '')
+        state = item.get('state', '')
+        stdout_logfile = item.get('stdout_logfile', '')
+        logfile = item.get('logfile', '')
+        spawnerr = item.get('spawnerr', '')
+        name = item.get('name', '')
+        exitstatus = item.get('exitstatus', '')
+
+        label_values = (name, group)
+
+        metric_state.labels(*label_values).set(state)
+        metric_exit_status.labels(*label_values).set(exitstatus)
+
+        if is_running(state):
+            metric_up.labels(*label_values).set(1)
+            metric_start_time_seconds.labels(*label_values).inc(start)
+        else:
+            metric_up.labels(*label_values).set(0)
+
+    return collect_reg
+
+
+class myHandler(BaseHTTPRequestHandler):
+    def do_GET(self):
+        self.send_response(200)
+        self.send_header('Content-type', 'text/plain')
+        self.end_headers()
+        data = ""
+        if self.path == "/":
+            data = "hello, supervisor."
+        elif self.path == "/metrics":
+            data = generate_latest(get_metrics())
+        else:
+            data = "not found"
+        # generate_latest() returns bytes; encode plain strings before sending the response body.
+        if not isinstance(data, bytes):
+            data = data.encode('utf-8')
+        self.wfile.write(data)
+        return
+
+
+if __name__ == '__main__':
+    try:
+        supervisord_url = "http://localhost:9001/RPC2"
+
+        PORT_NUMBER = 8000
+        # Create a web server and define the handler to manage the
+        # incoming requests
+        server = HTTPServer(('', PORT_NUMBER), myHandler)
+        print('Started httpserver on port %s' % PORT_NUMBER)
+
+        # Wait forever for incoming http requests
+        server.serve_forever()
+
+    except KeyboardInterrupt:
+        print('^C received, shutting down the web server')
+        server.socket.close()
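+
+# A minimal Prometheus scrape_config for this exporter might look like the following
+# (the job name and target are assumptions; adjust host/port to your deployment):
+#
+# scrape_configs:
+#   - job_name: 'supervisord'
+#     static_configs:
+#       - targets: ['localhost:8000']
+#
+# Once the exporter is running, `curl http://localhost:8000/metrics` should return
+# the supervisord_* metrics in the Prometheus text exposition format.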