需求背景:项目新增了ES,现在需要将数据库中某张表的数据同步到ES中。百万级的数据量一次性读取并同步显然不可行,因此采用分页读取、多线程并发执行的方式来同步数据。
1.线程池配置类
@Configuration
public class ThreadPoolConfig {/*** 核心线程池大小*/private static final int CORE_POOL_SIZE = 17;/*** 最大可创建的线程数*/private static final int MAX_POOL_SIZE = 50;/*** 队列最大长度*/private static final int QUEUE_CAPACITY = 1000;/*** 线程池维护线程所允许的空闲时间*/private static final int KEEP_ALIVE_SECONDS = 500;@Bean("taskExecutor")public ExecutorService executorService(){//使用原子类,保证线程命名的唯一性和连续性AtomicInteger c = new AtomicInteger(1);//创建链表结构的阻塞队列LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(QUEUE_CAPACITY);return new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_SECONDS,TimeUnit.MILLISECONDS,queue,r -> new Thread(r, "es-pool-" + c.getAndIncrement()),new ThreadPoolExecutor.DiscardPolicy());}
}
2.ES配置类
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {

    /** Elasticsearch host, bound from the {@code elasticsearch.host} property. */
    private String host;

    /** Elasticsearch port, bound from the {@code elasticsearch.port} property. */
    private int port;

    /**
     * Exposes a {@link RestHighLevelClient} that talks plain HTTP to the configured host and port.
     *
     * @return the shared Elasticsearch client bean
     */
    @Bean
    public RestHighLevelClient client() {
        HttpHost endpoint = new HttpHost(this.host, this.port, "http");
        return new RestHighLevelClient(RestClient.builder(endpoint));
    }
}
3.主要代码逻辑
@Service
@Transactional
@Slf4j
public class TestService{@Autowiredprivate TestMapper testMapper;@Autowiredprivate RestHighLevelClient client; //ES客户端@Autowiredprivate ExecutorService executorService; //线程池private static final String ARTICLE_ES_INDEX = "test_info";//ES索引库名称private static final int PAGE_SIZE = 5000; //每页记录数/*** 批量导入逻辑*/public void importAll() {//查询数据总数int count = testMapper.selectCount();//总页数用 数据库数据总数%每页记录数int totalPageSize = count % PAGE_SIZE == 0 ? count / PAGE_SIZE : count / PAGE_SIZE + 1;//记录开始执行时间long startTime = System.currentTimeMillis();//一共有多少页,就创建多少个CountDownLatch的计数CountDownLatch countDownLatch = new CountDownLatch(totalPageSize);int fromIndex;List<TestVo> testVoList= null;for (int i = 0; i < totalPageSize; i++) {//起始分页条数fromIndex = i * PAGE_SIZE;//查询数据库当前页数的数据 SELECT*FROM 表名 LIMIT fromIndex,PAGE_SIZEtestVoList= testMapper.selectCurrentData(fromIndex, PAGE_SIZE);//创建线程,做批量插入es数据操作TaskThread taskThread = new TaskThread(testVoList, countDownLatch);//把当前线程任务交由线程池执行executorService.execute(taskThread);}//调用await()方法,用来等待计数归零countDownLatch.await();long endTime = System.currentTimeMillis();log.info("es索引数据批量导入共:{}条,共消耗时间:{}秒", count, (endTime - startTime) / 1000);}//这里为了方便,写了线程内部类。class TaskThread implements Runnable {List<TestVo> testVoList;CountDownLatch cdl;//数据和倒计时锁public TaskThread(List<TestVo> testVoList, CountDownLatch cdl) {this.articleList = articleList;this.cdl = cdl;}@Overridepublic void run() {//创建ES对象,并指定名称BulkRequest bulkRequest = new BulkRequest(ARTICLE_ES_INDEX);for (SearchArticleVo searchArticleVo : articleList) {//存储到ESbulkRequest.add(new IndexRequest().id(searchArticleVo.getId().toString()).source(JSON.toJSONString(testVoList), XContentType.JSON));}//发送请求,批量添加数据到es索引库中client.bulk(bulkRequest, RequestOptions.DEFAULT);//添加成功后计数减一cdl.countDown();}}}