直接给代码吧。
如果不需要svs格式,需要的是tif的话,可以直接用slideviewer或者caseviewer转成tif。
事先说明一下,这个代码是采用一次性读取整个WSI后,再转存为svs的方式。
所需要的内存需要满足一次性读取整个WSI的要求。
这个代码也可以用来tif转svs。
from openslide import OpenSlide
import pyvips
import numpy as np
from math import ceil
import openslide
import os
import tifffile
import cv2
from tqdm import tqdm
import time
import glob
import copyTILE_SIZE = 512gfi = lambda img,ind : copy.deepcopy(img[ind[0]:ind[1], ind[2]:ind[3]])def find_file(path,depth_down,depth_up=0,suffix='.xml'):ret = []for i in range(depth_up,depth_down):_path = os.path.join(path,'*/'*i+'*'+suffix)ret.extend(glob.glob(_path))ret.sort()return retdef up_to16_manifi(hw):return int(ceil(hw[0]/TILE_SIZE)*TILE_SIZE), int(ceil(hw[1]/TILE_SIZE)*TILE_SIZE)def gen_im(wsi, index):ind = 0while True:temp_img = gfi(wsi, index[ind])ind+=1yield temp_img
def get_name_from_path(file_path:str, ret_all:bool=False):dir, n = os.path.split(file_path)n, suffix = os.path.splitext(n)if ret_all:return dir, n, suffixreturn ndef gen_patches_index(ori_size, *, img_size=224, stride = 224,keep_last_size = False):"""这个函数用来按照输入的size和patch大小,生成每个patch所在原始的size上的位置keep_last_size:表示当size不能整除patch的size的时候,最后一个patch要不要保持输入的img_size返回:一个np数组,每个成员表示当前patch所在的x和y的起点和终点如:[[x_begin,x_end,y_begin,y_end],...]"""height, width = ori_size[:2]index = []if height<img_size or width<img_size: print("input size is ({} {}), small than img_size:{}".format(height, width, img_size))return indexfor h in range(0, height+1, stride):xe = h+img_sizeif h+img_size>height:xe = heighth = xe-img_size if keep_last_size else hfor w in range(0, width+1, stride):ye = w+img_sizeif w+img_size>width:ye = widthw = ye-img_size if keep_last_size else windex.append(np.array([h, xe, w, ye]))if ye==width:breakif xe==height:breakreturn indexdef just_ff(path:str,*,file=False,floder=True,create_floder=False, info=True):"""Check the input path status. Exist or not.Args:path (str): _description_file (bool, optional): _description_. Defaults to False.floder (bool, optional): _description_. Defaults to True.create_floder (bool, optional): _description_. Defaults to False.info (bool, optional): _description_. Defaults to True.Returns:_type_: _description_"""if file:return os.path.isfile(path)elif floder:if os.path.exists(path):return Trueelse:if create_floder:try:os.makedirs(path) if info:print(r"Path '{}' does not exists, but created !!".format(path))return Trueexcept ValueError:if info:print(r"Path '{}' does not exists, and the creation failed !!".format(path))passelse:if info:print(r"Path '{}' does not exists!!".format(path))return Falsedef just_dir_of_file(file_path:str, create_floder:bool=True):"""_summary_Check the dir of the input file. If donot exist, creat it!Args:file_path (_type_): _description_create_floder (bool, optional): _description_. Defaults to True.Returns:_type_: _description_"""_dir = os.path.split(file_path)[0]return just_ff(_dir, create_floder = create_floder)def split_path(root_path:str, input_path:str):path_split = os.sepwhile(root_path[-1]==path_split):root_path = root_path[0:len(root_path)-1]ret_path = input_path[len(root_path):len(input_path)]if len(ret_path) == 0:return ''while(ret_path[0]==path_split):ret_path = ret_path[1:len(ret_path)]return ret_pathdef gen_pyramid_tiff(in_file, out_file, select_level=0):'''select_level 决定了读取那一层。第0层是倍率最高的。'''svs_desc = 'Aperio Image Library Fake\nABC |AppMag = {mag}|Filename = {filename}|MPP = {mpp}'label_desc = 'Aperio Image Library Fake\nlabel {W}x{H}'macro_desc = 'Aperio Image Library Fake\nmacro {W}x{H}'# 指定mpp值odata = openslide.open_slide(in_file)# 获取当前图像的MPP# 如果不是这个字段,可以找一下是哪个mpp = float(odata.properties['mirax.LAYER_0_LEVEL_0_SECTION.MICROMETER_PER_PIXEL_X'])#0.5# 指定缩放倍率mag = 40#mpp 在0.25左右的是40X,0.5左右的是20Xif mpp<=0.3:mag = 20mpp = mpp*2# 换算mpp值到分辨率resolution = [10000 / mpp, 10000 / mpp, 'CENTIMETER']# 指定图像名字if odata.properties.get('aperio.Filename') is not None:filename = odata.properties['aperio.Filename']else:filename = get_name_from_path(in_file)print(f"loading '{in_file}'")start = time.time()# image_py = openslide.open_slide(wsi_path)# pyvip 比openslide更快,但如果没有,用openslide也是可以的image_py = pyvips.Image.openslideload(in_file, level = select_level)image = np.array(image_py)[...,0:3]print(f"finish loading '{in_file}'. costing time:{time.time()-start}")# image = np.array(image_py['20X'])# 缩略图thumbnail_im = np.zeros([762, 762, 3], dtype=np.uint8)thumbnail_im = cv2.putText(thumbnail_im, 'thumbnail', (thumbnail_im.shape[1]//4, thumbnail_im.shape[0]//2), cv2.FONT_HERSHEY_PLAIN, 6, color=(255, 0, 0), thickness=3)# 标签图label_im = np.zeros([762, 762, 3], dtype=np.uint8)label_im = cv2.putText(label_im, 'label', (label_im.shape[1]//4, label_im.shape[0]//2), cv2.FONT_HERSHEY_PLAIN, 6, color=(0, 255, 0), thickness=3)# 宏观图macro_im = np.zeros([762, 762, 3], dtype=np.uint8)macro_im = cv2.putText(macro_im, 'macro', (macro_im.shape[1]//4, macro_im.shape[0]//2), cv2.FONT_HERSHEY_PLAIN, 6, color=(0, 0, 255), thickness=3)# tile 大小tile_hw = np.int64([TILE_SIZE, TILE_SIZE])width, height = image.shape[0:2]# 要需要的金字塔分辨率multi_hw = np.int64([(width, height), (width//2, height//2), (width//4, height//4), (width//8, height//8),(width//16, height//16),(width//32, height//32),(width//64, height//64)])# 尝试写入 svs 格式with tifffile.TiffWriter(out_file, bigtiff=True) as tif:thw = tile_hw.tolist()# outcolorspace 要保持为默认的 YCbCr,不能使用rgb,否则颜色会异常# 95 是默认JPEG质量,值域是 0-100,值越大越接近无损compression = ['JPEG', 95, dict(outcolorspace='YCbCr')]# compression = 'JPEG'kwargs = dict(subifds=0, photometric='rgb', planarconfig='CONTIG', compression=compression, dtype=np.uint8, metadata=None)for i, hw in enumerate(multi_hw):hw = up_to16_manifi(hw)temp_wsi = cv2.resize(image, (hw[1], hw[0]))new_x, new_y = up_to16_manifi(hw)new_wsi = np.ones((new_x, new_y, 3),dtype=np.uint8)*255new_wsi[0:hw[0], 0:hw[1],:] = temp_wsi[...,0:3]index = gen_patches_index((new_x, new_y),img_size=TILE_SIZE,stride=TILE_SIZE)gen = gen_im(new_wsi, index)if i == 0:desc = svs_desc.format(mag=mag, filename=filename, mpp=mpp)# tif.write(data=gen, resolution=resolution, description=desc, **kwargs)tif.write(data=gen, shape=(*hw, 3), tile=thw[::-1], resolution=resolution, description=desc, **kwargs)_hw = up_to16_manifi(multi_hw[-2])thumbnail_im = cv2.resize(image, (_hw[1], _hw[0]))[...,0:3]tif.write(data=thumbnail_im, description='', **kwargs)else:tif.write(data=gen, shape=(*hw, 3), tile=thw[::-1], resolution=resolution, description='', **kwargs)_hw = up_to16_manifi(multi_hw[-2])macro_im = cv2.resize(image, (_hw[1], _hw[0]))[...,0:3]tif.write(data=macro_im, subfiletype=9, description=macro_desc.format(W=macro_im.shape[1], H=macro_im.shape[0]), **kwargs)# mrxs所在的目录
DATA_DIR = ''
#保存目录
SAVE_DIR = ''wsi_list = find_file(DATA_DIR, 1, suffix='.mrxs')for w_name in tqdm(wsi_list):t1 = time.perf_counter()patient_name = w_name.split(os.path.sep)[-2]wsi_name = get_name_from_path(w_name)diff_path = split_path(DATA_DIR, get_name_from_path(w_name, ret_all=True)[0])save_path = os.path.join(SAVE_DIR, diff_path, f'{wsi_name}.svs')# 如果svs文件已经存在,则跳过if just_ff(save_path,file=True):continuejust_dir_of_file(save_path)gen_pyramid_tiff(w_name, save_path)print(f'{wsi_name}:',time.perf_counter() - t1)