How @KafkaListener Works in a Spring Boot 2.2.9 Kafka Integration

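1. Enabling Kafka with the @EnableKafka annotation

@EnableKafka imports KafkaListenerConfigurationSelector, which loads the configuration class KafkaBootstrapConfiguration. That configuration class registers two important beans:
KafkaListenerAnnotationBeanPostProcessor and KafkaListenerEndpointRegistry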

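2. The postProcessAfterInitialization method of KafkaListenerAnnotationBeanPostProcessor

Prerequisite: you need to know when a BeanPostProcessor runs. Its postProcessAfterInitialization method is called once for every bean, after that bean's initialization callbacks have completed.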

	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
	    // Skip classes already known to carry no listener annotations
	    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
	        Class<?> targetClass = AopUtils.getTargetClass(bean);
	        // Annotations on the class itself (@KafkaListener, @KafkaListeners)
	        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
	        // Whether class-level annotations exist (the annotation is usually placed on methods,
	        // so the rest of this walkthrough follows the method-level path)
	        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
	        final List<Method> multiMethods = new ArrayList<>();
	        // Collect every method in the class annotated with @KafkaListener or @KafkaListeners (method level)
	        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
	                (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
	                    Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
	                    return (!listenerMethods.isEmpty() ? listenerMethods : null);
	                });
	        // If class-level annotations exist, collect the @KafkaHandler methods
	        if (hasClassLevelListeners) {
	            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
	                    (ReflectionUtils.MethodFilter) method ->
	                            AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
	            multiMethods.addAll(methodsWithHandler);
	        }
	        if (annotatedMethods.isEmpty()) {
	            // No method-level annotations: add the class to nonAnnotatedClasses
	            this.nonAnnotatedClasses.add(bean.getClass());
	            this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
	        }
	        else {
	            // Non-empty set of methods
	            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
	                Method method = entry.getKey();
	                for (KafkaListener listener : entry.getValue()) {
	                    // Step into this call
	                    processKafkaListener(listener, method, bean, beanName);
	                }
	            }
	            this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
	                    + beanName + "': " + annotatedMethods);
	        }
	        // Class-level annotations
	        if (hasClassLevelListeners) {
	            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
	        }
	    }
	    return bean;
	}

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
	    Method methodToUse = checkProxy(method, bean);
	    // The endpoint that wraps the annotation
	    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
	    // The listener method
	    endpoint.setMethod(methodToUse);
	    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
	        Object bean, Object adminTarget, String beanName) {
	    String beanRef = kafkaListener.beanRef();
	    if (StringUtils.hasText(beanRef)) {
	        this.listenerScope.addListener(beanRef, bean);
	    }
	    // The bean
	    endpoint.setBean(bean);
	    // Message handler method factory (KafkaHandlerMethodFactoryAdapter)
	    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
	    // The id set on @KafkaListener; if none is configured it is
	    // "org.springframework.kafka.KafkaListenerEndpointContainer#" plus an atomically incremented counter
	    endpoint.setId(getEndpointId(kafkaListener));
	    endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
	    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
	    endpoint.setTopics(resolveTopics(kafkaListener));
	    endpoint.setTopicPattern(resolvePattern(kafkaListener));
	    endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
	    String group = kafkaListener.containerGroup();
	    if (StringUtils.hasText(group)) {
	        Object resolvedGroup = resolveExpression(group);
	        if (resolvedGroup instanceof String) {
	            endpoint.setGroup((String) resolvedGroup);
	        }
	    }
	    String concurrency = kafkaListener.concurrency();
	    if (StringUtils.hasText(concurrency)) {
	        endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
	    }
	    String autoStartup = kafkaListener.autoStartup();
	    if (StringUtils.hasText(autoStartup)) {
	        endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
	    }
	    resolveKafkaProperties(endpoint, kafkaListener.properties());
	    endpoint.setSplitIterables(kafkaListener.splitIterables());
	    // Look up the listener container factory
	    KafkaListenerContainerFactory<?> factory = null;
	    String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
	    if (StringUtils.hasText(containerFactoryBeanName)) {
	        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
	        try {
	            factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
	        }
	        catch (NoSuchBeanDefinitionException ex) {
	            throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
	                    + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
	                    + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
	        }
	    }
	    endpoint.setBeanFactory(this.beanFactory);
	    String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
	    if (StringUtils.hasText(errorHandlerBeanName)) {
	        endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
	    }
	    // Key step: hand the endpoint to the KafkaListenerEndpointRegistrar
	    this.registrar.registerEndpoint(endpoint, factory);
	    if (StringUtils.hasText(beanRef)) {
	        this.listenerScope.removeListener(beanRef);
	    }
	}
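
The scanning step can be reproduced with plain reflection. The following is a minimal sketch, not Spring code: the `@Listen` annotation, the `Endpoint` class and the `ListenerScanSketch` class are hypothetical stand-ins for `@KafkaListener`, `MethodKafkaListenerEndpoint` and the post-processor, and the sketch ignores proxies, class-level annotations and expression resolution.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ListenerScanSketch {

    // Hypothetical stand-in for @KafkaListener (not the Spring annotation).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {
        String topic();
    }

    // Hypothetical stand-in for MethodKafkaListenerEndpoint: bean + method + topic.
    static final class Endpoint {
        final Object bean;
        final Method method;
        final String topic;

        Endpoint(Object bean, Method method, String topic) {
            this.bean = bean;
            this.method = method;
            this.topic = topic;
        }
    }

    // Classes already scanned and found to have no annotated methods.
    private final Set<Class<?>> nonAnnotatedClasses = new HashSet<>();
    final List<Endpoint> endpoints = new ArrayList<>();

    // Called once per bean, like BeanPostProcessor#postProcessAfterInitialization.
    Object postProcessAfterInitialization(Object bean) {
        Class<?> type = bean.getClass();
        if (this.nonAnnotatedClasses.contains(type)) {
            return bean;
        }
        boolean found = false;
        for (Method method : type.getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen != null) {
                this.endpoints.add(new Endpoint(bean, method, listen.topic()));
                found = true;
            }
        }
        if (!found) {
            this.nonAnnotatedClasses.add(type);
        }
        return bean;
    }

    static class OrderHandlers {
        @Listen(topic = "orders")
        void onOrder(String payload) {
        }

        void helper() {
        }
    }

    static class PlainBean {
    }

    public static void main(String[] args) {
        ListenerScanSketch processor = new ListenerScanSketch();
        processor.postProcessAfterInitialization(new OrderHandlers());
        processor.postProcessAfterInitialization(new PlainBean());
        for (Endpoint endpoint : processor.endpoints) {
            System.out.println(endpoint.method.getName() + " -> " + endpoint.topic);
        }
    }
}
```

The cache of non-annotated classes plays the same role as `nonAnnotatedClasses` in the real post-processor: a bean type that was scanned once without a match is not scanned again.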

KafkaListenerEndpointRegistrar#registerEndpoint

	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
	    Assert.notNull(endpoint, "Endpoint must be set");
	    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
	    // Factory may be null, we defer the resolution right before actually creating the container
	    // The endpoint and the container factory are bundled into one descriptor object
	    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
	    synchronized (this.endpointDescriptors) {
	        if (this.startImmediately) { // false at this point: do not start right away
	            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
	                    resolveContainerFactory(descriptor), true);
	        }
	        else {
	            // This is the branch normally taken during startup
	            this.endpointDescriptors.add(descriptor);
	        }
	    }
	}
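
The `startImmediately` flag implements a "queue first, register later" pattern. Here is a minimal sketch of that pattern, assuming endpoints can be represented by their id strings. `DeferredRegistrarSketch` and its `registry` list are hypothetical names and only mimic the control flow of the registrar.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredRegistrarSketch {

    // Plays the role of endpointDescriptors: endpoints queued during bean post-processing.
    private final List<String> endpointDescriptors = new ArrayList<>();

    // Plays the role of the endpoint registry: endpoints whose containers were created.
    final List<String> registry = new ArrayList<>();

    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) {
                // Late registration: go straight to the registry.
                this.registry.add(endpointId);
            }
            else {
                // Early registration: only remember the endpoint.
                this.endpointDescriptors.add(endpointId);
            }
        }
    }

    // Mirrors afterPropertiesSet -> registerAllEndpoints: flush the queue, then flip the flag.
    void afterPropertiesSet() {
        synchronized (this.endpointDescriptors) {
            this.registry.addAll(this.endpointDescriptors);
            this.startImmediately = true;
        }
    }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch();
        registrar.registerEndpoint("listener-a");
        registrar.registerEndpoint("listener-b");
        System.out.println("before flush: " + registrar.registry);
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("listener-c");
        System.out.println("after flush: " + registrar.registry);
    }
}
```

Deferring the work lets every KafkaListenerConfigurer and the container factory be resolved before any container is created. Endpoints registered after the flush are handled on the spot.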

3. The afterSingletonsInstantiated method of KafkaListenerAnnotationBeanPostProcessor

Prerequisite: you need to know when SmartInitializingSingleton#afterSingletonsInstantiated runs. It is invoked
at the very end of DefaultListableBeanFactory#preInstantiateSingletons:

	public void preInstantiateSingletons() throws BeansException {
	    // ... (some code omitted)
	    // Trigger post-initialization callback for all applicable beans...
	    for (String beanName : beanNames) {
	        Object singletonInstance = getSingleton(beanName);
	        if (singletonInstance instanceof SmartInitializingSingleton) {
	            SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
	            if (System.getSecurityManager() != null) {
	                AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
	                    smartSingleton.afterSingletonsInstantiated();
	                    return null;
	                }, getAccessControlContext());
	            }
	            else {
	                // This is the call of interest
	                smartSingleton.afterSingletonsInstantiated();
	            }
	        }
	    }
	}
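
The ordering guarantee is the important part: the callback fires only after every singleton exists. A minimal sketch of that two-phase loop follows, with a hypothetical `AfterAllSingletons` interface standing in for SmartInitializingSingleton and a map of suppliers standing in for bean definitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class SingletonPhasesSketch {

    // Hypothetical stand-in for SmartInitializingSingleton.
    interface AfterAllSingletons {
        void afterSingletonsInstantiated();
    }

    final List<String> log = new ArrayList<>();
    private final Map<String, Object> singletons = new LinkedHashMap<>();

    void preInstantiateSingletons(Map<String, Supplier<Object>> definitions) {
        // Phase 1: create every singleton.
        for (Map.Entry<String, Supplier<Object>> definition : definitions.entrySet()) {
            this.singletons.put(definition.getKey(), definition.getValue().get());
            this.log.add("created " + definition.getKey());
        }
        // Phase 2: only now notify the beans that asked for the callback.
        for (Object singleton : this.singletons.values()) {
            if (singleton instanceof AfterAllSingletons) {
                ((AfterAllSingletons) singleton).afterSingletonsInstantiated();
            }
        }
    }

    static List<String> demo() {
        SingletonPhasesSketch factory = new SingletonPhasesSketch();
        AfterAllSingletons processor = () -> factory.log.add("callback: all singletons exist");
        Map<String, Supplier<Object>> definitions = new LinkedHashMap<>();
        definitions.put("listenerProcessor", () -> processor);
        definitions.put("orderService", Object::new);
        factory.preInstantiateSingletons(definitions);
        return factory.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Even though `listenerProcessor` is created first, its callback is logged last. This is why the post-processor can safely register all collected endpoints in afterSingletonsInstantiated.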

KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

	public void afterSingletonsInstantiated() {
	    this.registrar.setBeanFactory(this.beanFactory);
	    if (this.beanFactory instanceof ListableBeanFactory) { // true: this branch is taken
	        Map<String, KafkaListenerConfigurer> instances =
	                ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
	        for (KafkaListenerConfigurer configurer : instances.values()) {
	            // Extension point: each KafkaListenerConfigurer may customize the registrar
	            configurer.configureKafkaListeners(this.registrar);
	        }
	    }
	    if (this.registrar.getEndpointRegistry() == null) { // true: this branch is taken
	        if (this.endpointRegistry == null) {
	            // The KafkaListenerEndpointRegistry mentioned in section 1
	            this.endpointRegistry = this.beanFactory.getBean(
	                    KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
	                    KafkaListenerEndpointRegistry.class);
	        }
	        this.registrar.setEndpointRegistry(this.endpointRegistry);
	    }
	    // Default factory bean name: "kafkaListenerContainerFactory"
	    if (this.defaultContainerFactoryBeanName != null) {
	        this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
	    }
	    // Set the custom handler method factory once resolved by the configurer
	    // Message handler method factory
	    MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
	    if (handlerMethodFactory != null) {
	        this.messageHandlerMethodFactory.setHandlerMethodFactory(handlerMethodFactory);
	    }
	    else {
	        addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
	    }
	    // Actually register all listeners
	    // Key step: register the listeners
	    this.registrar.afterPropertiesSet();
	}

KafkaListenerEndpointRegistrar#afterPropertiesSet

	@Override
	public void afterPropertiesSet() {
	    registerAllEndpoints();
	}

	protected void registerAllEndpoints() {
	    synchronized (this.endpointDescriptors) {
	        // Iterate over the endpointDescriptors collected earlier
	        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
	            if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint
	                    && this.validator != null) {
	                ((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
	            }
	            // Register the listener container
	            this.endpointRegistry.registerListenerContainer(
	                    descriptor.endpoint, resolveContainerFactory(descriptor));
	        }
	        this.startImmediately = true;  // trigger immediate startup
	    }
	}

KafkaListenerEndpointRegistry#registerListenerContainer

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
	    // startImmediately is passed as false
	    registerListenerContainer(endpoint, factory, false);
	}

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
	        boolean startImmediately) {
	    String id = endpoint.getId();
	    synchronized (this.listenerContainers) {
	        // Create the listener container
	        MessageListenerContainer container = createListenerContainer(endpoint, factory);
	        // Store it in the map
	        this.listenerContainers.put(id, container);
	        // If a group is set, register a bean (beanName = group, object = List<MessageListenerContainer>)
	        if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
	            List<MessageListenerContainer> containerGroup;
	            if (this.applicationContext.containsBean(endpoint.getGroup())) {
	                containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
	            }
	            else {
	                containerGroup = new ArrayList<MessageListenerContainer>();
	                this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
	            }
	            containerGroup.add(container);
	        }
	        // false on this path
	        if (startImmediately) {
	            startIfNecessary(container);
	        }
	    }
	}

	protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
	        KafkaListenerContainerFactory<?> factory) {
	    // Create the listener container
	    MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
	    if (listenerContainer instanceof InitializingBean) { // false
	        try {
	            ((InitializingBean) listenerContainer).afterPropertiesSet();
	        }
	        catch (Exception ex) {
	            throw new BeanInitializationException("Failed to initialize message listener container", ex);
	        }
	    }
	    int containerPhase = listenerContainer.getPhase();
	    if (listenerContainer.isAutoStartup() &&
	            containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) {  // a custom phase value
	        if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
	            throw new IllegalStateException("Encountered phase mismatch between container "
	                    + "factory definitions: " + this.phase + " vs " + containerPhase);
	        }
	        this.phase = listenerContainer.getPhase();
	    }
	    return listenerContainer;
	}

AbstractKafkaListenerContainerFactory#createListenerContainer

	public C createListenerContainer(KafkaListenerEndpoint endpoint) {
	    // Create the container instance (examined below)
	    C instance = createContainerInstance(endpoint);
	    JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
	    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
	        configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
	    }
	    // Copy properties from the endpoint to the container
	    endpoint.setupListenerContainer(instance, this.messageConverter);
	    // Calls the subclass's initializeContainer
	    initializeContainer(instance, endpoint);
	    // Extension point
	    customizeContainer(instance);
	    return instance;
	}

	// Subclass: ConcurrentKafkaListenerContainerFactory#initializeContainer
	@Override
	protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance,
	        KafkaListenerEndpoint endpoint) {
	    super.initializeContainer(instance, endpoint);
	    // How many consumers to start for this listener. Note that these are real consumers:
	    // many projects configure a large value and also run several nodes, which creates
	    // many useless consumers that spin in a while(true) loop and burn CPU.
	    if (endpoint.getConcurrency() != null) {
	        instance.setConcurrency(endpoint.getConcurrency());
	    }
	    else if (this.concurrency != null) {
	        instance.setConcurrency(this.concurrency);
	    }
	}

ConcurrentKafkaListenerContainerFactory#createContainerInstance

	protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
	    // Values configured on the @KafkaListener annotation
	    // Consume explicitly assigned partitions
	    TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
	    if (topicPartitions != null && topicPartitions.length > 0) {
	        ContainerProperties properties = new ContainerProperties(topicPartitions);
	        return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
	    }
	    else {
	        Collection<String> topics = endpoint.getTopics();
	        if (!topics.isEmpty()) {
	            // Consume named topics; ContainerProperties extends ConsumerProperties
	            ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
	            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
	        }
	        else {
	            ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
	            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
	        }
	    }
	}

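4. The start method of KafkaListenerEndpointRegistry

This class implements the SmartLifecycle interface; the method to focus on is start.
Prerequisite: you need to know when KafkaListenerEndpointRegistry#start runs. It is triggered from AbstractApplicationContext#finishRefresh: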

	protected void finishRefresh() {
	    // Clear context-level resource caches (such as ASM metadata from scanning).
	    clearResourceCaches();
	    // Initialize lifecycle processor for this context.
	    initLifecycleProcessor();
	    // This is the call of interest
	    // Propagate refresh to lifecycle processor first.
	    // LifecycleProcessor is itself an extension of the Lifecycle interface; see DefaultLifecycleProcessor#onRefresh
	    getLifecycleProcessor().onRefresh();
	    // Publish the final event.
	    publishEvent(new ContextRefreshedEvent(this));
	    // Participate in LiveBeansView MBean, if active.
	    LiveBeansView.registerApplicationContext(this);
	}

KafkaListenerEndpointRegistry#start

	public void start() {
	    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
	        startIfNecessary(listenerContainer);
	    }
	    this.running = true;
	}

	private void startIfNecessary(MessageListenerContainer listenerContainer) {
	    if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
	        // listenerContainer.isAutoStartup() is true here
	        // Continues in AbstractMessageListenerContainer#start, the parent class of ConcurrentMessageListenerContainer
	        listenerContainer.start();
	    }
	}

AbstractMessageListenerContainer#start (the parent class of ConcurrentMessageListenerContainer)

	public final void start() {
	    checkGroupId();
	    synchronized (this.lifecycleMonitor) {
	        if (!isRunning()) {
	            Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
	                    () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
	            // Implemented by the subclass: ConcurrentMessageListenerContainer#doStart
	            doStart();
	        }
	    }
	}

ConcurrentMessageListenerContainer#doStart

	protected void doStart() {
	    if (!isRunning()) {
	        checkTopics();
	        ContainerProperties containerProperties = getContainerProperties();
	        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
	        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
	            this.logger.warn(() -> "When specific partitions are provided, the concurrency must be less than or "
	                    + "equal to the number of partitions; reduced from " + this.concurrency + " to "
	                    + topicPartitions.length);
	            this.concurrency = topicPartitions.length;
	        }
	        setRunning(true);
	        for (int i = 0; i < this.concurrency; i++) {
	            // One listener starts several consumers
	            // Build a KafkaMessageListenerContainer for each of them
	            KafkaMessageListenerContainer<K, V> container =
	                    constructContainer(containerProperties, topicPartitions, i);
	            String beanName = getBeanName();
	            container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
	            container.setApplicationContext(getApplicationContext());
	            if (getApplicationEventPublisher() != null) {
	                container.setApplicationEventPublisher(getApplicationEventPublisher());
	            }
	            container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
	            container.setGenericErrorHandler(getGenericErrorHandler());
	            container.setAfterRollbackProcessor(getAfterRollbackProcessor());
	            container.setRecordInterceptor(getRecordInterceptor());
	            container.setInterceptBeforeTx(isInterceptBeforeTx());
	            container.setEmergencyStop(() -> {
	                stop(() -> {
	                    // NOSONAR
	                });
	                publishContainerStoppedEvent();
	            });
	            if (isPaused()) {
	                container.pause();
	            }
	            // Start it: AbstractMessageListenerContainer#start
	            container.start();
	            this.containers.add(container);
	        }
	    }
	}
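
Putting initializeContainer and doStart together, the number of child containers (and therefore consumers) follows a simple rule. The sketch below restates it as two pure functions. `ConcurrencySketch`, `resolve` and `clamp` are hypothetical names, and the container default of 1 is an assumption about ConcurrentMessageListenerContainer that the excerpt does not show.

```java
class ConcurrencySketch {

    // Mirrors initializeContainer: the endpoint value (from the annotation) wins,
    // then the factory value; otherwise the container keeps its own default.
    static int resolve(Integer endpointConcurrency, Integer factoryConcurrency, int containerDefault) {
        if (endpointConcurrency != null) {
            return endpointConcurrency;
        }
        if (factoryConcurrency != null) {
            return factoryConcurrency;
        }
        return containerDefault;
    }

    // Mirrors doStart: when partitions are assigned explicitly (non-null count),
    // concurrency is reduced to the number of assigned partitions.
    static int clamp(int concurrency, Integer assignedPartitions) {
        if (assignedPartitions != null && concurrency > assignedPartitions) {
            return assignedPartitions;
        }
        return concurrency;
    }

    public static void main(String[] args) {
        // No annotation value, factory says 8, three explicitly assigned partitions.
        int effective = clamp(resolve(null, 8, 1), 3);
        System.out.println("child containers to start: " + effective);
    }
}
```

Note that the reduction in doStart applies only when partitions are assigned explicitly. With topic subscriptions nothing caps the value, so a concurrency larger than the partition count leaves the extra consumers without any partition, which is the waste the comment in initializeContainer warns about.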

KafkaMessageListenerContainer#doStart

	protected void doStart() {
	    if (isRunning()) {
	        return;
	    }
	    if (this.clientIdSuffix == null) { // stand-alone container
	        checkTopics();
	    }
	    ContainerProperties containerProperties = getContainerProperties();
	    checkAckMode(containerProperties);
	    Object messageListener = containerProperties.getMessageListener();
	    // Create the task executor if none is configured
	    if (containerProperties.getConsumerTaskExecutor() == null) {
	        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
	                (getBeanName() == null ? "" : getBeanName()) + "-C-");
	        containerProperties.setConsumerTaskExecutor(consumerExecutor);
	    }
	    GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
	    ListenerType listenerType = determineListenerType(listener);
	    // ListenerConsumer is a Runnable implementation
	    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
	    setRunning(true);
	    this.startLatch = new CountDownLatch(1);
	    // Submit the task to the executor
	    this.listenerConsumerFuture = containerProperties
	            .getConsumerTaskExecutor()
	            .submitListenable(this.listenerConsumer);
	    try {
	        if (!this.startLatch.await(containerProperties.getConsumerStartTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
	            this.logger.error("Consumer thread failed to start - does the configured task executor "
	                    + "have enough threads to support all containers and concurrency?");
	            publishConsumerFailedToStart();
	        }
	    }
	    catch (@SuppressWarnings(UNUSED) InterruptedException e) {
	        Thread.currentThread().interrupt();
	    }
	}
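
The `startLatch` is a start-up handshake: the starting thread waits, with a timeout, until the consumer thread is actually running. A minimal sketch with `java.util.concurrent` follows. The excerpt does not show where the latch is counted down, so the sketch assumes the consumer task does it as its first action, and `StartLatchSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StartLatchSketch {

    // Submits a task and reports whether it began running within the timeout.
    static boolean startAndAwait(ExecutorService executor, long timeoutMillis) {
        CountDownLatch startLatch = new CountDownLatch(1);
        executor.execute(() -> {
            // Assumption: the consumer task signals "I am running" first.
            startLatch.countDown();
            // The poll loop would run here.
        });
        try {
            return startLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return startAndAwait(executor, 1000);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("consumer thread started in time: " + demo());
    }
}
```

If the executor has no free thread, the task never runs and `await` returns false after the timeout. That is the situation the "Consumer thread failed to start" error message in doStart describes.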

KafkaMessageListenerContainer.ListenerConsumer#ListenerConsumer

	ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
	    Properties consumerProperties = propertiesFromProperties();
	    checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
	    this.autoCommit = determineAutoCommit(consumerProperties);
	    // Create the consumer
	    this.consumer =
	            KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
	                    this.consumerGroupId,
	                    this.containerProperties.getClientId(),
	                    KafkaMessageListenerContainer.this.clientIdSuffix,
	                    consumerProperties);
	    this.clientId = determineClientId();
	    this.transactionTemplate = determineTransactionTemplate();
	    this.genericListener = listener;
	    this.consumerSeekAwareListener = checkConsumerSeekAware(listener);
	    this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
	            KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
	    subscribeOrAssignTopics(this.consumer);
	    GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
	    if (listener instanceof BatchMessageListener) {
	        this.listener = null;
	        this.batchListener = (BatchMessageListener<K, V>) listener;
	        this.isBatchListener = true;
	        this.wantsFullRecords = this.batchListener.wantsPollResult();
	    }
	    else if (listener instanceof MessageListener) {
	        this.listener = (MessageListener<K, V>) listener;
	        this.batchListener = null;
	        this.isBatchListener = false;
	        this.wantsFullRecords = false;
	    }
	    else {
	        throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
	                + "'BatchMessageListener', or the variants that are consumer aware and/or "
	                + "Acknowledging"
	                + " not " + listener.getClass().getName());
	    }
	    this.listenerType = listenerType;
	    this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
	            || listenerType.equals(ListenerType.CONSUMER_AWARE);
	    if (this.isBatchListener) {
	        validateErrorHandler(true);
	        this.errorHandler = new LoggingErrorHandler();
	        this.batchErrorHandler = determineBatchErrorHandler(errHandler);
	    }
	    else {
	        validateErrorHandler(false);
	        this.errorHandler = determineErrorHandler(errHandler);
	        this.batchErrorHandler = new BatchLoggingErrorHandler();
	    }
	    Assert.state(!this.isBatchListener || !this.isRecordAck,
	            "Cannot use AckMode.RECORD with a batch listener");
	    if (this.containerProperties.getScheduler() != null) {
	        this.taskScheduler = this.containerProperties.getScheduler();
	        this.taskSchedulerExplicitlySet = true;
	    }
	    else {
	        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
	        threadPoolTaskScheduler.initialize();
	        this.taskScheduler = threadPoolTaskScheduler;
	    }
	    this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer,
	            Duration.ofSeconds(this.containerProperties.getMonitorInterval()));
	    if (this.containerProperties.isLogContainerConfig()) {
	        this.logger.info(this.toString());
	    }
	    Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
	    this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, false));
	    this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, true));
	    this.syncCommitTimeout = determineSyncCommitTimeout();
	    if (this.containerProperties.getSyncCommitTimeout() == null) {
	        // update the property so we can use it directly from code elsewhere
	        this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
	        if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
	            KafkaMessageListenerContainer.this.thisOrParentContainer
	                    .getContainerProperties()
	                    .setSyncCommitTimeout(this.syncCommitTimeout);
	        }
	    }
	    this.maxPollInterval = obtainMaxPollInterval(consumerProperties);
	    this.micrometerHolder = obtainMicrometerHolder();
	    this.deliveryAttemptAware = setupDeliveryAttemptAware();
	    this.subBatchPerPartition = setupSubBatchPerPartition();
	}

	@Override
	public void run() { // NOSONAR complexity
	    ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
	    publishConsumerStartingEvent();
	    this.consumerThread = Thread.currentThread();
	    setupSeeks();
	    KafkaUtils.setConsumerGroupId(this.consumerGroupId);
	    this.count = 0;
	    this.last = System.currentTimeMillis();
	    initAssignedPartitions();
	    publishConsumerStartedEvent();
	    Throwable exitThrowable = null;
	    while (isRunning()) {
	        try {
	            // Poll for messages and invoke the listener
	            pollAndInvoke();
	        }
	        catch (@SuppressWarnings(UNUSED) WakeupException e) {
	            // Ignore, we're stopping or applying immediate foreign acks
	        }
	        catch (NoOffsetForPartitionException nofpe) {
	            this.fatalError = true;
	            ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
	            exitThrowable = nofpe;
	            break;
	        }
	        catch (AuthorizationException ae) {
	            if (this.authorizationExceptionRetryInterval == null) {
	                ListenerConsumer.this.logger.error(ae, "Authorization Exception and no authorizationExceptionRetryInterval set");
	                this.fatalError = true;
	                exitThrowable = ae;
	                break;
	            }
	            else {
	                ListenerConsumer.this.logger.error(ae, "Authorization Exception, retrying in " + this.authorizationExceptionRetryInterval.toMillis() + " ms");
	                // We can't pause/resume here, as KafkaConsumer doesn't take pausing
	                // into account when committing, hence risk of being flooded with
	                // GroupAuthorizationExceptions.
	                // see: https://github.com/spring-projects/spring-kafka/pull/1337
	                sleepFor(this.authorizationExceptionRetryInterval);
	            }
	        }
	        catch (FencedInstanceIdException fie) {
	            this.fatalError = true;
	            ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
	                    + "' has been fenced");
	            exitThrowable = fie;
	            break;
	        }
	        catch (StopAfterFenceException e) {
	            this.logger.error(e, "Stopping container due to fencing");
	            stop(false);
	            exitThrowable = e;
	        }
	        catch (Error e) { // NOSONAR - rethrown
	            Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
	            if (runnable != null) {
	                runnable.run();
	            }
	            this.logger.error(e, "Stopping container due to an Error");
	            wrapUp(e);
	            throw e;
	        }
	        catch (Exception e) {
	            handleConsumerException(e);
	        }
	    }
	    wrapUp(exitThrowable);
	}

	protected void pollAndInvoke() {
	    if (!this.autoCommit && !this.isRecordAck) {
	        processCommits();
	    }
	    fixTxOffsetsIfNeeded();
	    idleBetweenPollIfNecessary();
	    if (this.seeks.size() > 0) {
	        processSeeks();
	    }
	    pauseConsumerIfNecessary();
	    this.lastPoll = System.currentTimeMillis();
	    if (!isRunning()) {
	        return;
	    }
	    this.polling.set(true);
	    // Poll for messages
	    ConsumerRecords<K, V> records = doPoll();
	    if (!this.polling.compareAndSet(true, false) && records != null) {
	        /*
	         * There is a small race condition where wakeIfNecessary was called between
	         * exiting the poll and before we reset the boolean.
	         */
	        if (records.count() > 0) {
	            this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
	        }
	        return;
	    }
	    resumeConsumerIfNeccessary();
	    debugRecords(records);
	    if (records != null && records.count() > 0) {
	        savePositionsIfNeeded(records);
	        notIdle();
	        invokeListener(records);
	    }
	    else {
	        checkIdle();
	    }
	}

	private ConsumerRecords<K, V> doPoll() {
	    ConsumerRecords<K, V> records;
	    if (this.isBatchListener && this.subBatchPerPartition) {
	        if (this.batchIterator == null) {
	            this.lastBatch = this.consumer.poll(this.pollTimeout);
	            if (this.lastBatch.count() == 0) {
	                return this.lastBatch;
	            }
	            else {
	                this.batchIterator = this.lastBatch.partitions().iterator();
	            }
	        }
	        TopicPartition next = this.batchIterator.next();
	        List<ConsumerRecord<K, V>> subBatch = this.lastBatch.records(next);
	        records = new ConsumerRecords<>(Collections.singletonMap(next, subBatch));
	        if (!this.batchIterator.hasNext()) {
	            this.batchIterator = null;
	        }
	    }
	    else {
	        // Poll for messages: the basic KafkaConsumer API
	        records = this.consumer.poll(this.pollTimeout);
	        checkRebalanceCommits();
	    }
	    return records;
	}
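
Stripped of commits, seeks, pausing and error classification, the run/pollAndInvoke pair is a loop that polls with a timeout while a running flag is set and hands non-empty results to the listener. A minimal sketch of that shape follows. It uses a `BlockingQueue` as a hypothetical stand-in for the KafkaConsumer, and a `java.util.function.Consumer` for the listener method. `PollLoopSketch` is not a Spring or Kafka class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PollLoopSketch implements Runnable {

    private final BlockingQueue<String> source;  // stand-in for the KafkaConsumer
    private final Consumer<String> listener;     // stand-in for the @KafkaListener method
    private volatile boolean running = true;

    PollLoopSketch(BlockingQueue<String> source, Consumer<String> listener) {
        this.source = source;
        this.listener = listener;
    }

    void stop() {
        this.running = false;
    }

    @Override
    public void run() {
        while (this.running) {
            try {
                // Like consumer.poll(pollTimeout): wait a bounded time for data.
                String record = this.source.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    this.listener.accept(record);
                }
                // A null result corresponds to the idle branch (checkIdle).
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.running = false;
            }
            catch (RuntimeException e) {
                // Listener failures are reported and the loop keeps polling.
                System.err.println("listener failed, continuing: " + e);
            }
        }
    }

    static List<String> demo() {
        BlockingQueue<String> source = new LinkedBlockingQueue<>(Arrays.asList("r1", "r2", "r3"));
        List<String> received = new ArrayList<>();
        PollLoopSketch[] loop = new PollLoopSketch[1];
        loop[0] = new PollLoopSketch(source, record -> {
            received.add(record);
            if (received.size() == 3) {
                loop[0].stop();  // stop once everything has been consumed
            }
        });
        Thread consumerThread = new Thread(loop[0], "sketch-0-C-1");
        consumerThread.start();
        try {
            consumerThread.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each such loop occupies one thread for as long as the container runs, whether or not records arrive, so every unit of concurrency costs a thread.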

