场景
业务上有许多发送邮件的场景,这些邮件基本上都是自动发送的,而且内容很重要。每次回归测试都要人工确认邮件有没有发出、发送的时间点对不对,工作量太大,所以考虑把这部分检查加入到自动化测试中。
工具
- Python
- Gmail
要访问 Gmail,先要到 console.cloud.google.com 创建访问凭证(如图所示),创建完成后下载得到一个 JSON 文件,保存为 credentials.json。
实现代码
其中 DateFormat 和 get_path 是自定义的工具方法,分别用于获取时间和获取文件路径。
from __future__ import print_function

import base64
import logging
import os.path
import time
from datetime import datetime

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from utils.date_format import DateFormat
from utils.get_file_path import get_path
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.modify']


class GmailSearch:
    """Search the authorised Gmail inbox and return the body of a matching mail."""

    def __init__(self):
        """Load cached credentials from token.json, or run the OAuth flow.

        token.json stores the user's access and refresh tokens and is written
        after the authorization flow completes (or after a refresh).
        """
        self.creds = None
        file_token = get_path('token.json', 'utils')
        if os.path.exists(file_token):
            self.creds = Credentials.from_authorized_user_file(file_token, SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not self.creds or not self.creds.valid:
            if self.creds and self.creds.expired and self.creds.refresh_token:
                self.creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    get_path('credentials.json', 'utils'), SCOPES)
                self.creds = flow.run_local_server(port=0)
            # Save the credentials for the next run.
            with open(file_token, 'w') as token:
                token.write(self.creds.to_json())

    @staticmethod
    def _wants_unread(kwargs):
        """Return True when the search should be limited to unread mail.

        A missing (or None) ``unread`` keyword means True.
        """
        unread = kwargs.get("unread")
        return True if unread is None else bool(unread)

    @staticmethod
    def _build_query(**kwargs):
        """Translate the search keywords into a Gmail query string."""
        terms = ["in:inbox"]
        if GmailSearch._wants_unread(kwargs):
            terms.append("is:unread")
        if kwargs.get("attachment"):
            terms.append("has:attachment")
        if kwargs.get("sender"):  # value like atest@email.com, btest@email.com
            terms.append(f'from:({kwargs.get("sender")})')
        if kwargs.get("to"):  # value like atest@email.com, btest@email.com
            terms.append(f'to:({kwargs.get("to")})')
        if kwargs.get("subject"):
            terms.append(f'subject:({kwargs.get("subject")})')
        # Default window: DateFormat.from_current_minutes(-10) .. (+10).
        before = kwargs.get("before") or DateFormat.from_current_minutes(10)
        after = kwargs.get("after") or DateFormat.from_current_minutes(-10)
        terms.append(f'before:{before}')
        terms.append(f'after:{after}')
        return " ".join(terms)

    @staticmethod
    def _find_body_data(payload):
        """Return the first base64 ``body.data`` found in *payload*, depth-first.

        A payload with ``parts`` is searched part by part (descending into
        nested parts); a payload without parts yields its own ``body.data``.
        Returns None when no part carries data.
        """
        parts = payload.get('parts')
        if not parts:
            return (payload.get('body') or {}).get('data')
        for part in parts:
            data = GmailSearch._find_body_data(part)
            if data:
                return data
        return None

    @staticmethod
    def _decode_body(payload):
        """Decode the base64url body of *payload* to text ('' if it has none)."""
        data = GmailSearch._find_body_data(payload)
        if not data:
            return ""
        # Re-add any stripped '=' padding so the decoder accepts the data.
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode('utf-8')

    def search_email(self, **kwargs):
        """Poll the inbox for a matching mail and return its decoded body.

        Keyword arguments (all optional):
            subject: text for the ``subject:(...)`` query term.
            sender: address(es) for the ``from:(...)`` query term.
            to: address(es) for the ``to:(...)`` query term.
            attachment: if truthy, add ``has:attachment``.
            unread: if True (the default), search only unread mail and mark
                the returned mail as read; if False, search without the
                unread filter and leave the labels untouched.
            before, after: bounds for the ``before:``/``after:`` terms;
                default to DateFormat.from_current_minutes(10) and (-10).
            interval: seconds to sleep before every attempt, including the
                first (default 10; 0 disables the wait).
            count: maximum number of attempts (default 3).

        Returns:
            The decoded body of the first message in the result list, the
            string "no email found" when every attempt came back empty (or
            ``count`` is not positive), or None when the Gmail API raised
            an HttpError.
        """
        count = kwargs.get("count")
        if count is None:
            count = 3
        if count <= 0:
            return "no email found"

        interval = kwargs.get("interval")
        if interval is None:
            interval = 10
        mark_read = self._wants_unread(kwargs)

        try:
            service = build('gmail', 'v1', credentials=self.creds)
            for remaining in range(count - 1, -1, -1):
                time.sleep(interval)
                logging.info(datetime.now().strftime("%H:%M:%S"))
                logging.info(str(remaining))

                # Rebuilt per attempt so the default time window is
                # re-evaluated on every attempt.
                query = self._build_query(**kwargs)
                results = service.users().messages().list(userId='me', q=query).execute()
                messages = results.get("messages")
                if not messages:
                    logging.info('No email found, continue to search')
                    continue

                message_id = messages[0]['id']
                last_email = service.users().messages().get(userId='me', id=message_id).execute()
                body = self._decode_body(last_email['payload'])
                if mark_read:
                    logging.info(" mark read")
                    service.users().messages().modify(
                        userId='me', id=message_id,
                        body={'removeLabelIds': ['UNREAD']}).execute()
                return body
        except HttpError as error:
            logging.info('An error occurred: %s', error)
            return None
        return "no email found"


# test=GmailSearch()
# test.search_email(before='2023/10/3', after='2023/9/25', to="test@test.com", unread=True)
参考文档:Gmail API 文档