先说一下需求场景:导出接口不能直接查询数据源,只能通过远程接口调用获取数据;要求支持动态表头,并且需要支持导出 200w 以上的数据。设计如下:
1.由于数据只能通过远程调用获取,一次性查询并导出大量数据不符合实际情况。这里采取远程调用接口分批查询数据,并使用追加写入的方式将数据写入华为 OBS,避免一次查询全部数据造成的内存溢出问题;而且可以边写边读,提高效率。
2.设计一个进度表记录写入obs文件的进度,导出接口查询进度表,实时读取写入obs的数据
3.当 OBS 写入完成后修改写入状态;导出接口发现写入完成后进行最后一次数据读取,然后将获取的数据导出到 Excel。
try {
String objectKey = EXPORT_LOG_PATH + operationId + “.log”;
String fileName = getFileName(areaMark, didSegment);
servletResponse.setCharacterEncoding("UTF-8");
servletResponse.setHeader("content-Type", "application/vnd.ms-excel");
servletResponse.setHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(fileName + ".xlsx", "UTF-8"));
servletResponse.setHeader("Access-Control-Expose-Headers", "Content-Disposition");
// 动态表头字段
List<String> columns = getColumns(deviceModelName, deviceType, dType, channel, version, p2pVersion, shareAbility,storeAbility, msgAbility, operateAbility, aiAbility, pushAbility);
ExcelWriter build = EasyExcel.write(servletResponse.getOutputStream(), DeviceInfoObs.class).build();
Integer exportCount = 0;
// sheet页
Integer sheetNum = 1;
// 每页最大条数
Integer sheetSize = 1000000;
// 写入状态
String writeStatus = "running";
// 上次读取位置
Long lastPosition = 0L;
long timeBegin = System.currentTimeMillis();
while ("running".equals(writeStatus)) {// 查询设置时间间隔 1s 等待设备信息写入OBSThread.sleep(1000);if (System.currentTimeMillis() - timeBegin >= 3600000) {LOG.error("=======deviceExport===本次下载超时主动退出,operationId={}", operationId);break;}Map<String, Object> progress = getProgress(areaMark, operationId);// 写入状态if (progress != null && progress.get("writeStatus") != null) {writeStatus = progress.get("writeStatus").toString();}// 追加位置List<String> posList = new ArrayList<>();if (progress != null && progress.get("appendPos") != null) {String appendPos = progress.get("appendPos").toString();if (lastPosition != 0L) {appendPos = appendPos.substring(appendPos.indexOf(String.valueOf(lastPosition)));}posList = Arrays.asList(appendPos.split(","));}if (posList.size() > 1) {// 记录读取obs时最后一个poslastPosition = Long.parseLong(posList.get(posList.size() - 1));for (int i = 0; i < posList.size() - 1; i++) {List<DeviceInfoObs> deviceInfoObsList = getAppendLog(Long.parseLong(posList.get(i)),Long.parseLong(posList.get(i + 1)), objectKey);// 组装数据setExportData(deviceInfoObsList);exportCount += deviceInfoObsList.size();sheetNum = (exportCount + sheetSize - 1) / sheetSize;// 写入数据build.write(deviceInfoObsList,EasyExcel.writerSheet("sheet" + sheetNum).includeColumnFiledNames(columns).build());// 更新导出进度updateProgress(areaMark, operationId, exportCount);}}
}
build.finish();
} catch (Exception e) {
LOG.error(“deviceExport error:{}”, e.getMessage());
e.printStackTrace();
}
// Writer side: pages through every shard table, uploads the rows to OBS in batches
// via append writes, and records the final write status in the progress table.
try {
    // Collect all shard tables.
    List<String> allTable = deviceInfoDOMapper.getTableList();
    tableList.addAll(allTable);
    // Create the OBS connection.
    ObsClient obsClient = ObsClientLoad.getInstance(accesskey, secretkey, endPoint);
    // True until the first batch has been written.
    boolean fistFlag = true;
    String objectKey = EXPORT_LOG_PATH + operationId + ".log";
    // Buffer of device rows; batching reduces the number of OBS writes.
    List<DeviceInfoObs> partDeviceInfoList = new ArrayList<>();
    // Query each shard table page by page.
    for (String table : tableList) {
        // Id of the last row of the previous page; null requests the first page.
        Long lastId = null;
        while (true) {
            List<DeviceInfoObs> deviceInfos = deviceInfoDOMapper.getDeviceInfoList(table, deviceModelName,
                    deviceType, dType, channel, version, p2pVersion, shareAbility, storeAbility, msgAbility,
                    operateAbility, aiAbility, pushAbility, lastId);
            if (deviceInfos.isEmpty()) {
                break;
            }
            partDeviceInfoList.addAll(deviceInfos);
            lastId = deviceInfos.get(deviceInfos.size() - 1).getId();
            if (partDeviceInfoList.size() >= BATCH_WRITE_SIZE) {
                // Upload the buffered rows to OBS.
                Long position = writDidToObs(partDeviceInfoList, obsClient, fistFlag, objectKey);
                // Record the write progress; the returned flag replaces the first-write marker.
                // NOTE(review): the buffer is not cleared here; presumably updateWriteProgress
                // empties it, otherwise every batch would re-upload earlier rows. Confirm.
                fistFlag = updateWriteProgress(partDeviceInfoList, fistFlag, operationId, position);
            }
        }
    }
    // Flush the remainder that never reached the batch size.
    if (partDeviceInfoList.size() > 0) {
        Long position = writDidToObs(partDeviceInfoList, obsClient, fistFlag, objectKey);
        updateWriteProgress(partDeviceInfoList, fistFlag, operationId, position);
    }
    // Set the object expiry.
    setObsExpires(obsClient, objectKey, "1");
    // Mark the write as successful.
    DeviceInfoProgress progress = new DeviceInfoProgress();
    progress.setOperateId(operationId);
    progress.setWriteStatus("success");
    deviceInfoProgressMapper.updateByPrimaryKeySelective(progress);
} catch (Exception e) {
    // Pass the exception itself so the stack trace reaches the log.
    log.error("deviceExport: " + e.getMessage(), e);
    // Mark the write as failed so a reader of the progress record sees a terminal status.
    DeviceInfoProgress progress = new DeviceInfoProgress();
    progress.setOperateId(operationId);
    progress.setWriteStatus("failed");
    deviceInfoProgressMapper.updateByPrimaryKeySelective(progress);
}
// Body of the OBS append write: serializes the batch as JSON records separated by
// "," + line separator, appends it to the log object and stores the offset at which
// the next append starts in `position`. On failure the error is logged and `position`
// stays 0 (best-effort, as before).
String property = System.getProperty("line.separator");
Long position = 0L;
try {
    // Encode explicitly as UTF-8 instead of the platform default charset.
    // NOTE(review): the code that reads these byte ranges back must decode as UTF-8 too.
    byte[] separator = ("," + property).getBytes(java.nio.charset.StandardCharsets.UTF_8);
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    // The very first record of the object has no leading separator; every other record,
    // including the first record of a later batch, is preceded by one.
    // Tracking this with a flag replaces list.indexOf(info), which was O(n^2) and
    // picked the wrong index when the list contained equal elements.
    boolean needSeparator = !fistFlag;
    for (DeviceInfoObs info : list) {
        if (needSeparator) {
            byteArrayOutputStream.write(separator);
        }
        byteArrayOutputStream.write(JSONObject.toJSONString(info).getBytes(java.nio.charset.StandardCharsets.UTF_8));
        needSeparator = true;
    }

    // The first write starts at offset 0; later writes start at the current object length.
    // A metadata request is used instead of getObject, which opened the object content
    // stream and never closed it.
    long startPosition = fistFlag
            ? 0L
            : obsClient.getObjectMetadata(bucketName, objectKey).getContentLength();

    AppendObjectRequest objectRequest = new AppendObjectRequest();
    objectRequest.setObjectKey(objectKey);
    objectRequest.setPosition(startPosition);
    objectRequest.setBucketName(bucketName);
    objectRequest.setInput(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));

    // Append the batch; the result reports where the next append must start,
    // so no second round trip is needed to fetch the new length.
    position = obsClient.appendObject(objectRequest).getNextPosition();
} catch (Exception e) {
    // Pass the exception itself so the stack trace reaches the log.
    log.error("writDidToObs error msg is {}", e.getMessage(), e);
}