一、多线程使用
1.使用threading库
# 使用threading进行调用多线程
from threading import Thread
2.使用
(1)使用函数方法
thread = Thread(target=方法对象, #不要括号args=('参数1', '参数2') #一个参数也需要加上逗号
)
# 创建一个线程对象
thread.start()
# 执行线程对象
(2)使用类方法
import threading
import timeclass TestThread(threading.Thread):def __init__(self, n):super(TestThread, self).__init__()self.n = ndef run(self):print('TestThread类的类方法方式创建多线程', self.n)time.sleep(0.5)t1=TestThread("参数1")
t2=TestThread("参数2")
3.小demo
from threading import Thread
from time import sleepdef work(name):for i in range(5):print(name)sleep(0.5)t1 = Thread(target=work,args=('名字A',)
)
t2 = Thread(target=work,args=('名字B',)
)
t1.start()
t2.start()
print('---结束---')
# 执行结果
"""
名字A
名字B
---结束---
名字A名字B
名字A名字B
名字B名字A
名字A名字B
Process finished with exit code 0两个子线程开始后,主线程从上到下运行结束,但两个子线程t1/t2还在运行
"""
4.等待执行
从demo中可以发现,主线程结束了两个子线程还没结束,使用join方法可以让主线程等待子线程执行完成后结束。
#使用方法join,其目的是加入主线程等待
from threading import Thread
from time import sleepdef work(name):for i in range(5):print(name)sleep(0.5)t1 = Thread(target=work,args=('名字A',)
)
t2 = Thread(target=work,args=('名字B',)
)
t1.start()
t2.start()
t1.join()
t2.join()print('---结束---')
"""
名字A
名字B
名字A
名字B
名字B
名字A
名字A
名字B
名字B
名字A
---结束---
Process finished with exit code 0将t1和t2加入主线程,主线程在等待t1/t2运行结束后,才结束运行
"""
二、多线程压测构思
以最常使用压测的场景:秒杀活动订单 为例子
1.生成随机且未注册手机号并登录
-> 存数据为{“手机号”:“登录token”},多个使用列表遍历
2.访问商品列表
-> 读取秒杀商品,有多个选择第一个
3.秒杀商品提交订单
-> 断言结果为success不考虑其他操作和环境限制
三、实现
以秒杀商品订单为例,创建"skilOrder"包,py文件以"sk_"开头
1.工具类
# 工具类
ToolMySQL -> 输入sql查询库并返回结果
ToolFakerDate -> 生成一个库里未使用过的手机号
2.sk_variable.py
"""
存储公共使用变量
"""class Variable:pass
3.sk_setting.py
# 存放各类url
url1 = ''
url2 = ''
url3 = ''# 存放请求头
header = {'Content-Type': 'application/json','User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36','xxx':'xxx',...
}# 存放skillOrder中需要的前置条件/变量
4.sk_register.py
import requests
from jsonpath import jsonpathfrom seckillorder.sk_setting import 注册_url, head
from seckillorder.sk_variable import Variable
from 工具类 import ToolFakerDateclass RegisterLogin:# 注册并登录,存储登录信息[{“手机号”:“登录token”},...]# 设置成Variable类属性def __init__(self,times):self.times = timesself.fakerphone = ToolFakerDate()def fakerphonelist(self):register_list = []num = 0while num < self.times:c = self.fakerphone.get_new_phone()if c not in register_list:register_list.append(c)num += 1log.info("生成的手机号列表:", register_list)return register_listdef reslogin(self):log.info("-------------reslogin_start---------------")register_list = self.fakerphonelist()login_info = {}for i in register_list:data = {"xxx":"xxx",.......# 根据注册接口所需参数填写}resq = requests.post(url=tes_url, headers=header, json=data)if jsonpath(resq.json(),'msg')[0] == 'success':print(i, resq.text)login_info[f'{i}'] = jsonpath(resq.json(),'token')[0]setattr(Variable, 'login_info', login_info)log.info('login_info', login_info)log.info("-------------reslogin_end---------------")
5.sk_product.py
import json
from threading import Threadimport requests
from jsonpath import jsonpathfrom seckillorder.sk_setting import 商品列表_url, head, 下单_url
from seckillorder.sk_variable import Variableclass GetProduce:# 获取商品列表信息,查找对应秒杀商品并暂存# 设置成Variable类属性def getproduceinfo(self):resq = requests.get(url=商品列表_url, headers=head)for i in resq返回的内容:# 从响应体中去获取提交秒杀商品订单需要的参数def orderinfo(token):data = [{"xxx":"xxx",# 提交订单所需要的参数}]header['token'] = tokenresq = requests.post(url=下单_url, headers=head, json=data)log.info(token, resq.text)log.info(header)class SumbimOrder:def SubmitOrder(self, method, token):theadlist = []for k, v in token.items():log.info(k, v)thread = Thread(target=method,args=(v,))theadlist.append(thread)thread.start()for thread in theadlist:thread.join()
四、写在最后
写在最后:
感觉不像是想象中的多线程压测脚本
在创建完线程时候就已经完成下单了,并发下单才有压测的感觉
断言只需要查库判断对应订单号是否创建和订单支付状态即可
写着玩,图个乐
欢迎大佬指点迷津!!!