需求: 通过多线程 调用第三方的接口,处理数据,并得到返回值:
main方法测试:
package auto.thread;import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class TestThread {static int poolSize = 10;public static void main(String[] args) throws InterruptedException, ExecutionException {//程序计数器CountDownLatch countDownLatch = new CountDownLatch(poolSize);//线程池 ExecutorService executorService = Executors.newFixedThreadPool(poolSize);List<UserCamera> originList = new ArrayList(); //模拟生成 1000条数据for(int i=0; i <= 999; i++){UserCamera camera = new UserCamera();camera.setCameraCode(String.valueOf(i));camera.setCameraName("test Name "+String.valueOf(i) );camera.setAuthTime(new Date());originList.add(camera);}long ss = System.currentTimeMillis();//把数据 根据线程的个数 等分 一下List<List<UserCamera>> dataSets = new ArrayList();int batchSize = (int)Math.ceil((double)originList.size()/poolSize);System.out.println("batchSize="+batchSize);for(int i =0; i<originList.size(); i+=batchSize){int start =i;int end = Math.min(start + batchSize, originList.size());List<UserCamera> dataSet = originList.subList(start, end);dataSets.add(dataSet);}CopyOnWriteArrayList<UserCamera> rr = new CopyOnWriteArrayList<>(); //返回的结果方法 rr 中, 此处用 CopyOnWriteArrayList//在向线程池ThreadPoolExecutor提交任务时,一般为了方便操作采用execute提交任务,这时线程其实是无返回值的,//但是在生产中为了应对各种各样的需求,获取线程返回值是必不可少的,所以SDK提供另一种任务提交方式submit,方法签名如下for(List<UserCamera> dataSet : dataSets){DataProcessingTask task = new DataProcessingTask(dataSet, countDownLatch, rr);executorService.execute(task);}countDownLatch.await(); //等待所有的线程执行结束for(int i =0; i<rr.size(); i++){UserCamera aac = rr.get(i);System.out.println("结果:"+aac.getSignData());}executorService.shutdown();System.out.println("######总耗时:" + (System.currentTimeMillis() - ss));}
}
实体类
package auto.thread;import java.util.Date;public class UserCamera {/*** 摄像机编码*/private String cameraCode;/*** 用户编码*/private String userCode;/*** */private Date authTime;/*** 名称*/private String cameraName;/*** 结果*/private String signData;//省略get/set 方法}
处理逻辑
package auto.thread;import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;public class DataProcessingTask implements Runnable {private List<UserCamera> dataSet;private CopyOnWriteArrayList<UserCamera> resultList;private CountDownLatch countDownLatch;public DataProcessingTask(List<UserCamera> dataSet, CountDownLatch countDownLatch, CopyOnWriteArrayList<UserCamera> resultList) {this.dataSet = dataSet;this.countDownLatch = countDownLatch;this.resultList = resultList;}@Overridepublic void run() {try{for(int i =0; i < dataSet.size(); i++){//调用 数据处理逻辑UserCamera camrea = dataSet.get(i);String sign = sign(camrea.getCameraName());camrea.setSignData(sign);}resultList.addAll(dataSet);}catch(Exception e){System.out.println(e.getMessage());}finally {countDownLatch.countDown();}}/*** 此方法是处理数据的逻辑,具体可以调用三方接口* @param sign* @return*/private String sign(String sign){//long ss = System.currentTimeMillis();try {Random random = new Random();int i = 1+random.nextInt(30); Thread.sleep(i); // 暂停1-100随机毫秒System.out.println(i);} catch (InterruptedException e) {e.printStackTrace();}String ss = "--"+sign+"--测试--"+DateUtil.format(new Date(), DatePattern.PURE_DATETIME_MS_FORMAT);return ss;}
}
具体的线程也可以 使用springboot线程池:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;@Configuration
@Slf4j
public class ThreadPoolConfig {@Value("${thread.pool.config.core.size:0}")private int coreSize;@Value("${thread.pool.config.max.size:0}")private int maxSize;@Value("${thread.pool.config.queue.capacity:0}")private int queueCapacity;// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数//private static final int COUR_SIZE = CPU_COUNT * 2;private static final int COUR_SIZE = CPU_COUNT;private static final int MAX_COUR_SIZE = CPU_COUNT * 4;// 接下来配置一个bean,配置线程池。@Beanpublic ThreadPoolTaskExecutor threadPoolTaskExecutor() {if(coreSize == 0){coreSize = COUR_SIZE;}if(maxSize == 0){maxSize = MAX_COUR_SIZE;}if(queueCapacity == 0){queueCapacity = MAX_COUR_SIZE;}ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(coreSize);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(maxSize);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(queueCapacity * 4);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setThreadNamePrefix("sign-data-thread-");// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略log.info("######线程池配置:coreSize:{}, maxSize:{},queueCapacity:{}", coreSize,maxSize, queueCapacity*4);return threadPoolTaskExecutor;}
}
测试时可以改变线程的多少 测试运行速度