1、开启kafka的注解@EnableKafka
通过开启kafka注解可以看到Import的类KafkaListenerConfigurationSelector加载一个配置类KafkaBootstrapConfiguration,而此类中有两个重要的类:
KafkaListenerAnnotationBeanPostProcessor、KafkaListenerEndpointRegistry
2、KafkaListenerAnnotationBeanPostProcessor类的postProcessAfterInitialization方法
前置知识需要了解BeanPostProcessor的运行时机
@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {//判断当前bean是否是没有注解的类if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class<?> targetClass = AopUtils.getTargetClass(bean);//获取类上面的注解(KafkaListener、KafkaListeners)Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);//是否存在类级别注解(习惯在方法上后面都是以方法级别介绍)final boolean hasClassLevelListeners = classLevelListeners.size() > 0;final List<Method> multiMethods = new ArrayList<>();//收集类中所有方法上带有KafkaListener、KafkaListeners注解的方法(方法级别、方法级别、方法级别)Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {Set<KafkaListener> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});//是否存在类级别注解if (hasClassLevelListeners) {Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method ->AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty()) {//方法级别没有注解 把类添加了nonAnnotatedClasses集合this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());}else {// Non-empty set of methodsfor (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (KafkaListener listener : entry.getValue()) {//进入processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"+ beanName + "': " + annotatedMethods);}//类级别注解if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {Method methodToUse = checkProxy(method, bean);//注解封装的endpointMethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();//方法endpoint.setMethod(methodToUse);processListener(endpoint, kafkaListener, bean, methodToUse, beanName);}protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,Object bean, Object adminTarget, String beanName) {String beanRef = kafkaListener.beanRef();if (StringUtils.hasText(beanRef)) {this.listenerScope.addListener(beanRef, bean);}//beanendpoint.setBean(bean);//KafkaHandlerMethodFactoryAdapter消息处理方法工厂endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);//KafkaListener注解设置的id,没有配置就是=>"org.springframework.kafka.KafkaListenerEndpointContainer#+原子自增的数值endpoint.setId(getEndpointId(kafkaListener));endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));endpoint.setTopics(resolveTopics(kafkaListener));endpoint.setTopicPattern(resolvePattern(kafkaListener));endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));String group = kafkaListener.containerGroup();if (StringUtils.hasText(group)) {Object resolvedGroup = resolveExpression(group);if (resolvedGroup instanceof String) {endpoint.setGroup((String) resolvedGroup);}}String concurrency = kafkaListener.concurrency();if (StringUtils.hasText(concurrency)) {endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));}String autoStartup = kafkaListener.autoStartup();if (StringUtils.hasText(autoStartup)) {endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));}resolveKafkaProperties(endpoint, kafkaListener.properties());endpoint.setSplitIterables(kafkaListener.splitIterables());//获取消费者监听工厂KafkaListenerContainerFactory<?> factory = null;String containerFactoryBeanName = resolve(kafkaListener.containerFactory());if (StringUtils.hasText(containerFactoryBeanName)) {Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");try {factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);}catch (NoSuchBeanDefinitionException ex) {throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);}}endpoint.setBeanFactory(this.beanFactory);String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");if (StringUtils.hasText(errorHandlerBeanName)) {endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));}//重点看这里 KafkaListenerEndpointRegistrarthis.registrar.registerEndpoint(endpoint, factory);if (StringUtils.hasText(beanRef)) {this.listenerScope.removeListener(beanRef);}}
KafkaListenerEndpointRegistrar#registerEndpoint
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {Assert.notNull(endpoint, "Endpoint must be set");Assert.hasText(endpoint.getId(), "Endpoint id must be set");// Factory may be null, we defer the resolution right before actually creating the container//endpoint和消费者工厂合并到一个类中KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);synchronized (this.endpointDescriptors) {if (this.startImmediately) { //false 是否立刻启动this.endpointRegistry.registerListenerContainer(descriptor.endpoint,resolveContainerFactory(descriptor), true);}else {//主要走这this.endpointDescriptors.add(descriptor);}}}
3、KafkaListenerAnnotationBeanPostProcessor类的afterSingletonsInstantiated方法
前置知识需要了解SmartInitializingSingleton类的afterSingletonsInstantiated方法的运行时机===>
DefaultListableBeanFactory#preInstantiateSingletons方法最下面
public void preInstantiateSingletons() throws BeansException {//....省部分代码// Trigger post-initialization callback for all applicable beans...for (String beanName : beanNames) {Object singletonInstance = getSingleton(beanName);if (singletonInstance instanceof SmartInitializingSingleton) {SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;if (System.getSecurityManager() != null) {AccessController.doPrivileged((PrivilegedAction<Object>) () -> {smartSingleton.afterSingletonsInstantiated();return null;}, getAccessControlContext());}else {//这里这里这里smartSingleton.afterSingletonsInstantiated();}}}}
KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated
public void afterSingletonsInstantiated() {this.registrar.setBeanFactory(this.beanFactory);if (this.beanFactory instanceof ListableBeanFactory) {//true 这段代码进入Map<String, KafkaListenerConfigurer> instances =((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);for (KafkaListenerConfigurer configurer : instances.values()) {//XXConfigurer的扩展修改registrar里面相关内容configurer.configureKafkaListeners(this.registrar);}}if (this.registrar.getEndpointRegistry() == null) {//true 这段代码进入if (this.endpointRegistry == null) {//第一点提到的KafkaListenerEndpointRegistry类this.endpointRegistry = this.beanFactory.getBean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,KafkaListenerEndpointRegistry.class);}this.registrar.setEndpointRegistry(this.endpointRegistry);}//默认工厂beanName=>"kafkaListenerContainerFactory"if (this.defaultContainerFactoryBeanName != null) {this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);}// Set the custom handler method factory once resolved by the configurer//消息处理方法工厂MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();if (handlerMethodFactory != null) {this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);}else {addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);}// Actually register all listeners//重点: 注册监听this.registrar.afterPropertiesSet();}
KafkaListenerEndpointRegistrar#afterPropertiesSet
@Overridepublic void afterPropertiesSet() {registerAllEndpoints();}protected void registerAllEndpoints() {synchronized (this.endpointDescriptors) {//遍历前面收集的endpointDescriptors集合for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint&& this.validator != null) {((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);}//注册监听工厂this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));}this.startImmediately = true; // trigger immediate startup}}
KafkaListenerEndpointRegistry#registerListenerContainer
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {//传入是否启动为falseregisterListenerContainer(endpoint, factory, false);}public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,boolean startImmediately) {String id = endpoint.getId();synchronized (this.listenerContainers) {//创建监听容器MessageListenerContainer container = createListenerContainer(endpoint, factory);//放入集合this.listenerContainers.put(id, container);//有分组则注册bean(beanName=group, object=>List<MessageListenerContainer>)if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {List<MessageListenerContainer> containerGroup;if (this.applicationContext.containsBean(endpoint.getGroup())) {containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);}else {containerGroup = new ArrayList<MessageListenerContainer>();this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);}containerGroup.add(container);}//falseif (startImmediately) {startIfNecessary(container);}}}protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,KafkaListenerContainerFactory<?> factory) {//创建监听容器MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);if (listenerContainer instanceof InitializingBean) {//falsetry {((InitializingBean) listenerContainer).afterPropertiesSet();}catch (Exception ex) {throw new BeanInitializationException("Failed to initialize message listener container", ex);}}int containerPhase = listenerContainer.getPhase();if (listenerContainer.isAutoStartup() &&containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) { // a custom phase valueif (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {throw new IllegalStateException("Encountered phase mismatch between container "+ "factory definitions: " + this.phase + " vs " + containerPhase);}this.phase = listenerContainer.getPhase();}return listenerContainer;}
AbstractKafkaListenerContainerFactory#createListenerContainer
public C createListenerContainer(KafkaListenerEndpoint endpoint) {//创建从前实例 进入看看C instance = createContainerInstance(endpoint);JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);if (endpoint instanceof AbstractKafkaListenerEndpoint) {configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);}//属性复制endpoint.setupListenerContainer(instance, this.messageConverter);//调用子类的initializeContainerinitializeContainer(instance, endpoint);//扩展customizeContainer(instance);return instance;}//子类: ConcurrentKafkaListenerContainerFactory#initializeContainer@Overrideprotected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance,KafkaListenerEndpoint endpoint) {super.initializeContainer(instance, endpoint);//一个topic启动几个消费者(注意这里是几个消费者,很多项目配置很大再加上几个节点,就设置了很多无用的消费者取while(true)消耗cpu)if (endpoint.getConcurrency() != null) {instance.setConcurrency(endpoint.getConcurrency());}else if (this.concurrency != null) {instance.setConcurrency(this.concurrency);}}
ConcurrentKafkaListenerContainerFactory#createContainerInstance
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {//KafkaListener注解上配置内容//指定分区消费TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();if (topicPartitions != null && topicPartitions.length > 0) {ContainerProperties properties = new ContainerProperties(topicPartitions);return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);}else {Collection<String> topics = endpoint.getTopics();if (!topics.isEmpty()) {//指定topic消费 ContainerProperties 继承ConsumerPropertiesContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);}else {ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);}}}
4、KafkaListenerEndpointRegistry类的start方法
此类实现了SmartLifecycle接口关注start方法;
前置知识需要了解KafkaListenerEndpointRegistry类的start方法的运行时机AbstractApplicationContext#finishRefresh
protected void finishRefresh() {// Clear context-level resource caches (such as ASM metadata from scanning).clearResourceCaches();// Initialize lifecycle processor for this context.initLifecycleProcessor();//这里这里这里// Propagate refresh to lifecycle processor first.//看这LifecycleProcessor本身就是LifeCycle接口的扩展 DefaultLifecycleProcessor 的 onRefreshgetLifecycleProcessor().onRefresh();// Publish the final event.publishEvent(new ContextRefreshedEvent(this));// Participate in LiveBeansView MBean, if active.LiveBeansView.registerApplicationContext(this);}
public void start() {for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}this.running = true;}private void startIfNecessary(MessageListenerContainer listenerContainer) {if (this.contextRefreshed || listenerContainer.isAutoStartup()) {//listenerContainer.isAutoStartup = true//进入ConcurrentMessageListenerContainer父类AbstractMessageListenerContainer#startlistenerContainer.start();}}
//进入ConcurrentMessageListenerContainer父类AbstractMessageListenerContainer#start
public final void start() {checkGroupId();synchronized (this.lifecycleMonitor) {if (!isRunning()) {Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");//子类ConcurrentMessageListenerContainer#doStartdoStart();}}}
ConcurrentMessageListenerContainer#doStart
protected void doStart() {if (!isRunning()) {checkTopics();ContainerProperties containerProperties = getContainerProperties();TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null && this.concurrency > topicPartitions.length) {this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "+ "equal to the number of partitions; reduced from " + this.concurrency + " to "+ topicPartitions.length);this.concurrency = topicPartitions.length;}setRunning(true);for (int i = 0; i < this.concurrency; i++) {//一个topic启动多个消费者//构造KafkaMessageListenerContainerKafkaMessageListenerContainer<K, V> container =constructContainer(containerProperties, topicPartitions, i);String beanName = getBeanName();container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);container.setApplicationContext(getApplicationContext());if (getApplicationEventPublisher() != null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");container.setGenericErrorHandler(getGenericErrorHandler());container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.setRecordInterceptor(getRecordInterceptor());container.setInterceptBeforeTx(isInterceptBeforeTx());container.setEmergencyStop(() -> {stop(() -> {// NOSONAR});publishContainerStoppedEvent();});if (isPaused()) {container.pause();}//启动 AbstractMessageListenerContainer#startcontainer.start();this.containers.add(container);}}}
KafkaMessageListenerContainer#doStart类
protected void doStart() {if (isRunning()) {return;}if (this.clientIdSuffix == null) { // stand-alone containercheckTopics();}ContainerProperties containerProperties = getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();//创建线程池if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = determineListenerType(listener);//ListenerConsumer对象是一个runnable实现类this.listenerConsumer = new ListenerConsumer(listener, listenerType);setRunning(true);this.startLatch = new CountDownLatch(1);//线程池执行任务this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);try {if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {this.logger.error("Consumer thread failed to start - does the configured task executor "+ "have enough threads to support all containers and concurrency?");publishConsumerFailedToStart();}}catch (@SuppressWarnings(UNUSED) InterruptedException e) {Thread.currentThread().interrupt();}}
KafkaMessageListenerContainer.ListenerConsumer#ListenerConsumer
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {Properties consumerProperties = propertiesFromProperties();checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);this.autoCommit = determineAutoCommit(consumerProperties);//创建消费者this.consumer =KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId,this.containerProperties.getClientId(),KafkaMessageListenerContainer.this.clientIdSuffix,consumerProperties);this.clientId = determineClientId();this.transactionTemplate = determineTransactionTemplate();this.genericListener = listener;this.consumerSeekAwareListener = checkConsumerSeekAware(listener);this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());subscribeOrAssignTopics(this.consumer);GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();if (listener instanceof BatchMessageListener) {this.listener = null;this.batchListener = (BatchMessageListener<K, V>) listener;this.isBatchListener = true;this.wantsFullRecords = this.batchListener.wantsPollResult();}else if (listener instanceof MessageListener) {this.listener = (MessageListener<K, V>) listener;this.batchListener = null;this.isBatchListener = false;this.wantsFullRecords = false;}else {throw new IllegalArgumentException("Listener must be one of 'MessageListener', "+ "'BatchMessageListener', or the variants that are consumer aware and/or "+ "Acknowledging"+ " not " + listener.getClass().getName());}this.listenerType = listenerType;this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)|| listenerType.equals(ListenerType.CONSUMER_AWARE);if (this.isBatchListener) {validateErrorHandler(true);this.errorHandler = new LoggingErrorHandler();this.batchErrorHandler = determineBatchErrorHandler(errHandler);}else {validateErrorHandler(false);this.errorHandler = determineErrorHandler(errHandler);this.batchErrorHandler = new BatchLoggingErrorHandler();}Assert.state(!this.isBatchListener || !this.isRecordAck,"Cannot use AckMode.RECORD with a batch listener");if (this.containerProperties.getScheduler() != null) {this.taskScheduler = this.containerProperties.getScheduler();this.taskSchedulerExplicitlySet = true;}else {ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();threadPoolTaskScheduler.initialize();this.taskScheduler = threadPoolTaskScheduler;}this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer,Duration.ofSeconds(this.containerProperties.getMonitorInterval()));if (this.containerProperties.isLogContainerConfig()) {this.logger.info(this.toString());}Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, false));this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, true));this.syncCommitTimeout = determineSyncCommitTimeout();if (this.containerProperties.getSyncCommitTimeout() == null) {// update the property so we can use it directly from code elsewherethis.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {KafkaMessageListenerContainer.this.thisOrParentContainer.getContainerProperties().setSyncCommitTimeout(this.syncCommitTimeout);}}this.maxPollInterval = obtainMaxPollInterval(consumerProperties);this.micrometerHolder = obtainMicrometerHolder();this.deliveryAttemptAware = setupDeliveryAttemptAware();this.subBatchPerPartition = setupSubBatchPerPartition();}@Overridepublic void run() { // NOSONAR complexityListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent();this.consumerThread = Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count = 0;this.last = System.currentTimeMillis();initAssignedPartitions();publishConsumerStartedEvent();Throwable exitThrowable = null;while (isRunning()) {try {//拉取消息pollAndInvoke();}catch (@SuppressWarnings(UNUSED) WakeupException e) {// Ignore, we're stopping or applying immediate foreign acks}catch (NoOffsetForPartitionException nofpe) {this.fatalError = true;ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");exitThrowable = nofpe;break;}catch (AuthorizationException ae) {if (this.authorizationExceptionRetryInterval == null) {ListenerConsumer.this.logger.error(ae, "Authorization Exception and no authorizationExceptionRetryInterval set");this.fatalError = true;exitThrowable = ae;break;}else {ListenerConsumer.this.logger.error(ae, "Authorization Exception, retrying in " + this.authorizationExceptionRetryInterval.toMillis() + " ms");// We can't pause/resume here, as KafkaConsumer doesn't take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authorizationExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError = true;ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG+ "' has been fenced");exitThrowable = fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, "Stopping container due to fencing");stop(false);exitThrowable = e;}catch (Error e) { // NOSONAR - rethrownRunnable runnable = KafkaMessageListenerContainer.this.emergencyStop;if (runnable != null) {runnable.run();}this.logger.error(e, "Stopping container due to an Error");wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}}wrapUp(exitThrowable);}protected void pollAndInvoke() {if (!this.autoCommit && !this.isRecordAck) {processCommits();}fixTxOffsetsIfNeeded();idleBetweenPollIfNecessary();if (this.seeks.size() > 0) {processSeeks();}pauseConsumerIfNecessary();this.lastPoll = System.currentTimeMillis();if (!isRunning()) {return;}this.polling.set(true);//拉取消息ConsumerRecords<K, V> records = doPoll();if (!this.polling.compareAndSet(true, false) && records != null) {/** There is a small race condition where wakeIfNecessary was called between* exiting the poll and before we reset the boolean.*/if (records.count() > 0) {this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());}return;}resumeConsumerIfNeccessary();debugRecords(records);if (records != null && records.count() > 0) {savePositionsIfNeeded(records);notIdle();invokeListener(records);}else {checkIdle();}}private ConsumerRecords<K, V> doPoll() {ConsumerRecords<K, V> records;if (this.isBatchListener && this.subBatchPerPartition) {if (this.batchIterator == null) {this.lastBatch = this.consumer.poll(this.pollTimeout);if (this.lastBatch.count() == 0) {return this.lastBatch;}else {this.batchIterator = this.lastBatch.partitions().iterator();}}TopicPartition next = this.batchIterator.next();List<ConsumerRecord<K, V>> subBatch = this.lastBatch.records(next);records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));if (!this.batchIterator.hasNext()) {this.batchIterator = null;}}else {//拉取消息 基本apirecords = this.consumer.poll(this.pollTimeout);checkRebalanceCommits();}return records;}