大家开发中会遇到这样一种场景,从某个数据库中查出大量的数据,对这些数据进行某种处理后,存储到另一个数据库中。如果只是简简单单的“流水账”代码,就会耗时很久。这时候就需要采用一些方法来提升效率。
处理大量数据时,为了提高效率,一般可以采取以下几个步骤:
-
分批处理:将大数据集分成较小的数据块,每次只处理一部分数据,这样可以减少内存压力,并且可以利用多线程或多进程来加速处理过程。
-
使用连接池:通过连接池管理数据库连接,避免频繁地创建和关闭连接,从而节省资源和时间。
-
使用索引:确保查询表的关键字段有合适的索引,这有助于加快查询速度。
-
缓存结果:如果可能的话,尝试将查询结果缓存起来,以便下次查询时直接返回缓存结果,避免重复查询。
-
优化SQL语句:尽可能使用高效的SQL语句,比如避免使用子查询,尽量使用JOIN操作等。
-
使用存储过程:如果数据库支持,可以考虑使用存储过程来执行复杂的查询和处理逻辑,因为存储过程在执行时会被预编译,所以可以提高性能。
-
使用流式处理:如果数据量非常大,可以考虑使用流式处理技术,比如Apache Flink或Apache Spark,这些工具可以有效地处理无限序列的数据流。
-
使用异步任务:对于耗时的操作,比如文件读取或网络请求,可以使用异步任务来避免阻塞主线程,提高整体响应速度。
-
监控系统资源:实时监控CPU、内存、磁盘空间等系统资源的使用情况,及时调整资源分配策略,保证系统的稳定运行。
-
定期维护数据库:定期清理无用数据,优化数据库结构,更新索引等,以确保数据库始终处于最佳状态。
-
批量插入数据: 考虑使用数据库的批量插入功能,如批量插入多行数据,以减少数据库交互次数。
以上就是一些常见的提升大数据处理效率的方法,具体实施时应根据实际情况选择合适的技术手段。
示例:
下面是其中一种提效的方法示例。以将从student_info_data数据库中获取的学生信息进行加工处理,存储到另一个数据库student_info_handled_data中。代码中使用了分页查询、批量插入、多线程和数据分片等多种方法进行效率的提升。
public Boolean queryStudentInfo4Save() {long start = System.currentTimeMillis();log.info("开始处理,start={}");//从第0条开始,每次查询5000条int PAGE_SIZE = 5000;int offSet = 0;StudentInfoQueryParam studentInfoQueryParam = new StudentInfoQueryParam();studentInfoQueryParam.setPageSize(PAGE_SIZE);while (true) {studentInfoQueryParam.setOffSet(offSet);//批量查询学生信息List<StudentInfo> studentInfoList = this.listStudentInfo(studentInfoQueryParam);//未查到则终止循环if (CollectionUtils.isEmpty(studentInfoList)) {break;}// 收集做处理后的学生信息List<StudentInfo> allList = Collections.synchronizedList(new ArrayList<>());try {// 按一页500条数据进行分片List<List<StudentInfo>> partList = Lists.partition(studentInfoList, 50);//使用多线程CountDownLatch countDownLatch = new CountDownLatch(partList.size());// 一次保存100条数据partList.forEach(pageList -> {executor.execute(() -> {try {// 处理加工学生信息List<StudentInfo> StudentInfoHandledList = handleStudentInfoList(pageList);if (CollectionUtils.isNotEmpty(StudentInfoHandledList)){allList.addAll(StudentInfoHandledList);}}catch (Exception e){log.error("处理学生信息异常", e);}finally {countDownLatch.countDown();}});});countDownLatch.await();} catch (InterruptedException e) {log.error("多线程处理学生信息异常,当前执行情况:{}", e);return null;}if (CollectionUtils.isNotEmpty(allList)){//批量插入处理后的学生信息studentInfoHandledMapper.batchInsert(allList);}//如果最后一次查询查询学生信息小于设置的每页数据,则代表已经查询完毕,终止循环if (studentInfoList.size() < PAGE_SIZE) {log.info("最后一次查询查询学生信息小于设置的每页数据量);break;}offSet += PAGE_SIZE;}log.info("l处理结束,耗时={}",System.currentTimeMillis() - start);return Boolean.TRUE;}public List<StudentInfo> listStudentInfo(StudentInfoQueryParam studentInfoQueryParam) {List<StudentInfo> list = studentMapper.listStudentInfo(studentInfoQueryParam);if (CollectionUtils.isEmpty(list)) {return null;}return list;}
sql伪代码以在springboot中使用mybatis generate自动生成mapper中的自定义方法为例:
分页查询:
<select id="listStudentInfo" parameterType="studentInfoQueryParam"resultType="StudentInfo">SELECT<include refid="Base_Column_List" />FROM student_info_datalimit #{offSet}, #{pageSize}</select>
批量插入:
<insert id="batchInsert" parameterType="StudentInfo">insert student_info_handled_data (id, name, age, class, sex)values<foreach collection ="studentInfoList" item="item" index= "index" separator =",">(#{item.id,jdbcType=BIGINT}, #{item.name,jdbcType=VARCHAR}, #{item.age,jdbcType=INTEGER},#{item.class,jdbcType=VARCHAR}, #{item.sex,jdbcType=INTEGER})</foreach >
</insert>
大家可以使用System.currentTimeMillis()去测试不同方法的效率。