芯片设计环境通常有比较严格的安全管理,用户帐号密码过期后就不能登录环境,影响用户工作。为减少影响,应该在密码过期前向用户发送提醒,督促其及时更新密码。
方案设计
通过IDM REST API接口获取所有帐号信息,根据以下条件过滤,筛选出即将过期帐号和已过期帐号。
- 帐号类型为正式员工(employee)或实习生(intern)
- 帐号没有在需要忽略的清单中
- 帐号过期日期相对于今天的间隔时间在指定间隔时间内
筛选出需要通知的帐号列表以后,遍历列表:如果帐号名与公司人事系统中的帐号不一致,则改用公司人事系统帐号作为通知接收人;最后记录日志并通过飞书向用户发送提醒。
示例代码:
#!/opt/miniconda3/bin/python
##########################################
## Author: shuguangbo
##########################################
"""Send Lark reminders for IDM accounts whose password is about to expire or has expired.

The script reads a YAML configuration, logs in to IDM through ``ipahttp``,
collects the accounts whose password expiry falls on one of the configured
notice days and posts one reminder per account to the configured Lark URL.

Configuration sections read by this script:

* ``IDM``: ``IDM_URL`` (a string or a list of strings), ``IDM_USER``, ``IDM_PASS``.
* ``NOTICE``: ``ADMINS``, ``NOTICE_DATES``, ``TMPDIR``, ``TOKEN``, ``LARKURL``
  (required); ``NOTIFY_ADMIN_ONLY``, ``IGNORED_ACCOUNTS``, ``SPECIAL_ACCOUNTS``
  (optional).
* ``LOGGERCONFIG``: a ``logging.config.dictConfig`` dictionary.
"""
import datetime
import json
import logging
import logging.config
import os
import re
import sys
import time
import traceback

import requests
import urllib3
import yaml

import ipahttp

# _sendLark posts with verify=False; keep urllib3 from warning on every request.
urllib3.disable_warnings()

# Keys that must be present in each top-level configuration section.
REQUIRED_PARAMS = {
    'IDM': ['IDM_URL', 'IDM_USER', 'IDM_PASS'],
    'NOTICE': ['ADMINS', 'NOTICE_DATES', 'TMPDIR', 'TOKEN', 'LARKURL'],
    'LOGGERCONFIG': ['formatters', 'handlers', 'root'],
}

# Seconds to wait for the Lark endpoint before giving up on one message.
LARK_TIMEOUT = 30


def parseConfig(cfname=None):
    """Load and validate the YAML configuration.

    Args:
        cfname: Path of the configuration file. When it is None or does not
            exist, ``<script name>.yml`` is looked up in ``$CONF_DIR`` or,
            if that variable is unset, in the directory of this script.

    Returns:
        The configuration dictionary. ``NOTICE`` is guaranteed to contain
        ``NOTIFY_ADMIN_ONLY`` (default False), ``IGNORED_ACCOUNTS`` (default
        empty list) and ``SPECIAL_ACCOUNTS`` (default empty dict).

    Exits the process with status 1 when the file cannot be read or required
    parameters are missing, and with status 4 when ``NOTICE_DATES`` is empty.
    """
    try:
        if cfname is None or not os.path.exists(cfname):
            confDir = os.environ.get('CONF_DIR', os.path.dirname(__file__))
            scriptName = os.path.basename(__file__).split('.')[0]
            cfname = os.path.join(confDir, scriptName + ".yml")
        with open(cfname, 'r', encoding='utf-8') as fd:
            # NOTE(review): FullLoader accepts more than plain data; consider
            # yaml.safe_load if the file can be written by untrusted users.
            appConf = yaml.load(fd, Loader=yaml.FullLoader)
    except Exception as e:
        logging.error("Read configuration failed! Error:{}".format(str(e)))
        sys.exit(1)

    # An empty YAML file loads as None; treat it like a file with no sections
    # so that every required parameter is reported as missing.
    if not isinstance(appConf, dict):
        appConf = {}

    missingParams = {}
    for section, required in REQUIRED_PARAMS.items():
        present = appConf.get(section)
        missing = set(required) - set(present if isinstance(present, dict) else ())
        if missing:
            missingParams[section] = missing
    if missingParams:
        logging.error('Missing parameters: {}'.format(missingParams))
        sys.exit(1)

    notice = appConf['NOTICE']
    if not notice['NOTICE_DATES']:
        logging.error('Empty notice dates!')
        sys.exit(4)

    # Optional keys: a key that is absent or left empty in YAML gets a default
    # so the checker can read it unconditionally.
    for key, default in (('NOTIFY_ADMIN_ONLY', False),
                         ('IGNORED_ACCOUNTS', []),
                         ('SPECIAL_ACCOUNTS', {})):
        if notice.get(key) is None:
            notice[key] = default
    return appConf


