stream并行流快(文件11g)
try (Stream<String> lines = Files.lines(filePath)) {lines.parallel().forEach(str -> operatePartData(str, allDataList));
} catch (IOException e) {throw new RuntimeException(e);
}
线程池慢(文件11g)
lines.skip(1).limit(37230)
limit() 范围是在skip的偏移量上增加行数范围
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 3, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));for (int i = 1; i <= THREAD_TIME; i++) {int finalI = i;int startIndex = (i - 1) * avgLines + 1;int endIndex = i * avgLines;poolExecutor.execute(() -> {List<String[]> partData = new ArrayList<>();try (Stream<String> lines = Files.lines(filePath)) {if (1 == finalI) {lines.skip(1).limit(endIndex).forEach(str -> {operatePartData(str, partData);});System.out.printf("------------------ %s 起始位置:%s - %s, 数据量:%s%n", finalI, 1, endIndex, partData.size());} else if (THREAD_TIME == finalI) {lines.skip(startIndex).limit(totalLines.get() - startIndex).forEach(str -> {operatePartData(str, partData);});System.out.printf("------------------ %s 起始位置:%s - %s, 数据量:%s%n", finalI, startIndex, totalLines.get(), partData.size());} else {lines.skip(startIndex).limit(endIndex - startIndex).forEach(str -> {operatePartData(str, partData);});System.out.printf("------------------ %s 起始位置:%s - %s, 数据量:%s%n", finalI, startIndex, endIndex, partData.size());}if (!partData.isEmpty()) {allDataList.addAll(partData);}} catch (IOException e) {throw new RuntimeException(e);} finally {countDownLatch.countDown();}});}try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}poolExecutor.shutdown();writeToFile(allDataList, outFileName);