1. 拆分比较细的版本
import h5py
import numpy as np
import scipy.io as io
import os
class ReadData():def __init__(self,path): self.path=pathself.data=self.read_raw_data()self.X=self.data['X']self.Y=self.data['Y']self.Z=self.data['Z']self.mods=['OOK', '4ASK', '8ASK', 'BPSK', 'QPSK', '8PSK', '16PSK', '32PSK','16APSK', '32APSK','64APSK', '128APSK', '16QAM', '32QAM', '64QAM','128QAM', '256QAM', 'AM-SSB-WC', 'AM-SSB-SC', 'AM-DSB-WC', 'AM-DSB-SC', 'FM', 'GMSK', 'OQPSK'] self.snrs = list(range(-20, 31, 2)) self.len_mods=len(self.mods) self.len_snrs=len(self.snrs) self.sample_num=4096 def read_raw_data(self):data={}with h5py.File(self.path, 'r') as h5file: data['X'] = h5file['X'][:] data['Y'] = h5file['Y'][:] data['Z'] = h5file['Z'][:] print('数据维度 X:',data['X'].shape) print('标签维度one-hot形式 Y:',data['Y'].shape) print('信噪比 Z:',data['Z'].shape) return datadef split_snr_mod_data(self):snr_mod_folder='data/snr_mod_data' if not os.path.exists(snr_mod_folder):os.makedirs(snr_mod_folder)n = 0for i in range(0, self.len_mods, 1):for j in range(0, self.len_snrs, 1):IQ = self.X[n * self.sample_num:(n + 1) * self.sample_num, :, :]path = 'data/snr_mod_data/'+ self.mods[i] + "_SNR=" + str(self.snrs[j]) + "dB.mat"io.savemat(path, {'IQ_data': IQ})n = n + 1print('END')def split_snr_data(self):snr_folder='data/snr_data' if not os.path.exists(snr_folder):os.makedirs(snr_folder)samples_per_mod_snr = self.sample_numtotal_mods = len(self.mods) total_snrs = len(self.snrs) for snr_index, snr in enumerate(self.snrs):combined_IQ = []for mod_index in range(total_mods):start_index = mod_index * total_snrs * samples_per_mod_snr + snr_index * samples_per_mod_snrend_index = start_index + samples_per_mod_snrIQ = self.X[start_index:end_index, :, :]combined_IQ.append(IQ)combined_IQ = np.concatenate(combined_IQ, axis=0)path = f'data/snr_data/SNR={snr}dB.mat'io.savemat(path, {'IQ_data': combined_IQ})print('END') def split_mod_data(self):snr_folder='data/mod_data' if not os.path.exists(snr_folder):os.makedirs(snr_folder)samples_per_mod_snr = self.sample_numtotal_snrs = self.len_snrs for mod_index, mod in enumerate(self.mods):combined_IQ = []for snr_index in range(total_snrs):start_index = mod_index * total_snrs * samples_per_mod_snr + snr_index * samples_per_mod_snrend_index = start_index + samples_per_mod_snrIQ = self.X[start_index:end_index, :, :]combined_IQ.append(IQ)combined_IQ = np.concatenate(combined_IQ, axis=0)path = f'data/mod_data/{mod}_all_SNRs.mat'io.savemat(path, {'IQ_data': combined_IQ})print('END')
def split_snr_mod_choice_data(data_folder,choice_mods,choice_snrs):save_folder='data/snr_mod_choice_data' if not os.path.exists(save_folder):os.makedirs(save_folder)name_list=[]for mod in choice_mods:for snr in choice_snrs:file_name=mod+'_SNR='+str(snr)+'dB.mat'name_list.append(file_name)print(name_list)data_list=[]for root, dirs, files in os.walk(data_folder): for file in files: if file in name_list:data=io.loadmat(os.path.join(root, file))['IQ_data']print(data.shape) data_list.append(data)stacked_array = np.concatenate(data_list, axis=0) print(stacked_array.shape) str_mod='-'.join(choice_mods) choice_snrs_str=[str(snr) for snr in choice_snrs]str_snr='-'.join(choice_snrs_str)name=str_mod+'+'+str_snrprint(name)path = f'data/snr_mod_choice_data/{name}.mat'io.savemat(path, {'IQ_data': stacked_array})if __name__ == '__main__':choice_mods=['OOK', '4ASK', '8ASK']choice_snrs=[0,2,4]data_folder='data/snr_mod_data' split_snr_mod_choice_data(data_folder,choice_mods,choice_snrs)
2. 最终留下的万能版本
import h5py
import numpy as np
import scipy.io as io
import os
class ReadData():def __init__(self,path): self.path=pathself.data=self.read_raw_data()self.X=self.data['X']self.Y=self.data['Y']self.Z=self.data['Z']self.mods=['OOK', '4ASK', '8ASK', 'BPSK', 'QPSK', '8PSK', '16PSK', '32PSK','16APSK', '32APSK','64APSK', '128APSK', '16QAM', '32QAM', '64QAM','128QAM', '256QAM', 'AM-SSB-WC', 'AM-SSB-SC', 'AM-DSB-WC', 'AM-DSB-SC', 'FM', 'GMSK', 'OQPSK'] self.snrs = list(range(-20, 31, 2)) self.len_mods=len(self.mods) self.len_snrs=len(self.snrs) self.sample_num=4096 self.snr_mod_folder='data/split' def read_raw_data(self):data={}with h5py.File(self.path, 'r') as h5file: data['X'] = h5file['X'][:] data['Y'] = h5file['Y'][:] data['Z'] = h5file['Z'][:] print('数据维度 X:',data['X'].shape) print('标签维度one-hot形式 Y:',data['Y'].shape) print('信噪比 Z:',data['Z'].shape) return datadef split_data(self):if not os.path.exists(self.snr_mod_folder):os.makedirs(self.snr_mod_folder)n = 0for i in range(0, self.len_mods, 1):for j in range(0, self.len_snrs, 1):IQ = self.X[n * self.sample_num:(n + 1) * self.sample_num, :, :]path = self.snr_mod_folder+ '/'+self.mods[i] + "_SNR=" + str(self.snrs[j]) + "dB.mat"io.savemat(path, {'IQ_data': IQ})n = n + 1print('END')
def choice_data(data_folder,choice_mods,choice_snrs):save_folder='data/choice' if not os.path.exists(save_folder):os.makedirs(save_folder)name_list=[]for mod in choice_mods:for snr in choice_snrs:file_name=mod+'_SNR='+str(snr)+'dB.mat'name_list.append(file_name)print(name_list)data_list=[]for root, dirs, files in os.walk(data_folder): for file in files: if file in name_list:data=io.loadmat(os.path.join(root, file))['IQ_data']print(data.shape) data_list.append(data)stacked_array = np.concatenate(data_list, axis=0) print(stacked_array.shape) str_mod='-'.join(choice_mods) choice_snrs_str=[str(snr) for snr in choice_snrs]str_snr='-'.join(choice_snrs_str)name=str_mod+'+'+str_snrprint(name)path = save_folder+f'/{name}.mat'io.savemat(path, {'IQ_data': stacked_array})if __name__ == '__main__':choice_mods=['OOK', '4ASK', '8ASK']choice_snrs=list(range(0, 31, 2)) data_folder='data/split' choice_data(data_folder,choice_mods,choice_snrs)