class accountExpiryChecker():
    """Find accounts with an expiring or expired password and notify them."""

    # Reminder texts; placeholders are display name, account, expiry date
    # and, except for the "today" text, the number of days.
    _MSG_BEFORE = '用户 {} 帐号 {} 密码有效期至 {},将在 {} 天后过期。请及时更新密码。'
    _MSG_AFTER = '用户 {} 帐号 {} 密码有效期至 {},已过期 {} 天。请及时更新密码。'
    _MSG_TODAY = '用户 {} 帐号 {} 密码有效期至 {},在今天过期。请及时更新密码。'

    def __init__(self, config):
        """Store the configuration and log in to IDM.

        Args:
            config: Configuration dictionary as returned by ``parseConfig``.
        """
        self._config = config
        self._idmClient = self._createIDMClient()
        self._expiryList = []

    def _createIDMClient(self):
        """Log in to the first reachable IDM server and return the client.

        Every URL in ``IDM_URL`` is tried in order. Exits the process with
        status 2 when no login succeeds.
        """
        try:
            idmConf = self._config['IDM']
            user = idmConf['IDM_USER']
            password = idmConf['IDM_PASS']
            urls = idmConf['IDM_URL']
            urlList = urls if isinstance(urls, list) else [urls]
            for url in urlList:
                try:
                    idm = ipahttp.ipa(url)
                    ret = idm.login(user, password)
                    if ret.status_code == 200:
                        logging.info("Login IDM succeeded.")
                        return idm
                    logging.error("Login IDM %s failed, error:%s." % (url, ret.text))
                    # Pause before trying the next server.
                    time.sleep(5)
                except Exception as e:
                    logging.error("Login IDM {} failed. Error: {}, Stack: {}".format(
                        url, str(e), traceback.format_exc()))
        except Exception as e:
            logging.error("Login IDM failed. Error: {}, Stack: {}".format(
                str(e), traceback.format_exc()))
            sys.exit(2)
        logging.error("Failed to loging IDM. Exit!")
        sys.exit(2)

    def _checkExpiry(self):
        """Collect the accounts whose password expiry is within the notice window.

        An account is kept when it has a display name, an employee type of
        ``employee`` or ``intern``, a password expiration, is not locked, is
        not listed in ``IGNORED_ACCOUNTS`` and expires no later than the
        largest absolute value in ``NOTICE_DATES`` days from today. The result
        is stored in ``self._expiryList`` as dictionaries with the keys
        ``cname``, ``name``, ``expiry`` (``YYYY-MM-DD``) and ``delta`` (days
        until expiry, negative once expired).
        """
        notice = self._config['NOTICE']
        ignored = set(notice.get('IGNORED_ACCOUNTS') or ())
        users = self._idmClient.user_find()['result']['result']
        maxExpiryDay = max(abs(item) for item in notice['NOTICE_DATES'])
        logging.info('Total {} accounts. Max expiry notice day is {}'.format(len(users), maxExpiryDay))

        today = datetime.date.today()
        expiryList = []
        for user in users:
            name = user['uid'][0]
            cname = user.get('displayname', ['-'])[0]
            atype = user.get('employeetype', ['-'])[0]
            ts = user.get('krbpasswordexpiration', ['-'])[0]
            if isinstance(ts, dict):
                # NOTE(review): some FreeIPA API versions presumably wrap
                # datetimes as {'__datetime__': '...'} - confirm on the server.
                ts = ts.get('__datetime__', '-')
            locked = user.get('nsaccountlock')

            if '-' in (cname, atype, ts) or locked:
                continue
            if atype not in ('employee', 'intern'):
                continue
            if name in ignored:
                continue

            try:
                # Only the leading YYYYMMDD part of the timestamp is used.
                expiry = datetime.datetime.strptime(ts[0:8], '%Y%m%d').date()
            except (TypeError, ValueError):
                # One malformed record must not stop the reminders for everyone else.
                logging.warning('Skip account {}: invalid password expiration {!r}'.format(name, ts))
                continue

            delta = (expiry - today).days
            if delta > maxExpiryDay:
                continue
            data = {'cname': cname, 'name': name, 'expiry': expiry.strftime('%Y-%m-%d'), 'delta': delta}
            expiryList.append(data)
            logging.debug('{}'.format(data))
        self._expiryList = expiryList

    @classmethod
    def _formatReminder(cls, expiry):
        """Return ``(log message, short reminder)`` for one expiry record."""
        delta = expiry['delta']
        fields = (expiry['cname'], expiry['name'], expiry['expiry'])
        if delta > 0:
            return cls._MSG_BEFORE.format(*fields, delta), f"将在 {delta} 天后过期"
        if delta == 0:
            return cls._MSG_TODAY.format(*fields), "在今天过期"
        return cls._MSG_AFTER.format(*fields, abs(delta)), f"已过期 {abs(delta)} 天"

    def _notifyExpiry(self):
        """Send a reminder for every collected account whose delta is a notice day.

        The receivers are the configured ``ADMINS`` plus, unless
        ``NOTIFY_ADMIN_ONLY`` is set, the account itself; an account listed in
        ``SPECIAL_ACCOUNTS`` is replaced by the name mapped there.
        """
        notice = self._config['NOTICE']
        adminOnly = notice.get('NOTIFY_ADMIN_ONLY', False)
        logging.info(f"Notify admin only {adminOnly}")

        noticeDates = set(notice['NOTICE_DATES'])
        specialAccounts = notice.get('SPECIAL_ACCOUNTS') or {}
        adminConf = notice['ADMINS']
        if isinstance(adminConf, str):
            adminConf = [adminConf]
        # Receivers are whitespace separated names, so split every entry.
        admins = [item for entry in adminConf for item in str(entry).split()]

        for expiry in self._expiryList:
            if expiry['delta'] not in noticeDates:
                continue
            receivers = list(admins)
            if not adminOnly:
                user = specialAccounts.get(expiry['name']) or expiry['name']
                receivers.extend(str(user).split())
            # Drop duplicates (e.g. an admin's own account) but keep the order.
            receivers = list(dict.fromkeys(receivers))

            msg, reminder = self._formatReminder(expiry)
            logging.info(msg)
            data = {'user': expiry['cname'], 'account': expiry['name'],
                    'expiry': expiry['expiry'], 'reminder': reminder}
            if receivers:
                self._sendLark(data, receivers)

    def _sendLark(self, message, receivers):
        """Post one reminder to the configured Lark URL.

        Args:
            message: Dictionary sent as the ``instance`` field of the payload.
            receivers: List of receiver names, or a whitespace separated string.

        Failures are logged and never raised, so one failed message does not
        prevent the remaining ones from being sent.
        """
        try:
            notice = self._config['NOTICE']
            header = {'Content-Type': 'application/json', 'Authorization': notice['TOKEN']}
            data = {
                'trigger_key': '帐号密码过期提醒',
                'instance': message,
                'notice': receivers if isinstance(receivers, list) else receivers.split(),
            }
            with requests.Session() as session:
                # NOTE(review): verify=False disables TLS certificate checks.
                result = session.post(notice['LARKURL'], headers=header, data=json.dumps(data),
                                      verify=False, timeout=LARK_TIMEOUT)
            if result.status_code == 200:
                logging.info('Send message succeeded.')
            else:
                logging.error(f"Send message failed. Error: {result.text}")
        except Exception as e:
            logging.error(f"Send message failed. Error: {str(e)}, Stack: {traceback.format_exc()}")

    def run(self):
        """Collect the expiring accounts, then send the reminders."""
        self._checkExpiry()
        self._notifyExpiry()


def main():
    """Load the configuration, set up logging and run the checker once."""
    config = parseConfig()
    logging.config.dictConfig(config['LOGGERCONFIG'])
    accountExpiryChecker(config).run()


if __name__ == "__main__":
    main()
通过设置配置文件中的参数 NOTICE_DATES 控制发送通知的频率(正数表示距过期还有多少天,0 表示当天,负数表示已过期多少天)。下面的配置为提前15、7、4、2、1天和当天,以及过期后第1、2、4、7、14、21、28、35天发送通知。
NOTICE_DATES: [15, 7, 4, 2, 1, 0, -1, -2, -4, -7, -14, -21, -28, -35]
可以通过 crontab 或 Jenkins 每天运行一次即可。