docker-compose部署kafka

docker-compose.yml配置

version: "3"
services:kafka:image: 'bitnami/kafka:latest'ports:- '7050:7050'environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:7050,CONTROLLER://:7051- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://183.56.203.157:7050- KAFKA_BROKER_ID=1- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@0.0.0.0:7051- ALLOW_PLAINTEXT_LISTENER=yes

kafka UI界面

docker run -d --name kafka-map -p 8049:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin dushixiang/kafka-map:latest

docker run -p 8080:8080 -e KAFKA_BROKERS=host.docker.internal:9092 docker.redpanda.com/vectorized/console:master-173596f

UI界面总览

https://towardsdatascience.com/overview-of-ui-tools-for-monitoring-and-management-of-apache-kafka-clusters-8c383f897e80

kafka学习

生产者
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.Test
import java.util.*/*** @Description :* @Author  xiaomh* @date  2022/8/5 15:58*/
class CustomProducer {//异步发送@Testfun customProducer() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 5) {//黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)kafkaProducer.send(ProducerRecord("xiao1", "customProducer,count::$i"))}//关闭资源kafkaProducer.close()}//同步发送@Testfun customProducerSync() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 5) {//黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)kafkaProducer.send(ProducerRecord("xiao1", "customProducerSync,count::$i")).get()}//关闭资源kafkaProducer.close()}//回调异步发送@Testfun customProducerCallback() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 500) {//黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)kafkaProducer.send(ProducerRecord("xiao1", "customProducerCallback,count::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})//测试分区策略Thread.sleep(1)}//关闭资源kafkaProducer.close()}//回调异步发送+使用分区@Testfun customProducerCallbackPartitions1() {//配置val properties = Properties()//链接kafkaproperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 5) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", 1, "", "customProducerCallbackPartitions,count::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//回调异步发送+自定义分区@Testfun customProducerCallbackPartitions2() {//配置val properties = Properties()//链接kafka,集群链接使用"183.56.203.157:7050,183.56.203.157:7051"properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//关联自定义分区器properties[ProducerConfig.PARTITIONER_CLASS_CONFIG] ="com.umh.medicalbookingplatform.b2bapi.config.MyPartitioner"//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//发送数据for (i in 0 until 50) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "felix is strong,count::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//自定义配置缓冲区、批次、等待时间、压缩@Testfun customProducerParameters() {//配置val properties = Properties()properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//缓冲区大小。默认32,64=33554432x2properties[ProducerConfig.BUFFER_MEMORY_CONFIG] = 33554432//批次大小。默认16kproperties[ProducerConfig.BATCH_SIZE_CONFIG] = 16384//等待时间。默认0properties[ProducerConfig.LINGER_MS_CONFIG] = 1//压缩.压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstdproperties[ProducerConfig.COMPRESSION_TYPE_CONFIG] = "snappy"//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)for (i in 0 until 10) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "customProducerParameters::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//ack、重试次数配置@Testfun customProducerAck() {//配置val properties = Properties()properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//ackproperties[ProducerConfig.ACKS_CONFIG] = "1"//重试次数properties[ProducerConfig.RETRIES_CONFIG] = 30//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)for (i in 0 until 10) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "customProducerAck::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}//关闭资源kafkaProducer.close()}//事物@Testfun customProducerTransaction() {//配置val properties = Properties()properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.nameproperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name//指定事务id,一定要指定!!properties[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = UUID.randomUUID().toString()//创建kafka生产者对象val kafkaProducer = KafkaProducer<String, String>(properties)//开启事务kafkaProducer.initTransactions()kafkaProducer.beginTransaction()try {for (i in 0 until 10) {//1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值//2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)//key可以作为producer数据名,让consumer通过key找到kafkaProducer.send(ProducerRecord("xiao1", "customProducerTransaction::$i"), Callback{ metadata, exception ->if (exception == null) {println("主题:${metadata.topic()},分区:${metadata.partition()}")}})}
//            val test: Int = 1 / 0kafkaProducer.commitTransaction()} catch (e: Exception) {kafkaProducer.abortTransaction()} finally {//关闭资源kafkaProducer.close()}}}

消费者

1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个 partition的数据。

2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略。

3、每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的过长(max.poll.interval.ms5分钟),也会触发再 平衡

package com.umh.medicalbookingplatform.apiimport com.alibaba.fastjson.parser.ParserConfig
import com.fasterxml.jackson.databind.MapperFeature
import com.umh.medicalbookingplatform.core.audit.SpringSecurityAuditorAware
import com.umh.medicalbookingplatform.core.config.CoreConfiguration
import com.umh.medicalbookingplatform.core.jsonview.JsonViews
import com.umh.medicalbookingplatform.core.properties.ApplicationProperties
import com.umh.medicalbookingplatform.core.utils.ApplicationJsonObjectMapper
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder
import org.keycloak.OAuth2Constants
import org.keycloak.admin.client.Keycloak
import org.keycloak.admin.client.KeycloakBuilder
import io.swagger.v3.oas.models.Components
import io.swagger.v3.oas.models.OpenAPI
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.boot.web.servlet.ServletComponentScan
import org.springframework.cache.annotation.EnableCaching
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Import
import org.springframework.data.domain.AuditorAware
import org.springframework.data.jpa.repository.config.EnableJpaAuditing
import org.springframework.http.MediaType
import org.springframework.http.converter.HttpMessageConverter
import org.springframework.http.converter.ResourceHttpMessageConverter
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer
import java.security.Security
import java.util.*
import io.swagger.v3.oas.models.info.Info
import io.swagger.v3.oas.models.info.License
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.keycloak.adapters.KeycloakConfigResolver
import org.keycloak.adapters.springboot.KeycloakSpringBootConfigResolver
import org.keycloak.adapters.springboot.KeycloakSpringBootProperties
import org.springframework.http.converter.StringHttpMessageConverter
import java.time.Duration
import java.util.concurrent.TimeUnit@EnableJpaAuditing
@EnableCaching
@EnableScheduling
@SpringBootApplication
@Import(CoreConfiguration::class)
@ServletComponentScan("com.umh.medicalbookingplatform")
open class ApiApplication : WebMvcConfigurer {@Autowiredprivate lateinit var appProperties: ApplicationProperties@Autowiredprivate lateinit var keycloakSpringBootProperties: KeycloakSpringBootProperties@Beanfun keycloakConfigResolver(): KeycloakConfigResolver {return KeycloakSpringBootConfigResolver()}@Beanfun fastJson(){ParserConfig.getGlobalInstance().isAutoTypeSupport = true}@Beanfun customConsumer() {//配置val properties = Properties()//连接properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//反序列化(注意写法:生产者是序列化,消费者是反序列化)properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.nameproperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name//配置消费者组id(就算消费者组只有一个消费者也需要)//当消费者组ID相同时,表示他们在同一个消费者组//当有三个分区,而消费者组里又有三个消费者时,消费者会各自自动选取一个分区进行消费properties[ConsumerConfig.GROUP_ID_CONFIG] = "test"//1.创建一个消费者val kafkaConsumer = KafkaConsumer<String, String>(properties)//2.定义主题 xiao1val topics = mutableListOf<String>()topics.add("xiao1")kafkaConsumer.subscribe(topics)//3.消费数据while (true) {val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))for (msg in consumerRecord) {println("consumer,msg:::$msg")}}}//    @Beanfun customConsumerPartition() {//配置val properties = Properties()//连接properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//反序列化(注意写法:生产者是序列化,消费者是反序列化)properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.nameproperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name//配置消费者组id(就算消费者组只有一个消费者也需要)//当消费者组ID相同时,表示他们在同一个消费者组properties[ConsumerConfig.GROUP_ID_CONFIG] = UUID.randomUUID().toString()//1.创建一个消费者val kafkaConsumer = KafkaConsumer<String, String>(properties)//2.定义主题对应的分区val topicPartition = mutableListOf<TopicPartition>()topicPartition.add(TopicPartition("xiao1", 1))kafkaConsumer.assign(topicPartition)//3.消费数据while (true) {val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))for (msg in consumerRecord) {println("msg:::$msg")}}}@Bean(name = ["keycloakGlobalCmsApi"])fun keycloakGlobalCmsApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl)//https://keycloak.umhgp.com/auth.realm(appProperties.keycloakGlobalCmsRealm)//global_cms.clientId(appProperties.keycloakGlobalCmsClient)//global-cms.username(appProperties.keycloakApiUsername)//medical-booking-platform-system-uat.password(appProperties.keycloakApiPassword)//Kas7aAnC76eGVHv5.grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Bean(name = ["keycloakGlobalProfileApi"])fun keycloakGlobalProfileApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl).realm(appProperties.keycloakGlobalProfileRealm).clientId(appProperties.keycloakGlobalProfileClient).username(appProperties.keycloakApiUsername).password(appProperties.keycloakApiPassword).grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Bean(name = ["keycloakBookingSystemApi"])fun keycloakBookingSystemApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl).realm(appProperties.keycloakBookingSystemRealm).clientId(appProperties.keycloakBookingSystemClient).username(appProperties.keycloakApiUsername).password(appProperties.keycloakApiPassword).grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Bean(name = ["keycloakUmhBookingSystemApi"])fun keycloakBookingSystemUmhApiInstance(): Keycloak {return KeycloakBuilder.builder().serverUrl(appProperties.keycloakAuthServerUrl).realm(appProperties.keycloakUmhBookingSystemRealm).clientId(appProperties.keycloakUmhBookingSystemClient).username(appProperties.keycloakApiUsername).password(appProperties.keycloakApiPassword).grantType(OAuth2Constants.PASSWORD).resteasyClient(ResteasyClientBuilder().connectTimeout(10, TimeUnit.SECONDS).readTimeout(10, TimeUnit.SECONDS).connectionPoolSize(100).build()).build()}@Beaninternal fun auditorProvider(): AuditorAware<UUID> {return SpringSecurityAuditorAware()}@Beanfun customOpenAPI(): OpenAPI? {return OpenAPI().components(Components()).info(Info().title("medical-booking-platform").version("1.5.8").license(License().name("Apache 2.0").url("http://springdoc.org")))}override fun configureMessageConverters(converters: MutableList<HttpMessageConverter<*>>) {
//        ActuatorMediaTypes()val supportedMediaTypes = ArrayList<MediaType>()supportedMediaTypes.add(MediaType.APPLICATION_JSON)supportedMediaTypes.add(MediaType.valueOf("application/vnd.spring-boot.actuator.v3+json"))supportedMediaTypes.add(MediaType.TEXT_PLAIN)val converter = MappingJackson2HttpMessageConverter()val objectMapper = ApplicationJsonObjectMapper()objectMapper.setConfig(objectMapper.serializationConfig.withView(JsonViews.Admin::class.java))objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, true)converter.objectMapper = objectMapperconverter.setPrettyPrint(true)converter.supportedMediaTypes = supportedMediaTypesconverters.add(0, StringHttpMessageConverter())converters.add(1, converter)converters.add(ResourceHttpMessageConverter())}}fun main(args: Array<String>) {Security.setProperty("crypto.policy", "unlimited")runApplication<ApiApplication>(*args)
}

range(范围)

Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策 略。

消费者分区操作:7分区2个消费者时

消费者1:消费分区0123

消费者2:消费分区456


在同一个消费者组,三消费者的情况下,如果其中一个宕机,45秒后会把消费者0需要处理的数据整个搬到消费者1或者消费者2.

结果:Consumer1=01234 或者 Consumer2=01256

随后如果再传输数据,消费者组会根据当前的消费者重新组织分配

Consumer0宕机45秒后再次传数据结果:Consumer1=0123 Consumer2=456

RoundRobin(轮询)

RoundRobin 针对集群中所有Topic而言。 RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

策略分配的修改

