lework 4 years ago
parent
commit
76bb472213
  1. 146
      python/supervisor_healthCheck.py
  2. 155
      shell/download.sh
  3. 20
      shell/judge.sh
  4. 31
      shell/proxy.sh

146
python/supervisor_healthCheck.py

@ -1,9 +1,10 @@ @@ -1,9 +1,10 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time : 2019-11-25
# @Time : 2020-06-05
# @Author : lework
# @Desc : 针对supervisor的应用进行健康检查
# @Version : 1.5
import os
@ -146,6 +147,33 @@ def get_proc_rss(pid, cumulative=False): @@ -146,6 +147,33 @@ def get_proc_rss(pid, cumulative=False):
return rss
class WorkerThread(threading.Thread):
"""
自定义Thread记录线程的异常信息
"""
def __init__(self, target=None, args=(), kwargs={}, name=None):
super(WorkerThread, self).__init__(target=target, args=args, kwargs=kwargs, name=name)
self._target = target
self._args = args
self._kwargs = kwargs
self.exception = None
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except Exception as e:
# 记录线程异常
self.exception = sys.exc_info()
finally:
del self._target, self._args, self._kwargs
def get_exception(self):
return self.exception
class HealthCheck(object):
def __init__(self, config):
"""
@ -167,6 +195,9 @@ class HealthCheck(object): @@ -167,6 +195,9 @@ class HealthCheck(object):
self.program_config = config
# 只保留通知action
self.notice_action = ['email', 'wechat']
self.periodSeconds = 5
self.failureThreshold = 3
self.successThreshold = 1
@ -199,6 +230,7 @@ class HealthCheck(object): @@ -199,6 +230,7 @@ class HealthCheck(object):
err = ''
if kind == 'supervisor':
# 通过supervisor程序获取pid
try:
s = self.get_supervisord_conn()
info = s.supervisor.getProcessInfo(program)
@ -206,7 +238,9 @@ class HealthCheck(object): @@ -206,7 +238,9 @@ class HealthCheck(object):
err = info.get('description')
except Exception as e:
self.log(program, "PID: Can't get pid from supervisor %s ", e)
elif kind == 'name':
# 通过进程名称获取pid
pscommand = "ps -A -o pid,cmd | grep '[%s]%s' | awk '{print $1}' | head -1"
exitcode, stdout, stderr = shell(pscommand % (program[0], program[1:]))
if exitcode == 0:
@ -217,6 +251,7 @@ class HealthCheck(object): @@ -217,6 +251,7 @@ class HealthCheck(object):
err = stderr
elif kind == 'file':
# 通过文件获取pid
if pid_file:
try:
with open(pid_file) as f:
@ -225,8 +260,9 @@ class HealthCheck(object): @@ -225,8 +260,9 @@ class HealthCheck(object):
self.log(program, "PID: Can't get pid from file %s ", e)
err = "Can't get pid from file"
else:
err = "PID: pid file not set"
err = "PID: pid file not set."
self.log(program, err)
if not pid:
pid = 0
@ -255,18 +291,20 @@ class HealthCheck(object): @@ -255,18 +291,20 @@ class HealthCheck(object):
:return:
"""
check_state = {}
program = config.get('program')
periodSeconds = config.get('periodSeconds', self.periodSeconds)
failureThreshold = config.get('failureThreshold', self.failureThreshold)
successThreshold = config.get('successThreshold', self.successThreshold)
initialDelaySeconds = config.get('initialDelaySeconds', self.initialDelaySeconds)
sendResolved = config.get('sendResolved', self.sendResolved)
action_type = config.get('action', 'restart')
check_type = config.get('type', 'http').lower()
check_type = config.get('type', 'HTTP').lower()
if check_type == 'http':
check_method = self.http_check
if check_type == 'tcp':
elif check_type == 'tcp':
check_method = self.tcp_check
elif check_type == 'mem':
check_method = self.mem_check
@ -281,7 +319,7 @@ class HealthCheck(object): @@ -281,7 +319,7 @@ class HealthCheck(object):
'success': 0,
'action': False
}
self.log(program, 'CONFIG: %s', config)
self.log(program, '[CONFIG]: %s', config)
time.sleep(initialDelaySeconds)
# self.log(program, '%s check state: %s', check_type, json.dumps(check_state[program]))
@ -289,7 +327,7 @@ class HealthCheck(object): @@ -289,7 +327,7 @@ class HealthCheck(object):
check_result = check_method(config)
check_status = check_result.get('status', None)
check_info = check_result.get('info', '')
self.log(program, '%s check: info(%s) state(%s)', check_type.upper(), check_info, check_status)
self.log(program, '[%s check]: info(%s) state(%s)', check_type.upper(), check_info, check_status)
if check_status == 'failure':
check_state[program]['failure'] += 1
@ -300,10 +338,8 @@ class HealthCheck(object): @@ -300,10 +338,8 @@ class HealthCheck(object):
if check_state[program]['success'] >= successThreshold:
# 只有开启恢复通知和检测失败并且执行操作后,才可以发送恢复通知
if sendResolved and check_state[program]['action']:
# 只保留通知action
notice_action = ['email', 'wechat']
send_action = ','.join(list(set(action_type.split(',')) & set(notice_action)))
self.log(program, 'Use %s send resolved.', send_action)
send_action = ','.join(list(set(action_type.split(',')) & set(self.notice_action)))
self.log(program, '[Resolved] Use %s.', send_action)
action_param = {
'check_status': check_status,
'action_type': send_action,
@ -364,7 +400,7 @@ class HealthCheck(object): @@ -364,7 +400,7 @@ class HealthCheck(object):
try:
headers.update(json.loads(config_hearders))
except Exception as e:
self.log(program, 'HTTP: config_headers not loads: %s , %s', config_hearders, e)
self.log(program, '[http_check]: config_headers not loads: %s , %s', config_hearders, e)
if config_json:
headers['Content-Type'] = 'application/json'
@ -376,7 +412,7 @@ class HealthCheck(object): @@ -376,7 +412,7 @@ class HealthCheck(object):
try:
config_body = json.dumps(config_json)
except Exception as e:
self.log(program, 'HTTP: config_json not loads: %s , %s', json, e)
self.log(program, '[http_check]: config_json not loads: %s , %s', json, e)
check_info = '%s %s %s %s %s %s' % (config_host, config_port, config_path, config_method,
config_body, headers)
@ -386,7 +422,7 @@ class HealthCheck(object): @@ -386,7 +422,7 @@ class HealthCheck(object):
httpClient.request(config_method, config_path, config_body, headers=headers)
res = httpClient.getresponse()
except Exception as e:
self.log(program, 'HTTP: conn error, %s', e)
self.log(program, '[http_check]: conn error, %s', e)
return {'status': 'failure', 'msg': '[http_check] %s' % e, 'info': check_info}
finally:
if httpClient:
@ -414,7 +450,7 @@ class HealthCheck(object): @@ -414,7 +450,7 @@ class HealthCheck(object):
sock.connect((host, port))
sock.close()
except Exception as e:
self.log(program, 'TCP: conn error, %s', e)
self.log(program, '[tcp_check]: conn error, %s', e)
return {'status': 'failure', 'msg': '[tcp_check] %s' % e, 'info': check_info}
return {'status': 'success', 'msg': '[tcp_check] connection succeeded', 'info': check_info}
@ -433,9 +469,9 @@ class HealthCheck(object): @@ -433,9 +469,9 @@ class HealthCheck(object):
pid, err = self.get_pid(program, pid_get, pid_file)
if pid == 0:
self.log(program, 'MEM: check error, program not starting')
self.log(program, '[mem_check]: check error, program not starting.')
return {'status': 'failure',
'msg': '[mem_check] program not starting, message: %s' % err,
'msg': '[mem_check] program not starting, message: %s.' % err,
'info': check_info}
now_rss = get_proc_rss(pid, cumulative)
check_info = '%s now_rss:%sMB pid:%s' % (check_info, now_rss, pid)
@ -460,9 +496,9 @@ class HealthCheck(object): @@ -460,9 +496,9 @@ class HealthCheck(object):
pid, err = self.get_pid(program, pid_get, pid_file)
if pid == 0:
self.log(program, 'CPU: check error, program not starting')
self.log(program, '[cpu_check]: check error, program not starting.')
return {'status': 'failure',
'msg': '[cpu_check] program not starting, message: %s' % err,
'msg': '[cpu_check] program not starting, message: %s.' % err,
'info': check_info}
now_cpu = get_proc_cpu(pid)
check_info = '{info} now_cpu:{now}% pid:{pid}'.format(info=check_info, now=now_cpu, pid=pid)
@ -487,7 +523,7 @@ class HealthCheck(object): @@ -487,7 +523,7 @@ class HealthCheck(object):
check_status = args.get('check_status')
config = args.get('config')
self.log(program, 'Action: %s', action_type)
self.log(program, '[Action: %s]', action_type)
action_list = action_type.split(',')
if 'restart' in action_list:
@ -515,38 +551,35 @@ class HealthCheck(object): @@ -515,38 +551,35 @@ class HealthCheck(object):
:param program:
:return:
"""
self.log(program, 'Action: restart')
result = 'success'
try:
s = self.get_supervisord_conn()
info = s.supervisor.getProcessInfo(program)
except Exception as e:
result = 'Get %s ProcessInfo Error: %s' % (program, e)
self.log(program, 'Action: restart %s' % result)
self.log(program, '[Action: restart] %s' % result)
return result
if info['state'] == 20:
self.log(program, 'Action: restart stop process')
try:
stop_result = s.supervisor.stopProcess(program)
self.log(program, 'Action: restart stop result %s', stop_result)
self.log(program, '[Action: restart] stop result %s', stop_result)
except Fault as e:
result = 'Failed to stop process %s, exiting: %s' % (program, e)
self.log(program, 'Action: restart stop error %s', result)
self.log(program, '[Action: restart] stop error %s', result)
return result
time.sleep(1)
info = s.supervisor.getProcessInfo(program)
if info['state'] != 20:
self.log(program, 'Action: restart start process')
try:
start_result = s.supervisor.startProcess(program)
self.log(program, '[Action: restart] start result %s', start_result)
except Fault as e:
result = 'Failed to start process %s, exiting: %s' % (program, e)
self.log(program, 'Action: restart start error %s', result)
self.log(program, '[Action: restart] start error %s', result)
return result
self.log(program, 'Action: restart start result %s', start_result)
return result
@ -557,16 +590,15 @@ class HealthCheck(object): @@ -557,16 +590,15 @@ class HealthCheck(object):
:param cmd:
:return:
"""
self.log(program, 'Action: exec')
result = 'success'
exitcode, stdout, stderr = shell(cmd)
if exitcode == 0:
self.log(program, "Action: exec result success")
self.log(program, "[Action: exec] result success")
else:
result = 'Failed to exec %s, exiting: %s' % (program, exitcode)
self.log(program, "Action: exec result %s", result)
self.log(program, "[Action: exec] result %s", result)
return result
@ -577,7 +609,6 @@ class HealthCheck(object): @@ -577,7 +609,6 @@ class HealthCheck(object):
:param pid:
:return:
"""
self.log(program, 'Action: kill')
result = 'success'
if int(pid) < 3:
@ -587,10 +618,10 @@ class HealthCheck(object): @@ -587,10 +618,10 @@ class HealthCheck(object):
exitcode, stdout, stderr = shell(cmd)
if exitcode == 0:
self.log(program, "Action: kill result success")
self.log(program, "[Action: kill] result success")
else:
result = 'Failed to kill %s, pid: %s exiting: %s' % (program, pid, exitcode)
self.log(program, "Action: kill result %s", result)
self.log(program, "[Action: kill] result %s", result)
return result
@ -603,7 +634,6 @@ class HealthCheck(object): @@ -603,7 +634,6 @@ class HealthCheck(object):
:param check_status:
:return:
"""
self.log(program, 'Action: email')
ip = ""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@ -611,7 +641,7 @@ class HealthCheck(object): @@ -611,7 +641,7 @@ class HealthCheck(object):
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
except Exception as e:
self.log(program, 'Action: email get ip error %s' % e)
self.log(program, '[Action: email] get ip error %s' % e)
finally:
s.close()
@ -649,10 +679,10 @@ class HealthCheck(object): @@ -649,10 +679,10 @@ class HealthCheck(object):
s.sendmail(mail_user, to_list, msg.as_string())
s.quit()
except Exception as e:
self.log(program, 'Action: email send error %s' % e)
self.log(program, '[Action: email] send error %s' % e)
return False
self.log(program, 'Action: email send success.')
self.log(program, '[Action: email] send success.')
return True
def action_wechat(self, program, action_type, msg, check_status):
@ -664,8 +694,6 @@ class HealthCheck(object): @@ -664,8 +694,6 @@ class HealthCheck(object):
:param check_status:
:return:
"""
self.log(program, 'Action: wechat')
host = "qyapi.weixin.qq.com"
corpid = self.wechat_config.get('corpid')
@ -686,7 +714,7 @@ class HealthCheck(object): @@ -686,7 +714,7 @@ class HealthCheck(object):
response = httpClient.getresponse()
token = json.loads(response.read())['access_token']
except Exception as e:
self.log(program, 'Action: wechat get token error %s' % e)
self.log(program, '[Action: wechat] get token error %s' % e)
return False
finally:
if httpClient:
@ -700,7 +728,7 @@ class HealthCheck(object): @@ -700,7 +728,7 @@ class HealthCheck(object):
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
except Exception as e:
self.log(program, 'Action: wechat get ip error %s' % e)
self.log(program, '[Action: wechat] get ip error %s' % e)
finally:
s.close()
@ -745,16 +773,16 @@ class HealthCheck(object): @@ -745,16 +773,16 @@ class HealthCheck(object):
response = httpClient.getresponse()
result = json.loads(response.read())
if result['errcode'] != 0:
self.log(program, 'Action: wechat send faild %s' % result)
self.log(program, '[Action: wechat] send faild %s' % result)
return False
except Exception as e:
self.log(program, 'Action: wechat send error %s' % e)
self.log(program, '[Action: wechat] send error %s' % e)
return False
finally:
if httpClient:
httpClient.close()
self.log(program, 'Action: wechat send success')
self.log(program, '[Action: wechat] send success')
return True
def start(self):
@ -762,26 +790,32 @@ class HealthCheck(object): @@ -762,26 +790,32 @@ class HealthCheck(object):
启动检测
:return:
"""
self.log('healthCheck:', 'start')
self.log('healthCheck', 'start')
threads = []
threads_data = {}
for key, value in iteritems(self.program_config):
item = value
item['program'] = key
t = threading.Thread(target=self.check, args=(item,))
t = WorkerThread(target=self.check, args=(item,), name=key)
threads.append(t)
threads_data[key] = item
for t in threads:
try:
t.setDaemon(True)
t.start()
except Exception, e:
print('Exception in ' + t.getName() + ' (catch by main)')
print(t.exc_traceback)
t.setDaemon(True)
t.start()
while 1:
time.sleep(0.1)
for i,t in enumerate(threads):
if not t.isAlive():
thread_name = t.getName()
self.log('[ERROR] Exception in %s (catch by main): %s' % (thread_name, t.get_exception()))
self.log('[ERROR] Create new Thread!')
t = WorkerThread(target=self.check, args=(threads_data[thread_name],), name=thread_name)
t.setDaemon(True)
t.start()
threads[i] = t
if __name__ == '__main__':
@ -803,7 +837,7 @@ config: # 脚本配置名称,请勿更 @@ -803,7 +837,7 @@ config: # 脚本配置名称,请勿更
# supervisordUrl: http://localhost:9001/RPC2 # supervisor的接口地址, 默认使用本地socket文件unix:///var/run/supervisor.sock
# supervisordUser: user # supervisor中设置的username, 没有设置可不填
# supervisordPass: pass # supervisor中设置的password, 没有设置可不填
# mail: # stmp配置
# mail: # 邮箱通知配置
# host: 'smtp.test.com'
# port': '465'
# user': 'ops@test.com'
@ -848,7 +882,7 @@ cat2: # supervisor中配置的program名称 @@ -848,7 +882,7 @@ cat2: # supervisor中配置的program名称
# HTTP方式监控
cat3:
type: HTTP
type: http
mode: POST # http动作:POST,GET 默认: GET
host: 127.0.0.1 # 主机地址, 默认: localhost
path: / # URI地址,默认: /
@ -868,7 +902,7 @@ cat3: @@ -868,7 +902,7 @@ cat3:
# TCP方式监控
cat4:
type: TCP
type: tcp
host: 127.0.0.1 # 主机地址, 默认: localhost
port: 8082 # 检测端口,默认: 80
periodSeconds: 10 # 检查的频率(以秒为单位), 默认: 5

155
shell/download.sh

@ -0,0 +1,155 @@ @@ -0,0 +1,155 @@
exists() {
if command -v $1 >/dev/null 2>&1
then
return 0
else
return 1
fi
}
unable_to_retrieve_package() {
echo "Unable to retrieve a valid package!"
if test "x$download_url" != "x"; then
echo "Download URL: $download_url"
fi
if test "x$stderr_results" != "x"; then
echo "\nDEBUG OUTPUT FOLLOWS:\n$stderr_results"
fi
exit 1
}
capture_tmp_stderr() {
# spool up /tmp/stderr from all the commands we called
if test -f "$tmp_dir/stderr"; then
output=`cat $tmp_dir/stderr`
stderr_results="${stderr_results}\nSTDERR from $1:\n\n$output\n"
rm $tmp_dir/stderr
fi
}
# do_wget URL FILENAME
do_wget() {
echo "trying wget..."
wget --user-agent="User-Agent: mixlib-install/3.12.1" -O "$2" "$1" 2>$tmp_dir/stderr
rc=$?
# check for 404
grep "ERROR 404" $tmp_dir/stderr 2>&1 >/dev/null
if test $? -eq 0; then
echo "ERROR 404"
fi
# check for bad return status or empty output
if test $rc -ne 0 || test ! -s "$2"; then
capture_tmp_stderr "wget"
return 1
fi
return 0
}
# do_curl URL FILENAME
do_curl() {
echo "trying curl..."
curl -A "User-Agent: mixlib-install/3.12.1" --retry 5 -sL -D $tmp_dir/stderr "$1" > "$2"
rc=$?
# check for 404
grep "404 Not Found" $tmp_dir/stderr 2>&1 >/dev/null
if test $? -eq 0; then
echo "ERROR 404"
fi
# check for bad return status or empty output
if test $rc -ne 0 || test ! -s "$2"; then
capture_tmp_stderr "curl"
return 1
fi
return 0
}
# do_fetch URL FILENAME
do_fetch() {
echo "trying fetch..."
fetch --user-agent="User-Agent: mixlib-install/3.12.1" -o "$2" "$1" 2>$tmp_dir/stderr
# check for bad return status
test $? -ne 0 && return 1
return 0
}
# do_perl URL FILENAME
do_perl() {
echo "trying perl..."
perl -e 'use LWP::Simple; getprint($ARGV[0]);' "$1" > "$2" 2>$tmp_dir/stderr
rc=$?
# check for 404
grep "404 Not Found" $tmp_dir/stderr 2>&1 >/dev/null
if test $? -eq 0; then
echo "ERROR 404"
fi
# check for bad return status or empty output
if test $rc -ne 0 || test ! -s "$2"; then
capture_tmp_stderr "perl"
return 1
fi
return 0
}
# do_python URL FILENAME
do_python() {
echo "trying python..."
python -c "import sys,urllib2; sys.stdout.write(urllib2.urlopen(urllib2.Request(sys.argv[1], headers={ 'User-Agent': 'mixlib-install/3.12.1' })).read())" "$1" > "$2" 2>$tmp_dir/stderr
rc=$?
# check for 404
grep "HTTP Error 404" $tmp_dir/stderr 2>&1 >/dev/null
if test $? -eq 0; then
echo "ERROR 404"
fi
# check for bad return status or empty output
if test $rc -ne 0 || test ! -s "$2"; then
capture_tmp_stderr "python"
return 1
fi
return 0
}
# do_download URL FILENAME
do_download() {
echo "downloading $1"
echo " to file $2"
url=`echo $1`
if test "x$platform" = "xsolaris2"; then
if test "x$platform_version" = "x5.9" -o "x$platform_version" = "x5.10"; then
# solaris 9 lacks openssl, solaris 10 lacks recent enough credentials - your base O/S is completely insecure, please upgrade
url=`echo $url | sed -e 's/https/http/'`
fi
fi
# we try all of these until we get success.
# perl, in particular may be present but LWP::Simple may not be installed
if exists wget; then
do_wget $url $2 && return 0
fi
if exists curl; then
do_curl $url $2 && return 0
fi
if exists fetch; then
do_fetch $url $2 && return 0
fi
if exists perl; then
do_perl $url $2 && return 0
fi
if exists python; then
do_python $url $2 && return 0
fi
unable_to_retrieve_package
}

20
shell/judge.sh

@ -0,0 +1,20 @@ @@ -0,0 +1,20 @@
is_int() { #? Check if value(s) is integer
local param
for param; do
if [[ ! $param =~ ^[\-]?[0-9]+$ ]]; then return 1; fi
done
}
is_float() { #? Check if value(s) is floating point
local param
for param; do
if [[ ! $param =~ ^[\-]?[0-9]*[,.][0-9]+$ ]]; then return 1; fi
done
}
is_hex() { #? Check if value(s) is hexadecimal
local param
for param; do
if [[ ! ${param//#/} =~ ^[0-9a-fA-F]*$ ]]; then return 1; fi
done
}

31
shell/proxy.sh

@ -0,0 +1,31 @@ @@ -0,0 +1,31 @@
if test "x$https_proxy" != "x"; then
echo "setting https_proxy: $https_proxy"
HTTPS_PROXY=$https_proxy
https_proxy=$https_proxy
export HTTPS_PROXY
export https_proxy
fi
if test "x$http_proxy" != "x"; then
echo "setting http_proxy: $http_proxy"
HTTP_PROXY=$http_proxy
http_proxy=$http_proxy
export HTTP_PROXY
export http_proxy
fi
if test "x$ftp_proxy" != "x"; then
echo "setting ftp_proxy: $ftp_proxy"
FTP_PROXY=$ftp_proxy
ftp_proxy=$ftp_proxy
export FTP_PROXY
export ftp_proxy
fi
if test "x$no_proxy" != "x"; then
echo "setting no_proxy: $no_proxy"
NO_PROXY=$no_proxy
no_proxy=$no_proxy
export NO_PROXY
export no_proxy
fi
Loading…
Cancel
Save