1. 判空逻辑,如果为空,抛异常,下面代码来自kafka client:
Assert.notNull(queue, () -> "No cache found for " + txIdPrefix);
2. 本地cache设计,一下代码来自kafka client:
private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {if (txIdPrefix == null) {return null;}
// 下面这个方法是针对每个txIdPrefix,都创建一个LinkedBlockingQueue,并缓存起来,这里第二个参数用到了Supplier函数式接口return this.cache.computeIfAbsent(txIdPrefix, txId -> new LinkedBlockingQueue<>());}
3. kafka consumer单线程的控制逻辑CAS:
/*** Acquire the light lock and ensure that the consumer hasn't been closed.* @throws IllegalStateException If the consumer