diff --git a/python/pam_wechat_auth.py b/python/pam_wechat_auth.py new file mode 100644 index 0000000..f211f56 --- /dev/null +++ b/python/pam_wechat_auth.py @@ -0,0 +1,168 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# @Time : 2020-01-15 +# @Author : lework +# @Desc : 使用Pam-Python实现SSH的企业微信双因素认证 + +import sys +import pwd +import json +import string +import syslog +import random +import hashlib +import httplib +import datetime +import platform + + +def auth_log(msg): + """写入日志""" + syslog.openlog(facility=syslog.LOG_AUTH) + syslog.syslog("MultiFactors Authentication: " + msg) + syslog.closelog() + + +def action_wechat(content, touser=None, toparty=None, totag=None): + """微信通知""" + host = "qyapi.weixin.qq.com" + + # 企业微信设置 + corpid = "" + secret = "" + agentid = "" + + headers = { + 'Content-Type': 'application/json' + } + + access_token_url = '/cgi-bin/gettoken?corpid={id}&corpsecret={crt}'.format(id=corpid, crt=secret) + + try: + httpClient = httplib.HTTPSConnection(host, timeout=10) + httpClient.request("GET", access_token_url, headers=headers) + response = httpClient.getresponse() + token = json.loads(response.read())['access_token'] + httpClient.close() + except Exception as e: + auth_log('get wechat token error: %s' % e) + return False + + send_url = '/cgi-bin/message/send?access_token={token}'.format(token=token) + + data = { + "msgtype": 'text', + "agentid": agentid, + "text": {'content': content}, + "safe": 0 + } + + if touser: + data['touser'] = touser + if toparty: + data['toparty'] = toparty + if toparty: + data['totag'] = totag + + try: + httpClient = httplib.HTTPSConnection(host, timeout=10) + httpClient.request("POST", send_url, json.dumps(data), headers=headers) + response = httpClient.getresponse() + result = json.loads(response.read()) + if result['errcode'] != 0: + auth_log('Failed to send verification code using WeChat: %s' % result) + return False + except Exception as e: + auth_log('Error sending verification code using WeChat: %s' % e) + return False + finally: + if httpClient: + httpClient.close() + + auth_log('Send verification code using WeChat successfully.') + return True + +def get_user_comment(user): + """获取用户描述信息""" + try: + comments = pwd.getpwnam(user).pw_gecos + except: + auth_log("No local user (%s) found." % user) + return -1 + + return comments # 返回用户描述信息 + + +def get_hash(plain_text): + """获取PIN码的sha512字符串""" + key_hash = hashlib.sha512() + key_hash.update(plain_text) + + return key_hash.digest() + + +def gen_key(pamh, user, length): + """生成PIN码并发送到用户""" + pin = ''.join(random.choice(string.digits) for i in range(length)) + #msg = pamh.Message(pamh.PAM_ERROR_MSG, "The pin is: (%s)" % (pin)) # 登陆界面输出验证码,测试目的,实际使用中注释掉即可 + #pamh.conversation(msg) + + hostname = platform.node().split('.')[0] + content = "[MFA] %s 使用 %s 正在登录 %s, 验证码为【%s】, 1分钟内有效。" % (pamh.rhost, user, hostname, pin) + touser = get_user_comment(user) + result = action_wechat(content, touser=touser) + + pin_time = datetime.datetime.now() + return get_hash(pin), pin_time + + +def pam_sm_authenticate(pamh, flags, argv): + PIN_LENGTH = 6 # PIN码长度 + PIN_LIVE = 60 # PIN存活时间,超出时间验证失败 + + try: + user = pamh.get_user() + except pamh.exception as e: + return e.pam_result + + auth_log("login_ip: %s, login_user: %s" % (pamh.rhost, user)) + + pin, pin_time = gen_key(pamh, user, PIN_LENGTH) + + for attempt in range(0, 3): # 仅允许三次错误尝试 + msg = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Verification code:") + resp = pamh.conversation(msg) + resp_time = datetime.datetime.now() + input_interval = resp_time - pin_time + if input_interval.seconds > PIN_LIVE: + msg = pamh.Message(pamh.PAM_ERROR_MSG, "Time limit exceeded.") + pamh.conversation(msg) + return pamh.PAM_ABORT + if get_hash(resp.resp) == pin: # 用户输入与生成的验证码进行校验 + return pamh.PAM_SUCCESS + else: + continue + + msg = pamh.Message(pamh.PAM_ERROR_MSG, "Too many authentication failures.") + pamh.conversation(msg) + return pamh.PAM_AUTH_ERR + + +def pam_sm_setcred(pamh, flags, argv): + return pamh.PAM_SUCCESS + + +def pam_sm_acct_mgmt(pamh, flags, argv): + return pamh.PAM_SUCCESS + + +def pam_sm_open_session(pamh, flags, argv): + return pamh.PAM_SUCCESS + + +def pam_sm_close_session(pamh, flags, argv): + return pamh.PAM_SUCCESS + +def pam_sm_chauthtok(pamh, flags, argv): + return pamh.PAM_SUCCESS