#!/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2020-01-15
# @Author  : lework
# @Desc    : SSH two-factor authentication through WeChat Work, implemented
#            as a pam-python module.

import sys
import pwd
import json
import string
import syslog
import random
import hashlib
import datetime
import platform

try:
    import httplib  # Python 2
except ImportError:  # Python 3: same API under a different module name
    import http.client as httplib


def auth_log(msg):
    """Write *msg* to syslog using the LOG_AUTH facility."""
    syslog.openlog(facility=syslog.LOG_AUTH)
    syslog.syslog("MultiFactors Authentication: " + msg)
    syslog.closelog()


def build_wechat_payload(content, agentid, touser=None, toparty=None, totag=None):
    """Build the JSON-serialisable body of a WeChat Work text message.

    Only the recipient selectors that are actually set (truthy) are added to
    the payload.

    :param content: message text.
    :param agentid: WeChat Work application (agent) id.
    :param touser: recipient user id(s), optional.
    :param toparty: recipient department id(s), optional.
    :param totag: recipient tag id(s), optional.
    :return: dict ready to be passed to ``json.dumps``.
    """
    data = {
        "msgtype": 'text',
        "agentid": agentid,
        "text": {'content': content},
        "safe": 0,
    }
    if touser:
        data['touser'] = touser
    if toparty:
        data['toparty'] = toparty
    # Guard on totag itself; the previous code tested toparty here, which
    # dropped totag when used alone and sent "totag": None otherwise.
    if totag:
        data['totag'] = totag
    return data


def action_wechat(content, touser=None, toparty=None, totag=None):
    """Send *content* as a WeChat Work text message.

    Fetches an access token, then posts the message. Every failure is logged
    through ``auth_log`` and reported by the return value; no exception
    escapes.

    :param content: message text.
    :param touser: recipient user id(s), optional.
    :param toparty: recipient department id(s), optional.
    :param totag: recipient tag id(s), optional.
    :return: True when the API reports ``errcode == 0``, otherwise False.
    """
    host = "qyapi.weixin.qq.com"

    # WeChat Work settings: fill these in for your organisation.
    corpid = ""
    secret = ""
    agentid = ""

    headers = {
        'Content-Type': 'application/json'
    }

    access_token_url = '/cgi-bin/gettoken?corpid={id}&corpsecret={crt}'.format(id=corpid, crt=secret)
    token_conn = None
    try:
        token_conn = httplib.HTTPSConnection(host, timeout=10)
        token_conn.request("GET", access_token_url, headers=headers)
        response = token_conn.getresponse()
        token = json.loads(response.read().decode('utf-8'))['access_token']
    except Exception as e:
        auth_log('get wechat token error: %s' % e)
        return False
    finally:
        # Close on the error path too, so a failed token request does not
        # leak the socket.
        if token_conn is not None:
            token_conn.close()

    send_url = '/cgi-bin/message/send?access_token={token}'.format(token=token)
    data = build_wechat_payload(content, agentid, touser=touser, toparty=toparty, totag=totag)

    send_conn = None
    try:
        send_conn = httplib.HTTPSConnection(host, timeout=10)
        send_conn.request("POST", send_url, json.dumps(data), headers=headers)
        response = send_conn.getresponse()
        result = json.loads(response.read().decode('utf-8'))
        if result['errcode'] != 0:
            auth_log('Failed to send verification code using WeChat: %s' % result)
            return False
    except Exception as e:
        auth_log('Error sending verification code using WeChat: %s' % e)
        return False
    finally:
        if send_conn is not None:
            send_conn.close()

    auth_log('Send verification code using WeChat successfully.')
    return True


def get_user_comment(user):
    """Return the GECOS (comment) field of local account *user*.

    The comment field is used as the WeChat Work user id of the account.

    :param user: local account name.
    :return: the comment string, or '' when the account cannot be looked up
        (the failure is logged).
    """
    try:
        return pwd.getpwnam(user).pw_gecos
    except (KeyError, TypeError):
        # KeyError: no such account; TypeError: user is not a string (None).
        auth_log("No local user (%s) found." % user)
        return ''


def get_hash(plain_text):
    """Return the raw SHA-512 digest of *plain_text*.

    :param plain_text: bytes, or text which is encoded as UTF-8 first.
    :return: 64-byte digest.
    """
    if not isinstance(plain_text, bytes):
        plain_text = plain_text.encode('utf-8')
    key_hash = hashlib.sha512()
    key_hash.update(plain_text)
    return key_hash.digest()


