本篇介绍httprunner中hook函数的使用,以及通过编程能力实现建设自动化测试更全面的场景覆盖
前置:
互联网时代让我们更快的学习到什么是Httprunner
正文:
经过上文了解到这个框架怎么使用之后,我们开始来探讨一下我们为什么要用hook函数?用hook函数能帮我们实现什么?能给我们带来什么收益?
举个例子:假设我们的业务系统目前是通过Kafka来实现消息的交互,A系统生产的消息B系统需要去消费处理后在生产消息给到下游系统
那么如果说我们需要为B系统编写这类流程的自动化,我们需要怎么实现呢?
一)设计
首先考虑一个问题:如果A系统是独立系统,如果我们写的自动化要依赖A系统的数据那就耦合度太高了。如果我们只考虑B系统的完整性,那么我们完全可以把A mock掉,将A生产的报文消息当作cases,通过python脚本来实现。
然后B系统消费后会操作数据库,那么我们断言是否可以也读数据库去判断是否预期
其次B会发消息来通知下游系统消费,那么我们是不是也可以写一个kafka监听消费消息,判断报文是否符合预期
二)实现
1、先写一个发送消息的函数,这个地方加一个参数是为了写多个cases
import json
from kafka import KafkaProducerdef sengMessage(tag):producer = KafkaProducer(bootstrap_servers='这个地方写集群名称')if('A' == tag):msg_dict = {"CMD": "这个地方是CMD","Data": "这个地方是消息体","Key": "这个是Key","LogID": "681533193628954414","Tag": "这个是标识","TimeStamp": "这是个时间","Topic": "这是个要发送的主题"}msg = json.dumps(msg_dict).encode()#partition这个地方写的是0表示放0这个Partition上了,如果要随机就看你有几个Partition,random一下producer.send("这个地方放主题topic", msg, partition=0)producer.close()
2、然后将这个函数放到debugtalk.py文件里面
3、编写一个cases,将此函数引用到setup_hooks
使用方式就是${函数名称(参数)},说明一下就是为啥要在setup_hooks里面使用,是因为需要在执行cases之前将消息发送之后系统消费到了才能走后面的流程(就是前置)
然后这个前置操作可以做:消息的发送、数据的处理(比如删除数据,更新数据等等)
4、编写一个操作数据库的函数
import pymysql
import yaml
from loguru import logger
from lib import utilsclass MySQL(object):"""Mysql数据库操作"""db = {}def __init__(self, db):"""初始化数据库对象Args:db (str): 数据库名字,代表了db配置的某个文件,数据库连接信息也在对应文件配置"""file = f"{utils.get_root_path()}/conf/{db}.yml"with open(file, encoding="utf-8") as conf:self.db = yaml.load(conf, Loader=yaml.FullLoader)def execute(self, sql):"""在mysql上执行一个sql语句,返回的结果为多条记录的元组,每条记录为dict,内容为字段-值字典Returns:[tuple]: 执行结果"""self.db['cursorclass'] = pymysql.cursors.DictCursortry:connection = pymysql.connect(**self.db)logger.info(f"execute sql: [{sql}]")with connection.cursor() as cursor:cursor.execute(sql)data = cursor.fetchall()logger.debug(f"execute ret: [{data}]")connection.commit()finally:connection.close()return data
#怎么使用呢,就是直接用这个函数传配置文件名称,sql
if __name__ == '__main__':MySQL("auto").execute("select * from A limit 1")
设置放置存储数据库配置文件的目录
数据库配置
host: IP
port: 端口
user: 用户名
passwd: 密码
db: 数据库
5、能操作数据库之后,那么我们是否可以将查询回来的全部重要字段(什么自增主键ID、时间啥的就不用断言了)进行判断了,写一个比较json的函数
class Check(object):def CheckSql(prama):db = 'auto'errmsgl_success = 'success'sql = 'SELECT A,B,C,D,E,F,G,H,I,J,K from AAA where aa=' + '\'' + str(prama) + '\''sql_count = 'SELECT count(1) from AAA WHERE (aa = ' + "\'" + str(prama) + "\'" + ')'# 先查询条数,没有就重试,如果重试还没有就失败count = MySQL(db).execute(sql_count)[0]['count(1)']retry = 0while count == 0 and retry < 15:time.sleep(3)retry = retry + 1count = MySQL(db).execute(sql_count)[0]['count(1)']aa = MySQL(db).execute(sql)[0]response = json.dumps(aa)print(response)# 这个地方,假如A字段是一个JSON,这个地方是做了一个处理,让这个json排序,就不会出现每次查询这个json都是乱序得导致断言失败# 假设A字段:[{fee:1},{fee:2},{fee:3}],那么我们就用fee来排序A = eval(str(aa["A"]))aa["A"] = sorted(A, key=lambda x: (x['fee']), reverse=True)print(json.dumps(aa))# 这个地方用的直接是传进来得参数作为文件名称checkfile__detail = str(prama) detail_ret = Check.checkfile_json(aa, checkfile__detail)if detail_ret != errmsgl_success:return '文件名称:' + checkfile__detail + ', 失败原因: ' + detail_retreturn errmsgl_success# 读取对应的预期文件,这个函数可以抽出来,当一个公共方法def checkfile_json(requestdata, checkfile):# 文件地址当前项目/checkfile/checkfile.jsonbasic_url = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/checkfile'if checkfile:with open(basic_url + checkfile + '.json', 'r') as f:setdatas = json.load(f)return Check.cmp(requestdata, setdatas)def cmp(src_data, dst_data):msg = 'success'if isinstance(dst_data, dict):"""若为dict格式"""for key in dst_data:if key not in src_data:return 'key: ' + str(key) + ' 在请求中不存在'msg = Check.cmp(src_data[key], dst_data[key])if msg != 'success':return msgelif isinstance(dst_data, list):"""若为list格式"""for src_list, dst_list in zip((src_data), (dst_data)):"""递归"""msg = Check.cmp(src_list, dst_list)if msg != 'success':return msgelse:if str(src_data) != str(dst_data):return 'src_data != dst_data, src_data: ' + str(src_data) + ', dst_data: ' + str(dst_data)return msg
6、断言函数既然已经写好了,那就直接引用,同上面那个流程,加载到debugtaik.py文件之后,我们需要在cases断言处加上此函数,还是用${函数名称(参数)}来引用
7、最后编写一下消费kafka消息进行断言的函数
def check_kafka_messgae(topic, bootstrap_servers, checkfile, enable_auto_commit=False, consumer_timeout_ms=300):# 参数分别是主题,集群,断言文件地址,自动提交,超时时间consumer = KafkaConsumer(bootstrap_servers=[bootstrap_servers], enable_auto_commit=enable_auto_commit,consumer_timeout_ms=consumer_timeout_ms)# 这个地方只监听一个partition == 0的topic_partition = TopicPartition(topic=topic, partition=0)lastoffset = consumer.end_offsets([topic_partition])[topic_partition]consumer.assign([topic_partition])consumer.seek(topic_partition, lastoffset - 1)messagelist = ''for message in consumer:messagelist = message.valuemessagelist = json.loads(json.loads(str(messagelist, 'utf-8'))['Data'])print('topic数据' + str(messagelist))# checkfile_json这个方法上面写得函数里面有,直接调用return checkfile_json(messagelist, checkfile)
8、同上,还是将这个函数加入到debugtaik.py,在validate里面进行断言判断
总结:
将上面的流程串起来之后发现,从kafak消息的发送、数据库的查询、文件读取/数据判断,到kafka消息监听断言整个流程能够完成上述的诉求。
通过一个例子来解读hook函数的作用。在真实的业务场景下,我们在编写自动化cases的时候完全可以用纯代码的方式来实现我的复杂的业务场景。通过这种方式能更有效的认可自动化去替代人工操作。
后话:
个人认为自动化的灵魂不是用什么工具,而是断言。不管是人工还是程序自动操作都需要有一个判断依据,如果这个判断依据十分有效,那么我们只需要完善场景cases就能实现自动化替换人工(百分之80吧)