<dependency>
    <groupId>com.alipay.rdf.file</groupId>
    <artifactId>rdf-file-core</artifactId>
    <version>2.2.10</version>
</dependency>
Part 1: Reading
1.1 Raw Reading
Raw reading means the data is not converted into objects according to the protocol layout template and the data definition template.
1. readRow(Class<?> requiredType): supported for delimiter-separated files; requiredType can only be String[].class.
2. readLine() is supported.
3. The readHead, readTail, and getSummary() methods are not supported.
4. You could just as well use the JDK's own Reader; the component implements raw reading purely to keep the interface uniform.
String path = new ClassPathResource("data1.txt").getFile().getPath();
FileConfig fileConfig = new FileConfig(path, "templates/template1.json", new StorageConfig("nas"));
fileConfig.setType(FileCoreToolContants.RAW_READER);FileReader fileReader = FileFactory.createReader(fileConfig);
String[] row = null;
while (null != (row = fileReader.readRow(String[].class))) {System.out.println(row);
}
fileReader.close();fileReader = FileFactory.createReader(fileConfig);
String line = null;
while (null != (line = fileReader.readLine())) {System.out.println(line);
}
fileReader.close();
1.2 Simple Reading
String path = new ClassPathResource("data1.txt").getFile().getPath();
FileConfig config = new FileConfig(path, "templates/template1.json", new StorageConfig("nas"));
FileReader fileReader = FileFactory.createReader(config);try {Map<String, Object> head = fileReader.readHead(HashMap.class);System.out.println(head);Map<String, Object> row = null;while (null != (row = fileReader.readRow(HashMap.class))) {System.out.println(row);}Map<String, Object> tail = fileReader.readTail(HashMap.class);System.out.println(tail);
} finally {fileReader.close();
}
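readRow(HashMap.class) yields loosely typed maps. If typed rows are preferred, one option is to wrap each map in a small model class of your own; a minimal sketch, assuming the template defines seq and amount columns like the template shown in section 1.4.1 (OrderRow is a hypothetical name, not part of the component):
import java.math.BigDecimal;
import java.util.Map;

// Hypothetical typed view over one row map; keys follow the data definition template.
public class OrderRow {
    private final String seq;
    private final BigDecimal amount;

    public OrderRow(Map<String, Object> row) {
        this.seq = (String) row.get("seq");
        this.amount = (BigDecimal) row.get("amount"); // declared as BigDecimal in the template
    }

    public String getSeq() { return seq; }
    public BigDecimal getAmount() { return amount; }
}
Inside the read loop above, each row would then simply be wrapped with new OrderRow(row).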
1.3 Summary Reading
String path = new ClassPathResource("data2.txt").getFile().getPath();
FileConfig config = new FileConfig(path, "templates/template2.json", new StorageConfig("nas"));
config.setSummaryEnable(true);
FileReader fileReader = FileFactory.createReader(config);try {Map<String, Object> head = fileReader.readHead(HashMap.class);BigDecimal totalAmount = (BigDecimal)head.get("totalAmount");Map<String, Object> tail = fileReader.readTail(HashMap.class);BigDecimal tailAmount = (BigDecimal)tail.get("tailAmount");Map<String, Object> row = null;while (null != (row = fileReader.readRow(HashMap.class))) {System.out.println(row);}Summary summary = fileReader.getSummary();for (SummaryPair pair : summary.getHeadSummaryPairs()) {BigDecimal summaryValue = (BigDecimal) pair.getSummaryValue();BigDecimal headValue = (BigDecimal) pair.getHeadValue();pair.isSummaryEquals();}for (SummaryPair pair : summary.getTailSummaryPairs()) {BigDecimal summaryValue = (BigDecimal) pair.getSummaryValue(); //数据字段汇总后的值BigDecimal tailValue = (BigDecimal) pair.getTailValue(); //文件尾中的汇总值pair.isSummaryEquals(); // 汇总的值是否一致}
} finally {fileReader.close();
}
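isSummaryEquals() only reports the comparison; a caller that wants hard validation has to fail explicitly. A minimal sketch using only the API shown above (the exception choice is ours, and it must run after all rows have been read, since the summary accumulates during readRow):
// Fails fast when the declared head/tail totals do not match the values accumulated from the body.
static void assertSummaryConsistent(Summary summary) {
    for (SummaryPair pair : summary.getHeadSummaryPairs()) {
        if (!pair.isSummaryEquals()) {
            throw new IllegalStateException("head summary mismatch: declared="
                + pair.getHeadValue() + ", accumulated=" + pair.getSummaryValue());
        }
    }
    for (SummaryPair pair : summary.getTailSummaryPairs()) {
        if (!pair.isSummaryEquals()) {
            throw new IllegalStateException("tail summary mismatch: declared="
                + pair.getTailValue() + ", accumulated=" + pair.getSummaryValue());
        }
    }
}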
1.4 Sorting
The component can sort a file's records by one or more of its columns.
The sorted output can be a set of ordered slice files, or the slices can be merged into a single fully ordered file.
1.4.1 Single-File Sorting
{"head":["totalCount|总笔数|Required|BigDecimal","totalAmount|总金额|BigDecimal|Required"],"body":["seq|流水号","instSeq|基金公司订单号|Required","gmtApply|订单申请时间|Date:yyyy-MM-dd HH:mm:ss","date|普通日期|Date:yyyyMMdd","dateTime|普通日期时间|Date:yyyyMMdd HH:mm:ss","applyNumber|普通数字|BigDecimal","amount|金额|BigDecimal","age|年龄|Integer","longN|长整型|Long","bol|布尔值|Boolean","memo|备注"],"protocol":"DE","lineBreak":"\r\n","summaryColumnPairs":["totalAmount|amount"]
}
总笔数:100|总金额:300.03
流水号|基金公司订单号|订单申请时间|普通日期|普通日期时间|普通数字|金额|年龄|长整型|布尔值|备注
seq_17|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|8|12345|true|备注1de2
seq_23|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|22|12345|true|备注2de2
seq_12|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|6|12345|true|备注3de2
seq_80|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|30|12345|true|备注4de2
seq_77|inst_seq_77|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|26|12345|true|备注5de2
seq_56|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|38|12345|true|备注6de2
seq_55|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|35|12345|true|备注7de2
seq_33|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|20|12345|true|备注8de2
seq_7|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|18|12345|true|备注9de2
String path = new ClassPathResource("data4.txt").getFile().getPath();
FileConfig fileConfig = new FileConfig(path, "templates/template4.json", new StorageConfig("nas"));
String sortTempPath = "/Users/mengday/Temp/springboot-rdffile/";
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 2, TimeUnit.MINUTES, new LinkedBlockingQueue());
SortConfig sortConfig = new SortConfig(sortTempPath, SortConfig.SortTypeEnum.ASC, executor, SortConfig.ResultFileTypeEnum.FULL_FILE_PATH);
sortConfig.setResultFileName("sort_result");
sortConfig.setSliceSize(1024);
sortConfig.setSortIndexes(new int[]{7});FileSorter fileSorter = FileFactory.createSorter(fileConfig);
SortResult sortResult = fileSorter.sort(sortConfig);
System.out.println(sortResult);
1.4.2 Multi-File Sorting
FileConfig fileConfig = new FileConfig("templates/template4.json", new StorageConfig("nas"));
// 多文件排序
fileConfig.setType(FileCoreToolContants.PROTOCOL_MULTI_FILE_SORTER);String[] sourceFilePaths = new String[2];
sourceFilePaths[0] = new ClassPathResource("data4.txt").getFile().getPath();
sourceFilePaths[1] = new ClassPathResource("data5.txt").getFile().getPath();String sortTempPath = "/Users/mengday/Temp/springboot-rdffile/";
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.MINUTES, new LinkedBlockingQueue());
SortConfig sortConfig = new SortConfig(sortTempPath, SortConfig.SortTypeEnum.ASC, executor, SortConfig.ResultFileTypeEnum.FULL_FILE_PATH);
sortConfig.setSourceFilePaths(sourceFilePaths);
sortConfig.setResultFileName("test_sort");
sortConfig.setSliceSize(1024);
sortConfig.setSortIndexes(new int[]{0, 1});FileSorter fileSorter = FileFactory.createSorter(fileConfig);
SortResult sortResult = fileSorter.sort(sortConfig);
System.out.println(sortResult);
1.4.3 Sorted Reading of a Single File
Sort first, then read.
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
String sortTempPath = "/Users/mengday/Temp/springboot-rdffile/";
String path = new ClassPathResource("data4.txt").getFile().getPath();
FileConfig fileConfig = new FileConfig(path, "templates/template4.json", new StorageConfig("nas"));
fileConfig.setType(FileCoreToolContants.PROTOCOL_MULTI_FILE_SORTER);

// The reader doubles as a sorter: sort first, then read in order
ProtocolFilesSortedReader reader = (ProtocolFilesSortedReader) FileFactory.createReader(fileConfig);
FileSorter fileSorter = (FileSorter) reader;
SortConfig sortConfig = new SortConfig(sortTempPath, SortConfig.SortTypeEnum.ASC, executor, SortConfig.ResultFileTypeEnum.SLICE_FILE_PATH);
sortConfig.setSliceSize(256);
sortConfig.setSortIndexes(new int[]{0, 1});
fileSorter.sort(sortConfig);

HashMap<String, Object> head = reader.readHead(HashMap.class);
System.out.println(head);
Map<String, Object> row = null;
while (null != (row = reader.readRow(HashMap.class))) {
    System.out.println(row.get("seq"));
}
reader.close();
1.4.4 Sorted Reading of Multiple Files
FileConfig fileConfig = new FileConfig("templates/template5.json", new StorageConfig("nas"));fileConfig.setType(FileCoreToolContants.PROTOCOL_MULTI_FILE_SORTER);
ProtocolFilesSortedReader reader = (ProtocolFilesSortedReader) FileFactory.createReader(fileConfig);FileSorter fileSorter = (FileSorter) reader;ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.MINUTES, new LinkedBlockingQueue());String sortTempPath = new ClassPathResource("temp").getFile().getPath();String[] sourceFilePath = {new ClassPathResource("data51.txt").getFile().getPath(),new ClassPathResource("data52.txt").getFile().getPath(),new ClassPathResource("data53.txt").getFile().getPath()};SortConfig sortConfig = new SortConfig(sortTempPath, SortConfig.SortTypeEnum.ASC, executor, SortConfig.ResultFileTypeEnum.SLICE_FILE_PATH);
sortConfig.setResultFileName("testSort");
sortConfig.setSliceSize(1024);
sortConfig.setSortIndexes(new int[]{0, 1});
sortConfig.setSourceFilePaths(sourceFilePath);
fileSorter.sort(sortConfig);HashMap<String, Object> head = reader.readHead(HashMap.class);
System.out.println(head);System.out.println("--------------------------------------");
HashMap<String, Object> tail = reader.readTail(HashMap.class);
System.out.println(tail);Map<String, Object> row = null;
int i = 0;
while (null != (row = reader.readRow(HashMap.class))) {System.out.println(row.get("seq"));
}
1.5 Sliced Reading
Slice file names usually follow a pattern: the slices sit in one directory, share a common prefix, and carry an incrementing numeric suffix (starting from 0 or 1), e.g. test-0.txt, test-1.txt, test-2.txt. When merging, knowing the total number of slices is then enough to derive every file path, as the sketch below shows.
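A minimal sketch of that path reconstruction, assuming the prefix/suffix convention above (directory, prefix, and count are illustrative):
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Rebuilds all slice paths from the directory, shared prefix, and total slice count.
static List<String> slicePaths(String dir, String prefix, int sliceCount) {
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < sliceCount; i++) {
        paths.add(new File(dir, prefix + "-" + i + ".txt").getAbsolutePath());
    }
    return paths;
}
For example, slicePaths("/data/export", "test", 3) yields the paths of test-0.txt, test-1.txt, and test-2.txt.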
1.5.1 Splitting by File Structure
String path = new ClassPathResource("data3.txt").getFile().getPath();
FileConfig config = new FileConfig(path, "templates/template3.json", new StorageConfig("nas"));// 创建分解分割器
FileSplitter splitter = FileFactory.createSplitter(config.getStorageConfig());
// 获取头分片
FileSlice headSlice = splitter.getHeadSlice(config);// 读取头分片
FileConfig headConfig = config.clone();
headConfig.setPartial(headSlice.getStart(), headSlice.getLength(), headConfig.getFileDataType());
FileReader headReader = FileFactory.createReader(headConfig);
try {Map<String, Object> head = headReader.readHead(HashMap.class);System.out.println(head);
} finally {headReader.close();
}FileSlice bodySlice = splitter.getBodySlice(config);
FileConfig bodyConfig = config.clone();
bodyConfig.setPartial(bodySlice.getStart(), bodySlice.getLength(), bodySlice.getFileDataType());
FileReader bodyReader = FileFactory.createReader(bodyConfig);
try {Map<String, Object> row = null;while (null != (row = bodyReader.readRow(HashMap.class))) {System.out.println(Thread.currentThread().getName() + ":" + row);}
} finally {bodyReader.close();
}// 获取tail分片
FileSlice tailSlice = splitter.getTailSlice(config);
// 读取tail分片
FileConfig tailConfig = config.clone();
tailConfig.setPartial(tailSlice.getStart(), tailSlice.getLength(), tailSlice.getFileDataType());
FileReader tailReader = FileFactory.createReader(tailConfig);
try {Map<String, Object> tail = tailReader.readTail(HashMap.class);System.out.println(tail);
} finally {tailReader.close();
}
1.5.2 Slicing the Body by Size
String path = new ClassPathResource("data3.txt").getFile().getPath();
FileConfig config = new FileConfig(path, "templates/template3.json", new StorageConfig("nas"));
final FileSplitter splitter = FileFactory.createSplitter(config.getStorageConfig());
// body 按大小分片
final List<FileSlice> bodySlices = splitter.getBodySlices(config, 256);// 分片读取数据
for (FileSlice slice : bodySlices) {final FileConfig sliceConfig = config.clone();sliceConfig.setPartial(slice.getStart(), slice.getLength(), slice.getFileDataType());final FileReader reader = FileFactory.createReader(sliceConfig);try {Map<String, Object> row = null;while (null != (row = reader.readRow(HashMap.class))) {System.out.println(row);}} finally {reader.close();}
}
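Size-based body slices exist mainly so that they can be processed concurrently. A minimal sketch that submits each slice to an executor, reusing only the calls shown above (the pool size is illustrative, and the enclosing method must declare throws Exception for Future.get):
ExecutorService pool = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new ArrayList<>();
for (FileSlice slice : bodySlices) {
    futures.add(pool.submit(() -> {
        // Each worker reads its own slice with an independent reader
        FileConfig sliceConfig = config.clone();
        sliceConfig.setPartial(slice.getStart(), slice.getLength(), slice.getFileDataType());
        FileReader reader = FileFactory.createReader(sliceConfig);
        try {
            Map<String, Object> row = null;
            while (null != (row = reader.readRow(HashMap.class))) {
                System.out.println(Thread.currentThread().getName() + ":" + row);
            }
        } finally {
            reader.close();
        }
    }));
}
for (Future<?> f : futures) {
    f.get(); // wait for every slice and surface worker exceptions
}
pool.shutdown();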
1.5.3 Slice First, Then Read by Slice
String filePath = new ClassPathResource("data2.txt").getFile().getPath();
FileConfig fileConfig = new FileConfig(filePath, "templates/template2.json", new StorageConfig("nas"));// 对body分片
FileSplitter fileSplitter = FileFactory.createSplitter(fileConfig.getStorageConfig());
List<FileSlice> bodySlices = fileSplitter.getBodySlices(fileConfig, 256);
System.out.println(bodySlices);FileConfig bodyConfig = new FileConfig(filePath, "templates/template2.json", new StorageConfig("nas"));
for (FileSlice bodySlice : bodySlices) {bodyConfig.setPartial(bodySlice.getStart(), bodySlice.getEnd() - bodySlice.getStart(), FileDataTypeEnum.BODY);FileReader sliceBodyReader = FileFactory.createReader(bodyConfig);try {Map<String, Object> row = null;while (null != (row = sliceBodyReader.readRow(HashMap.class))) {System.out.println(row);}} finally {sliceBodyReader.close();}
}// 获取head
FileSlice headSlice = fileSplitter.getHeadSlice(fileConfig);
FileConfig headConfig = fileConfig.clone();
headConfig.setPartial(headSlice.getStart(), headSlice.getLength(), headConfig.getFileDataType());
FileReader headReader = FileFactory.createReader(headConfig);
HashMap<String, Object> headMap = headReader.readHead(HashMap.class);
System.out.println(headMap);
headReader.close();// 获取tail
FileSlice tailSlice = fileSplitter.getTailSlice(fileConfig);
FileConfig tailFileConfig = fileConfig.clone();
tailFileConfig.setPartial(tailSlice.getStart(), tailSlice.getLength(), tailSlice.getFileDataType());
FileReader tailReader = FileFactory.createReader(tailFileConfig);
HashMap<String, Object> tailMap = tailReader.readTail(HashMap.class);
tailReader.close();
System.out.println(tailMap);
Part 2: Writing
2.1 Normal Writing
Protocol layout template
Uses the built-in layout file: rdf-file-core-2.2.10.jar!/META-INF/rdf-file/protocol/fund.xml
Data definition template
{"head":["identity|信息标识|[8,0]|default:OFDCFDAT","version|协议版本号|[4,0]|default:20","msgCreator|信息创建人|[9,0]|default:H0","msgRecipient|信息接收人|[9,0]","sendDate|传送发生日期|[8,0]|Date:yyyyMMdd","summaryTableNo|汇总表号|[3,0]","fileTypeCode|文件类型代码 |[2,0]","sender|发送人|[8,0]|default:H0","recipient|接收人|[8,0]"],"body":["TransactionCfmDate|对帐日期|[8,0]|Date:yyyyMMdd","FundCode|基金代码|[8,0]","AvailableVol|基金可用份数|Integer|[6,2]"],"tail":["fileEnd|数据文件尾部字符|default:OFDCFEND|[8,0]"],"protocol":"FUND"
}
Example program
String filePath = new File("/Users/mengday/Temp", "demofund.txt").getAbsolutePath();
FileConfig fileConfig = new FileConfig(filePath, "templates/demofund.json", new StorageConfig("nas"));
FileWriter fileWriter = FileFactory.createWriter(fileConfig);
try {//构建文件头Map<String, Object> head = new HashMap<>();head.put("msgRecipient", "xxx");head.put("sendDate", "20231122");head.put("summaryTableNo", "aa");head.put("fileTypeCode", "bb");head.put("recipient", "ll");head.put("totalCount", 1);fileWriter.writeHead(head);// 文件数据内容Map<String, Object> row = new HashMap<>();row.put("TransactionCfmDate", "20231122");row.put("FundCode", "中国1");row.put("AvailableVol", 42.11);fileWriter.writeRow(row);// 文件尾,没有数据,是取了数据定义模板中默认值fileWriter.writeTail(new HashMap<String, Object>());
} catch (Exception e) {e.printStackTrace();
} finally {fileWriter.close();
}
Generated file
OFDCFDAT
20
H0
xxx
20231122
aa
bb
H0
ll
003
TransactionCfmDate
FundCode
AvailableVol
00000001
20231122中国1 004211
OFDCFEND
2.2 Summary Writing
Protocol layout template
rdf-file-core-2.2.10.jar!/META-INF/rdf-file/protocol/sp.xml
Data definition template
tail defines the tail fields; summaryColumnPairs declares which body column is summed and which tail field receives the result: "totalAmount|amount" means totalAmount = sum(amount). totalCount is a component-predefined field holding the total record count.
{"head": ["fileStart|数据文件头部字符|default:汇总文件测试"],"body": ["seq|流水号","instSeq|基金公司订单号|Required","gmtApply|订单申请时间|Date:yyyy-MM-dd HH:mm:ss","date|普通日期|Date:yyyyMMdd","dateTime|普通日期时间|Date:yyyyMMdd HH:mm:ss","applyNumber|普通数字|Long","amount|金额|BigDecimal","age|年龄|Integer","longN|长整型|Long","bol|布尔值|Boolean","memo|备注"],"tail": ["totalCount|总笔数|Required|Integer","totalAmount|总金额|BigDecimal|Required"],"protocol": "SP","lineBreak": "\r\n","summaryColumnPairs": ["totalAmount|amount"]
}
Example program
String filePath = new File("/Users/mengday/Temp", "demosp.txt").getAbsolutePath();
FileConfig fileConfig = new FileConfig(filePath, "templates/demosp.json", new StorageConfig("nas"));
// Enable summary accumulation while writing
fileConfig.setSummaryEnable(true);
FileWriter fileWriter = FileFactory.createWriter(fileConfig);
try {
    Date testDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2017-01-03 12:22:33");

    // The head only uses constants from the data definition template
    Map<String, Object> head = new HashMap<String, Object>();
    fileWriter.writeHead(head);

    // Write two rows
    Map<String, Object> body = new HashMap<String, Object>();
    body.put("seq", "seq12345");
    body.put("instSeq", "303");
    body.put("gmtApply", testDate);
    body.put("date", testDate);
    body.put("dateTime", testDate);
    body.put("applyNumber", 12);
    body.put("amount", new BigDecimal("1.22"));
    body.put("age", 33);
    body.put("longN", 33L);
    body.put("bol", true);
    body.put("memo", "memo1");
    fileWriter.writeRow(body);

    testDate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-03 12:22:33");
    body.put("seq", "seq234567");
    body.put("instSeq", "505");
    body.put("gmtApply", testDate);
    body.put("date", testDate);
    body.put("dateTime", testDate);
    body.put("applyNumber", 12);
    body.put("amount", new BigDecimal("1.09"));
    body.put("age", 66);
    body.put("longN", 125L);
    body.put("bol", false);
    body.put("memo", "memo2");
    fileWriter.writeRow(body);

    // Write the tail from the accumulated summary
    fileWriter.writeTail(fileWriter.getSummary().summaryTailToMap());
} catch (Exception e) {
    e.printStackTrace();
} finally {
    fileWriter.close();
}
Generated file
汇总文件测试
seq12345|303|2017-01-03 12:22:33|20170103|20170103 12:22:33|12|1.22|33|33|true|memo1
seq234567|505|2016-02-03 12:22:33|20160203|20160203 12:22:33|12|1.09|66|125|false|memo2
2|2.31
Part 3: Merge Writing
The typical scenario for merging is file export in a distributed environment: with sharded databases and tables, each shard exports its own slice file, and the slices are finally merged into one complete file.
Merging supports file heads, bodies, tails, complete files, and files on different storage backends.
Data definition template
{"head": ["totalCount|总笔数|Required|Integer","totalAmount|总金额|BigDecimal|Required"],"body": ["seq|流水号","instSeq|基金公司订单号|Required","gmtApply|订单申请时间|Date:yyyy-MM-dd HH:mm:ss","date|普通日期|Date:yyyyMMdd","dateTime|普通日期时间|Date:yyyyMMdd HH:mm:ss","applyNumber|普通数字|BigDecimal","amount|金额|BigDecimal","age|年龄|Integer","longN|长整型|Long","bol|布尔值|Boolean","memo|备注"],"tail": ["fileEnd|数据文件尾部字符","date|普通日期|Date:yyyyMMdd","amount|总金额|BigDecimal"],"protocol": "DE","summaryColumnPairs": ["totalAmount|amount","amount|amount"]
}
File contents
de_all1.txt
总笔数:100|总金额:300.03
流水号|基金公司订单号|订单申请时间|普通日期|普通日期时间|普通数字|金额|年龄|长整型|布尔值|备注
seq_0|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|22|12345|true|备注1
seq_1|inst_seq_1|2013-11-10 15:56:12|20131110|20131113 12:33:34|23.34|11.88|33|56789|false|
OFDCFEND|20131109|100
de_all2.txt
总笔数:100|总金额:300.11
流水号|基金公司订单号|订单申请时间|普通日期|普通日期时间|普通数字|金额|年龄|长整型|布尔值|备注
seq_2|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|22|12345|true|备注1
seq_3|inst_seq_1|2013-11-10 15:56:12|20131110|20131113 12:33:34|23.34|11.88|33|56789|false|
OFDCFEND|20131109|12
de_all3.txt
总笔数:100|总金额:300.12
流水号|基金公司订单号|订单申请时间|普通日期|普通日期时间|普通数字|金额|年龄|长整型|布尔值|备注
seq_10|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|22|12345|true|备注1
seq_11|inst_seq_1|2013-11-10 15:56:12|20131110|20131113 12:33:34|23.34|11.88|33|56789|false|
OFDCFEND|20131109|211
Example program
String path = new ClassPathResource("data").getFile().getPath();
String filePath = new File(path, "de_all_merge.txt").getAbsolutePath();
FileConfig config = new FileConfig(filePath, "templates/template8.json", new StorageConfig("nas"));List<String> existFilePaths = Arrays.asList(new ClassPathResource("data/de_all1.txt").getFile().getPath(),new ClassPathResource("data/de_all2.txt").getFile().getPath(),new ClassPathResource("data/de_all3.txt").getFile().getPath()
);
MergerConfig mergerConfig = new MergerConfig();
mergerConfig.setExistFilePaths(existFilePaths);FileMerger fileMerger = FileFactory.createMerger(config);
fileMerger.merge(mergerConfig);
Generated file
- Total count accumulated
- Total amount accumulated
- Body rows concatenated
- Tail summary values accumulated
总笔数:300|总金额:900.26
流水号|基金公司订单号|订单申请时间|普通日期|普通日期时间|普通数字|金额|年龄|长整型|布尔值|备注
seq_0|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|22|12345|true|备注1
seq_1|inst_seq_1|2013-11-10 15:56:12|20131110|20131113 12:33:34|23.34|11.88|33|56789|false|
seq_2|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|22|12345|true|备注1
seq_3|inst_seq_1|2013-11-10 15:56:12|20131110|20131113 12:33:34|23.34|11.88|33|56789|false|
seq_10|inst_seq_0|2013-11-09 12:34:56|20131109|20131112 12:23:34|23.33|10.22|22|12345|true|备注1
seq_11|inst_seq_1|2013-11-10 15:56:12|20131110|20131113 12:33:34|23.34|11.88|33|56789|false|
OFDCFEND|20131109|323
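To sanity-check the merged file, it can be re-read with summary verification enabled, exactly as in section 1.3; a minimal sketch:
FileConfig verifyConfig = new FileConfig(filePath, "templates/template8.json", new StorageConfig("nas"));
verifyConfig.setSummaryEnable(true);
FileReader verifyReader = FileFactory.createReader(verifyConfig);
try {
    verifyReader.readHead(HashMap.class);
    while (null != verifyReader.readRow(HashMap.class)) {
        // consume every row so the summary accumulates
    }
    verifyReader.readTail(HashMap.class);
    Summary summary = verifyReader.getSummary();
    for (SummaryPair pair : summary.getHeadSummaryPairs()) {
        System.out.println("head summary consistent: " + pair.isSummaryEquals());
    }
    for (SummaryPair pair : summary.getTailSummaryPairs()) {
        System.out.println("tail summary consistent: " + pair.isSummaryEquals());
    }
} finally {
    verifyReader.close();
}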