    @Beanfun customConsumer() {//配置val properties = Properties()//连接properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"//反序列化(注意写法:生产者是序列化,消费者是反序列化)properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.nameproperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name//配置消费者组id(就算消费者组只有一个消费者也需要)//当消费者组ID相同时,表示他们在同一个消费者组//当有三个分区,而消费者组里又有三个消费者时,消费者会各自自动选取一个分区进行消费properties[ConsumerConfig.GROUP_ID_CONFIG] = "test"//设置分区分配策略properties[ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG] = "org.apache.kafka.clients.consumer.RoundRobinAssignor"//1.创建一个消费者val kafkaConsumer = KafkaConsumer<String, String>(properties)//2.定义主题 xiao1val topics = mutableListOf<String>()topics.add("xiao1")kafkaConsumer.subscribe(topics)//3.消费数据while (true) {val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))for (msg in consumerRecord) {println("consumer,msg:::$msg")}}}

注意:06为一组给到一个消费者,3为一组给到另外一个消费者。45秒后重新发送数据,consumer2:0246,consumer3:135

Sticky (黏性)

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5、3 号分区数据。

2 号消费者:消费到 4、6 号分区数据。

0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别 由 1 号消费者或者 2 号消费者消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需 要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 2、3、5 号分区数据。

2 号消费者:消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

随机+均匀

宕机后分配的消费者和45秒后分配消费者一样

宕机(3消费者变2消费者):1403,235

45秒后2消费者:1403,235

本文转自 https://blog.csdn.net/weixin_52925162/article/details/126280062?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522170100111416800225544545%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=170100111416800225544545&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allfirst_rank_ecpm_v1~rank_v31_ecpm-8-126280062-null-null.142v96pc_search_result_base9&utm_term=keycloak%20docker-compose&spm=1018.2226.3001.4187,如有侵权,请联系删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579227.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

63.乐理基础-打拍子-四十六

历史知识&#xff1a; 当前写的节奏型是四十六节奏型&#xff0c;同二八这个词的意思类似&#xff0c;四十六就是四个十六分音符组成的节奏型&#xff0c;所以简称四十六&#xff0c;以四分音符为一拍的时候&#xff0c;四个十六分音符加起来的总拍数就是一拍&#xff0c;这…

算法训练第四十八天|198. 打家劫舍、213. 打家劫舍 II、337. 打家劫舍 III

198. 打家劫舍&#xff1a; 题目链接 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都藏有一定的现金&#xff0c;影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统&#xff0c;如果两间相邻的房屋在同一晚上被小偷闯入&#xff0c;系统会自动报…

SpringBoot整合JWT+Spring Security+Redis实现登录拦截(二)权限认证

上篇博文中我们已经实现了登录拦截&#xff0c;接下来我们继续补充代码&#xff0c;实现权限的认证 一、RBAC权限模型 什么事RBAC权限模型&#xff1f; RBAC权限模型&#xff08;Role-Based Access Control&#xff09;即&#xff1a;基于角色的权限访问控制。在RBAC中&#x…

15.权限控制 + 置顶、加精、删除

目录 1.权限控制 1.1 登录检查 1.2 授权配置 1.3 认证方案 1.4 CSRF 配置 2.置顶、加精、删除 2.1 开发数据访问层 2.2 业务层 2.3 表现层 Spring Security 是一个专注于为 Java 应用程序提供身份认证和授权的框架&#xff0c;它的强大之处在于它可以轻松扩展以满足自…

蓝桥杯的学习规划

c语言基础&#xff1a; Python语言基础 学习路径&#xff1a;画框的要着重学习

一文读懂SoBit 跨链桥教程

从BTC网络到Solana网络桥接BRC20 1.打开SoBit平台&#xff1a;在您的网络浏览器中启动SoBit Bridge应用程序。 2.连接您的钱包&#xff1a; 选择SoBit界面右上角的比特币网络来连接您的数字钱包。 3.选择源链、目标链和您想桥接的代币&#xff1a; 从下拉菜单中选择’BTC’作为…

翻硬币C语言

分析&#xff1a;首先&#xff0c;我们如果想要使得两次的硬币可以转化&#xff0c;那么两组字符对应不同的的个数就只能是偶数&#xff0c;比如&#xff1a; * * * * * o o * * * * * 我们要对上面的例子翻动5次&#xff0c;我们可以看出两个不同的位置相差五个单位&#x…

用C/C++实现MSML协议栈的详细介绍

一、MSML协议简介 MSML&#xff08;Media Server Markup Language&#xff09;是一种基于XML的标记语言&#xff0c;用于控制媒体服务器。它是媒体服务器控制协议的一种&#xff0c;允许第三方应用与媒体服务器进行交互&#xff0c;实现对媒体流的创建、修改和释放等操作。MSM…

Netty—Reactor线程模型详解

文章目录 前言线程模型基本介绍线程模型分类Reactor线程模型介绍Netty线程模型&#xff1a; 传统阻塞IO的缺点Reactor线程模型单Reactor单线程模式单Reactor多线程模式主从Reactor多线程Reactor 模式小结 Netty 线程模型案例说明&#xff1a;Netty核心组件简介ChannelPipeline与…

Linux中Mysql数据库备份操作

逻辑备份 备份的是建表、建库、插入等操作所执行SQL语句&#xff0c;适用于中小型数据库&#xff0c;效率相对较低。 本质&#xff1a;导出的是SQL语句文件 优点&#xff1a;不论是什么存储引擎&#xff0c;都可以用mysqldump备成SQL语句 缺点&#xff1a;速度较慢&#xff0c;…

Centos7安装Docker和Docker-Compose

环境 操作系统&#xff1a;Centos 7.9 root环境 Docker安装 卸载原先的Docker环境 如果你先前的操作系统安装了Docker环境&#xff0c;请卸载 Docker 相关的软件包&#xff0c;没有则忽略这一步。 yum remove docker \docker-client \docker-client-latest \docker-common \doc…

(2021|CVPR,XMC-GAN,对比学习,注意力自调制)用于文本到图像生成的跨模态对比学习

Cross-Modal Contrastive Learning for Text-to-Image Generation 公众&#xff1a;EDPJ&#xff08;添加 VX&#xff1a;CV_EDPJ 或直接进 Q 交流群&#xff1a;922230617 获取资料&#xff09; 目录 0. 摘要 1. 简介 2. 相关工作 3. 基础 4. 方法 4.1 用于文本到图像…

【软件工程】可执行文件和数据分离

一、概述 可执行文件和数据分离是一种软件设计策略&#xff0c;旨在将程序代码和程序使用的数据分离存储。这种方法通常用于提高软件的模块化程度和灵活性&#xff0c;以及方便软件的管理和维护。 在可执行文件和数据分离中&#xff0c;程序代码通常以可执行文件的形式存储&a…

Java小案例-Sentinel的实现原理

前言 Sentinel是阿里开源的一款面向分布式、多语言异构化服务架构的流量治理组件。 主要以流量为切入点&#xff0c;从流量路由、流量控制、流量整形、熔断降级、系统自适应过载保护、热点流量防护等多个维度来帮助开发者保障微服务的稳定性。 核心概念 要想理解一个新的技…

unityc用vs2017介绍

21版unity能用17vs&#xff0c;只要在unity的Edit/Preferences/ExternalTools里面改既可。

音频修复增强软件iZotope RX 10 mac特点介绍

iZotope RX 10 mac是一款音频修复和增强软件。 iZotope RX 10 mac软件特点 声音修复&#xff1a;iZotope RX 10可以去除不良噪音、杂音、吱吱声等&#xff0c;使音频变得更加清晰干净。 音频增强&#xff1a;iZotope RX 10支持对音频进行音量调节、均衡器、压缩器、限制器等处…

SpringSecurity6 | 登录失败后的JSON处理

✅作者简介:大家好,我是Leo,热爱Java后端开发者,一个想要与大家共同进步的男人😉😉 🍎个人主页:Leo的博客 💞当前专栏: 循序渐进学SpringSecurity6 ✨特色专栏: MySQL学习 🥭本文内容: SpringSecurity6 | 登录失败后的JSON处理 📚个人知识库: Leo知识库,…

Java架构师系统架构设计实践

目录 1 导语2 架构设计实践本章概述3 架构设计要素概述和规划4 架构设计模式5 架构设计输入6 架构设计输出7 架构设计要素总结 想学习架构师构建流程请跳转&#xff1a;Java架构师系统架构设计 1 导语 Java架构师在进行系统架构设计时&#xff0c;需要综合考虑多个方面&#…

SAP PP 配置学习(二)

MRP 参数文件设定 扩允物料视图 删除物料 物料批量维护

【C Primer Plus第六版 学习笔记】第十四章 结构和其他数据形式

有基础&#xff0c;进阶用&#xff0c;个人查漏补缺 建立结构声明&#xff1a;描述该对象由什么组成&#xff0c;即结构布局 格式&#xff1a; 关键字 标记&#xff08;可选&#xff09;{结构 }&#xff1b; 举例&#xff1a; struct book{char title[2];char author[4];float …