如何采用遗传算法搜索MAC效率最高的矩阵乘规模
- 具体实现
- MAC效率评估代码(eval.py)
- 遗传算法实现
本文介绍了采用遗传算法搜索MAC效率最高的矩阵乘规模
需求背景:
- 一些AI加速卡在做矩阵乘时,因硬件或软件的约束,并不是规模越大MAC效率越高
- 在测试AI加卡的实际算力时,采用MAC效率最高的M,K,N会显得比较好看.能接近宣称的理论性能
- 因此,可以将该问题变成一个优化的问题。M,K,N是变量,目标是最大化实测FLOPS
- 由于某些规模的矩阵乘,加速卡不支持或有BUG,并不是所有的的优化算法都适用。经测试发现,遗传算法适合该场景
具体实现
MAC效率评估代码(eval.py)
# eval.pyimport os
import sys
import torch
import time
import numpy as np
from calflops import calculate_flopsclass MatMulModel(torch.nn.Module):def __init__(self,M,N,K):super(MatMulModel, self).__init__()self.b=torch.nn.Parameter(torch.randn(K, N))def forward(self,x):return torch.matmul(x, self.b)def evaluate(M,N,K):model=MatMulModel(M,N,K).eval()input_tensor = torch.randn(M,K)FLOPS=M*N*K flops, macs, params = calculate_flops(model, input_shape=(M,K))print(flops)def build_and_warmup(model,M,N,K):#省略return engine def forward(engine,input,count):for i in range(count):engine.run(input)return 0 engine=build_and_warmup(model,M,N,K)t0=time.time()count=3forward(engine,input_tensor,count)t1=time.time()flops=(count*FLOPS)/(t1-t0)tflops=flops/1000/1000/1000/1000print("MatMulResult,{},{},{},{},{:.3f}".format(M,N,K,FLOPS,tflops))return tflopsif __name__ == "__main__":evaluate(int(sys.argv[1]),int(sys.argv[2]),int(sys.argv[3]))
遗传算法实现
import os
import sys
import time
import numpy as np
import geatpy as ea
import subprocessdef eval(M,N,K):cmd="python eval.py {} {} {} > rlog.txt 2>&1".format(M,N,K)if os.path.exists("rlog.txt"):os.remove("rlog.txt")try:p=subprocess.Popen(cmd,shell=True)p.wait()with open("rlog.txt","r") as f:content = [s for s in f.readlines() if s.find("MatMulResult")>=0]if len(content)==1:ret=float(content[0].strip().split(",")[5])return retexcept:passprint("{}:{}:{} device error,reboot".format(M,N,K))p=subprocess.Popen("reboot device cmd",shell=True)p.wait()time.sleep(5)return 0class MatMulParamsProblem(ea.Problem):def __init__(self):name = 'MatMulParamsProblem' # 初始化name(函数名称,可以随意设置)M = 1 # 初始化M(目标维数)maxormins = [-1] # 初始化maxormins(目标最小最大化标记列表,1:最小化该目标;-1:最大化该目标)Dim = 3 # 初始化Dim(决策变量维数)varTypes = [1,1,1] # 初始化varTypes(决策变量的类型,元素为0表示对应的变量是连续的;1表示是离散的)self.Ms=range(16,10240,16)self.Ns=range(16,10240,16)self.Ks=range(16,10240,16)lb = [0,0,0] # 决策变量下界ub = [len(self.Ms),len(self.Ns),len(self.Ks)] # 决策变量上界lbin = [1,1,1] # 决策变量下边界(0表示不包含该变量的下边界,1表示包含)ubin = [0,0,0] # 决策变量上边界(0表示不包含该变量的上边界,1表示包含)# 调用父类构造方法完成实例化ea.Problem.__init__(self,name,M,maxormins,Dim,varTypes,lb,ub,lbin,ubin)def evalVars(self, Vars):output=[]for Var in Vars:VarInt = Var.astype(np.int32)M = self.Ms[VarInt[0]]N = self.Ns[VarInt[1]]K = self.Ks[VarInt[2]]tflops=eval(M,N,K)print("{:05d},{:05d},{:05d},{:.3f}".format(M,N,K,tflops))output.append(tflops)return np.array(output).reshape(-1,1)def calReferObjV(self):referenceObjV = np.array([[33]])return referenceObjVdef main():# 实例化问题对象problem = MatMulParamsProblem()# 构建算法algorithm = ea.soea_DE_rand_1_bin_templet(problem,ea.Population(Encoding='RI', NIND=50),MAXGEN=25, # 最大进化代数。logTras=1, # 表示每隔多少代记录一次日志信息,0表示不记录。trappedValue=1e-3, # 单目标优化陷入停滞的判断阈值。maxTrappedCount=10) # 进化停滞计数器最大上限值。algorithm.mutOper.F = 0.5 # 差分进化中的参数F。algorithm.recOper.XOVR = 0.2 # 差分进化中的参数Cr。# 求解res = ea.optimize(algorithm,verbose=True,drawing=3,outputMsg=True,drawLog=True,saveFlag=True,dirName="./GaQaunt")print(res)if __name__ == "__main__":main()