mirror of https://github.com/lework/script
lework
5 years ago
1 changed files with 168 additions and 0 deletions
@ -0,0 +1,168 @@
@@ -0,0 +1,168 @@
|
||||
#!/usr/bin/python |
||||
# -*- coding: utf-8 -*- |
||||
|
||||
# @Time : 2020-01-15 |
||||
# @Author : lework |
||||
# @Desc : 使用Pam-Python实现SSH的企业微信双因素认证 |
||||
|
||||
import sys |
||||
import pwd |
||||
import json |
||||
import string |
||||
import syslog |
||||
import random |
||||
import hashlib |
||||
import httplib |
||||
import datetime |
||||
import platform |
||||
|
||||
|
||||
def auth_log(msg): |
||||
"""写入日志""" |
||||
syslog.openlog(facility=syslog.LOG_AUTH) |
||||
syslog.syslog("MultiFactors Authentication: " + msg) |
||||
syslog.closelog() |
||||
|
||||
|
||||
def action_wechat(content, touser=None, toparty=None, totag=None): |
||||
"""微信通知""" |
||||
host = "qyapi.weixin.qq.com" |
||||
|
||||
# 企业微信设置 |
||||
corpid = "" |
||||
secret = "" |
||||
agentid = "" |
||||
|
||||
headers = { |
||||
'Content-Type': 'application/json' |
||||
} |
||||
|
||||
access_token_url = '/cgi-bin/gettoken?corpid={id}&corpsecret={crt}'.format(id=corpid, crt=secret) |
||||
|
||||
try: |
||||
httpClient = httplib.HTTPSConnection(host, timeout=10) |
||||
httpClient.request("GET", access_token_url, headers=headers) |
||||
response = httpClient.getresponse() |
||||
token = json.loads(response.read())['access_token'] |
||||
httpClient.close() |
||||
except Exception as e: |
||||
auth_log('get wechat token error: %s' % e) |
||||
return False |
||||
|
||||
send_url = '/cgi-bin/message/send?access_token={token}'.format(token=token) |
||||
|
||||
data = { |
||||
"msgtype": 'text', |
||||
"agentid": agentid, |
||||
"text": {'content': content}, |
||||
"safe": 0 |
||||
} |
||||
|
||||
if touser: |
||||
data['touser'] = touser |
||||
if toparty: |
||||
data['toparty'] = toparty |
||||
if toparty: |
||||
data['totag'] = totag |
||||
|
||||
try: |
||||
httpClient = httplib.HTTPSConnection(host, timeout=10) |
||||
httpClient.request("POST", send_url, json.dumps(data), headers=headers) |
||||
response = httpClient.getresponse() |
||||
result = json.loads(response.read()) |
||||
if result['errcode'] != 0: |
||||
auth_log('Failed to send verification code using WeChat: %s' % result) |
||||
return False |
||||
except Exception as e: |
||||
auth_log('Error sending verification code using WeChat: %s' % e) |
||||
return False |
||||
finally: |
||||
if httpClient: |
||||
httpClient.close() |
||||
|
||||
auth_log('Send verification code using WeChat successfully.') |
||||
return True |
||||
|
||||
def get_user_comment(user): |
||||
"""获取用户描述信息""" |
||||
try: |
||||
comments = pwd.getpwnam(user).pw_gecos |
||||
except: |
||||
auth_log("No local user (%s) found." % user) |
||||
return -1 |
||||
|
||||
return comments # 返回用户描述信息 |
||||
|
||||
|
||||
def get_hash(plain_text): |
||||
"""获取PIN码的sha512字符串""" |
||||
key_hash = hashlib.sha512() |
||||
key_hash.update(plain_text) |
||||
|
||||
return key_hash.digest() |
||||
|
||||
|
||||
def gen_key(pamh, user, length): |
||||
"""生成PIN码并发送到用户""" |
||||
pin = ''.join(random.choice(string.digits) for i in range(length)) |
||||
#msg = pamh.Message(pamh.PAM_ERROR_MSG, "The pin is: (%s)" % (pin)) # 登陆界面输出验证码,测试目的,实际使用中注释掉即可 |
||||
#pamh.conversation(msg) |
||||
|
||||
hostname = platform.node().split('.')[0] |
||||
content = "[MFA] %s 使用 %s 正在登录 %s, 验证码为【%s】, 1分钟内有效。" % (pamh.rhost, user, hostname, pin) |
||||
touser = get_user_comment(user) |
||||
result = action_wechat(content, touser=touser) |
||||
|
||||
pin_time = datetime.datetime.now() |
||||
return get_hash(pin), pin_time |
||||
|
||||
|
||||
def pam_sm_authenticate(pamh, flags, argv): |
||||
PIN_LENGTH = 6 # PIN码长度 |
||||
PIN_LIVE = 60 # PIN存活时间,超出时间验证失败 |
||||
|
||||
try: |
||||
user = pamh.get_user() |
||||
except pamh.exception as e: |
||||
return e.pam_result |
||||
|
||||
auth_log("login_ip: %s, login_user: %s" % (pamh.rhost, user)) |
||||
|
||||
pin, pin_time = gen_key(pamh, user, PIN_LENGTH) |
||||
|
||||
for attempt in range(0, 3): # 仅允许三次错误尝试 |
||||
msg = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Verification code:") |
||||
resp = pamh.conversation(msg) |
||||
resp_time = datetime.datetime.now() |
||||
input_interval = resp_time - pin_time |
||||
if input_interval.seconds > PIN_LIVE: |
||||
msg = pamh.Message(pamh.PAM_ERROR_MSG, "Time limit exceeded.") |
||||
pamh.conversation(msg) |
||||
return pamh.PAM_ABORT |
||||
if get_hash(resp.resp) == pin: # 用户输入与生成的验证码进行校验 |
||||
return pamh.PAM_SUCCESS |
||||
else: |
||||
continue |
||||
|
||||
msg = pamh.Message(pamh.PAM_ERROR_MSG, "Too many authentication failures.") |
||||
pamh.conversation(msg) |
||||
return pamh.PAM_AUTH_ERR |
||||
|
||||
|
||||
def pam_sm_setcred(pamh, flags, argv): |
||||
return pamh.PAM_SUCCESS |
||||
|
||||
|
||||
def pam_sm_acct_mgmt(pamh, flags, argv): |
||||
return pamh.PAM_SUCCESS |
||||
|
||||
|
||||
def pam_sm_open_session(pamh, flags, argv): |
||||
return pamh.PAM_SUCCESS |
||||
|
||||
|
||||
def pam_sm_close_session(pamh, flags, argv): |
||||
return pamh.PAM_SUCCESS |
||||
|
||||
def pam_sm_chauthtok(pamh, flags, argv): |
||||
return pamh.PAM_SUCCESS |
Loading…
Reference in new issue