def gen_key(pamh, user, length):
    """Generate a numeric PIN and send it to *user* through WeChat Work.

    :param pamh: pam-python handle; ``pamh.rhost`` is quoted in the message.
    :param user: local account name being authenticated.
    :param length: number of digits in the PIN.
    :return: tuple ``(sha512_digest_of_pin, time_the_pin_was_issued)``.
    """
    # The PIN is a credential: draw it from the OS CSPRNG rather than the
    # predictable default Mersenne Twister generator.
    rng = random.SystemRandom()
    pin = ''.join(rng.choice(string.digits) for _ in range(length))

    # For local testing only, the PIN can be echoed to the login prompt with
    # pamh.conversation(pamh.Message(pamh.PAM_ERROR_MSG, ...)).
    # Never do that in production.

    hostname = platform.node().split('.')[0]
    content = "[MFA] %s 使用 %s 正在登录 %s, 验证码为【%s】, 1分钟内有效。" % (pamh.rhost, user, hostname, pin)
    touser = get_user_comment(user)
    # Delivery failures are logged inside action_wechat; the login flow
    # continues either way, as before.
    action_wechat(content, touser=touser)

    pin_time = datetime.datetime.now()
    return get_hash(pin), pin_time


def pin_is_expired(pin_time, now, ttl_seconds):
    """Tell whether a PIN issued at *pin_time* is no longer valid at *now*.

    :param pin_time: ``datetime`` at which the PIN was issued.
    :param now: ``datetime`` at which the answer was received.
    :param ttl_seconds: lifetime of the PIN in seconds.
    :return: True when more than *ttl_seconds* elapsed, or when *now* lies
        before *pin_time* (the clock moved backwards; fail closed).
    """
    # total_seconds() includes whole days; timedelta.seconds alone wraps
    # around every 24 hours and would make a stale PIN valid again.
    elapsed = (now - pin_time).total_seconds()
    return elapsed < 0 or elapsed > ttl_seconds


def pam_sm_authenticate(pamh, flags, argv):
    """PAM authentication entry point: ask for a PIN sent through WeChat Work.

    :param pamh: pam-python handle.
    :param flags: PAM flags (unused).
    :param argv: module arguments (unused).
    :return: ``PAM_SUCCESS`` on a matching PIN, ``PAM_ABORT`` when the
        account has no comment field or the PIN expired, ``PAM_AUTH_ERR``
        after too many wrong answers, or the PAM result carried by the
        exception raised from ``pamh.get_user()``.
    """
    PIN_LENGTH = 6  # number of digits in the PIN
    PIN_LIVE = 60   # PIN lifetime in seconds; later answers are rejected
    PIN_LIMIT = 3   # maximum number of attempts
    # SHA-512 digest of the predefined emergency code "123456".
    # NOTE(security): this is a static, publicly known bypass code. Replace
    # it with a site-specific secret or remove it before deploying.
    EMERGENCY_HASH = (
        b'\xba2S\x87j\xedk\xc2-Jo\xf5=\x84\x06\xc6'
        b'\xad\x86A\x95\xed\x14J\xb5\xc8v!\xb6\xc23\xb5H'
        b'\xba\xea\xe6\x95m\xf3F\xec\x8c\x17\xf5\xea\x10\xf3^\xe3'
        b'\xcb\xc5\x14y~\xd7\xdd\xd3\x14Td\xe2\xa0\xba\xb4\x13'
    )

    try:
        user = pamh.get_user()
    except pamh.exception as e:
        return e.pam_result

    auth_log("login_ip: %s, login_user: %s" % (pamh.rhost, user))

    if get_user_comment(user) == '':
        msg = pamh.Message(pamh.PAM_ERROR_MSG,
                           "[Warning] You need to set the Qiyi WeChat username in the comment block for user %s." % (user))
        pamh.conversation(msg)
        return pamh.PAM_ABORT

    pin, pin_time = gen_key(pamh, user, PIN_LENGTH)

    for _ in range(PIN_LIMIT):
        msg = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Verification code:")
        resp = pamh.conversation(msg)

        if pin_is_expired(pin_time, datetime.datetime.now(), PIN_LIVE):
            msg = pamh.Message(pamh.PAM_ERROR_MSG, "[Warning] Time limit exceeded.")
            pamh.conversation(msg)
            return pamh.PAM_ABORT

        # Treat a missing answer as a wrong answer instead of crashing.
        answer = resp.resp if resp.resp is not None else ''
        resp_hash = get_hash(answer)
        if resp_hash == pin or resp_hash == EMERGENCY_HASH:
            return pamh.PAM_SUCCESS

    msg = pamh.Message(pamh.PAM_ERROR_MSG, "[Warning] Too many authentication failures.")
    pamh.conversation(msg)
    return pamh.PAM_AUTH_ERR


def pam_sm_setcred(pamh, flags, argv):
    """PAM credential hook; nothing to do, always succeeds."""
    return pamh.PAM_SUCCESS


def pam_sm_acct_mgmt(pamh, flags, argv):
    """PAM account-management hook; nothing to do, always succeeds."""
    return pamh.PAM_SUCCESS


def pam_sm_open_session(pamh, flags, argv):
    """PAM session-open hook; nothing to do, always succeeds."""
    return pamh.PAM_SUCCESS


def pam_sm_close_session(pamh, flags, argv):
    """PAM session-close hook; nothing to do, always succeeds."""
    return pamh.PAM_SUCCESS


def pam_sm_chauthtok(pamh, flags, argv):
    """PAM password-change hook; nothing to do, always succeeds."""
    return pamh.PAM_SUCCESS