Business scenario:
We consume Kafka data in batches, and a single batch can contain both an insert and an update for the same logical row. If a later message starts modifying the row before the earlier update has finished, the data ends up inconsistent.
Single-machine mode:
(1) Concatenate the fields that uniquely identify "the same row" (the condition used to decide two records are one row) into a key string, put it into the string constant pool via the intern() method, and then synchronize on that interned string, so updates to one row run strictly one at a time.
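A minimal sketch of the intern-lock idea (the class, key fields, and counter are hypothetical, not from the original code). String.intern() returns the one canonical String instance for a given content, so two threads that build "the same" key from different String objects still contend on the same monitor. Note this only serializes within a single JVM, and any unrelated code that interns the same string shares the lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class InternLockDemo {

    // Runs `threads` threads, each doing `loopsPerThread` unsafe read-modify-write
    // updates guarded by a lock on the interned key; returns the final counter.
    static int runConcurrentUpdates(int threads, int loopsPerThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < loopsPerThread; i++) {
                    // new String(...) forces distinct String objects; intern() canonicalizes
                    // them, so every thread locks the very same monitor
                    String key = (new String("order-1") + "|" + new String("2024-01-01")).intern();
                    synchronized (key) {
                        counter.set(counter.get() + 1); // non-atomic on its own, safe under the lock
                    }
                }
                done.countDown();
            }).start();
        }
        done.await();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runConcurrentUpdates(8, 1000)); // prints 8000
    }
}
```

Without the synchronized block the read-modify-write would lose updates and the count would typically fall short of threads × loops.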
Single-machine / cluster mode:
(1) Lock via Redis middleware so that later updates to the same row block until the earlier one finishes. This Redis lock must likewise be keyed on the query condition that identifies two records as the same row.
(2) Send messages for the same row to the same topic, and use an array of thread pools with blocking queues so that modifications to the same row are serialized.
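A sketch of that locking idea. To stay self-contained, a ConcurrentHashMap stands in for Redis here: putIfAbsent mimics SET key value NX (acquire only if the key is absent), and remove(key, value) mimics an owner-checked delete. The key scheme and field names are hypothetical. A real implementation would use an actual Redis client, attach a TTL (SET ... NX PX), and release with a compare-and-delete Lua script so only the lock owner can unlock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class RedisLockSketch {
    // Stand-in for Redis: putIfAbsent has the same "set only if absent" semantics as SET NX.
    static final ConcurrentHashMap<String, String> redisStub = new ConcurrentHashMap<>();

    static boolean tryLock(String lockKey, String owner) {
        return redisStub.putIfAbsent(lockKey, owner) == null;
    }

    static void unlock(String lockKey, String owner) {
        redisStub.remove(lockKey, owner); // delete only if we still own the lock
    }

    // The lock key is built from the SAME fields used to decide two records are
    // "the same row": locking on anything coarser serializes unrelated rows,
    // anything finer fails to serialize the conflicting ones.
    static void updateWithLock(String bizId, String day, Runnable update) throws InterruptedException {
        String lockKey = "lock:update:" + bizId + ":" + day; // hypothetical key scheme
        String owner = Thread.currentThread().getName();
        while (!tryLock(lockKey, owner)) {
            TimeUnit.MILLISECONDS.sleep(10); // simple retry; real code would back off and time out
        }
        try {
            update.run();
        } finally {
            unlock(lockKey, owner);
        }
    }
}
```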
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;

/**
 * Thread-pool array for blocking Lindorm tasks.
 *
 * @return an array of single-thread pools, one serial "lane" per hash bucket
 */
@Bean
public ExecutorService[] lindormExecutorService() {
    Logger logger = LoggerFactory.getLogger(GpsServiceApiController.class);
    ExecutorService[] executorServices = new ExecutorService[8];
    for (int i = 0; i < 8; i++) {
        executorServices[i] = createExecutorService(logger, 1, 10, i, "deleteLindorm");
    }
    return executorServices;
}

private ExecutorService createExecutorService(Logger logger, int threadMaxPoolSize,
                                              int threadQueueCapacitySize, int i, String name) {
    return new ThreadPoolExecutor(threadMaxPoolSize, threadMaxPoolSize,
            0L, TimeUnit.MILLISECONDS,
            // if you switch to a PriorityBlockingQueue, tasks must implement Comparable
            new LinkedBlockingQueue<>(threadQueueCapacitySize),
            new ThreadFactoryBuilder().setNameFormat("pool-" + i + "-" + name + "-thread-%d").build(),
            // rejection handler: when the queue is full, block the submitting
            // thread with put() instead of dropping the task
            (r, executor) -> {
                try {
                    long startTime = System.currentTimeMillis();
                    executor.getQueue().put(r);
                    logger.info("too many tasks to handle, blocked the caller for {} ms",
                            System.currentTimeMillis() - startTime);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore interrupt status instead of swallowing it
                }
            });
}
// Route by the "same-row" condition string (conditionKey below), so that
// identical rows always hash to the same single-thread pool:
lindormExecutorService[Hashing.consistentHash(conditionKey.hashCode(), lindormExecutorService.length)]
        .execute(() -> {
            try {
                // ... perform the Lindorm update here ...
            } catch (Exception e) {
                log.warn("xxxxx:{}", param, e);
            }
        });
This likewise guarantees that updates to the same row are blocked and executed one at a time.
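The whole pattern above can be sketched in a self-contained form, without the Spring @Bean wrapper or Guava (Math.floorMod over the key's hashCode stands in for Hashing.consistentHash; the class and key names are hypothetical). Each single-thread executor is one serial lane, so tasks for the same key run in submission order while different keys proceed in parallel:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KeyedSerialExecutor {
    final ExecutorService[] lanes;

    KeyedSerialExecutor(int n) {
        lanes = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            lanes[i] = Executors.newSingleThreadExecutor(); // one serial lane per bucket
        }
    }

    // Same key -> same lane -> FIFO execution on a single thread.
    void execute(String key, Runnable task) {
        lanes[Math.floorMod(key.hashCode(), lanes.length)].execute(task);
    }

    void shutdownAndWait() throws InterruptedException {
        for (ExecutorService e : lanes) e.shutdown();
        for (ExecutorService e : lanes) e.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        KeyedSerialExecutor ex = new KeyedSerialExecutor(8);
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 100; i++) {
            int step = i;
            ex.execute("device-42", () -> order.add(step)); // one key, so strictly ordered
        }
        ex.shutdownAndWait();
        System.out.println(order.size()); // prints 100; contents are 0..99 in order
    }
}
```

Note the trade-off: unlike the lock-based options, this serializes per hash bucket rather than per row, so two different rows that hash to the same lane also queue behind each other.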