【基于注解方式】Spring整合Kafka

文章目录

    • 1. 添加Maven依赖
    • 2. 配置与参数分离
    • 3. 工具类度内容
    • 4. Producer 消息生产者配置
    • 5. Consumer 消息消费者配置
    • 6. 使用注解监听消息
    • 7. 请求测试
    • 8. 测试结果

1. 添加Maven依赖

<!-- 添加spring-kafka支持 -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.2.RELEASE</version>
</dependency>

2. 配置与参数分离

使用kafka.properties文件形式,将配置与参数分离,方便管理。
kafka.properties文件如下:

################## kafkaListener Producer 发送端 ##################
# brokers 集群
kafka.producer.bootstrap.servers = 192.168.43.242:9092,192.168.43.134:9092,192.168.43.228:9092#发送端 id
kafka.producer.client.id = producerDemo#发送端确认模式
kafka.producer.acks = -1#发送失败重试次数
kafka.producer.retries = 3#批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送
kafka.producer.batch.size = 4096#与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至broker
kafka.producer.linger.ms = 10# 3355443232MB的批处理缓冲区
#kafka.producer.buffer.memory = 40960#默认 topic
kafka.producer.defaultTopic = testTopic#key 序列化
kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer#value 序列化
kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer################## kafkaListener Consumer 消费端 ###################消费端 brokers 集群
kafka.consumer.bootstrap.servers = 192.168.43.242:9092,192.168.43.134:9092,192.168.43.228:9092#消费者 group.id 组ID
kafka.consumer.group.id = test-group#消费者消费消息后,进行自动提交
kafka.consumer.enable.auto.commit = true#自动提交的频率(与 enable.auto.commit = true 属性配合使用)
kafka.consumer.auto.commit.interval.ms = 1000#新的groupid,是否从头开始消费
kafka.consumer.auto.offset.reset = earliest#在使用kafka组管理时,发送心跳机制,用于检测消费者故障的超时
#kafka.consumer.session.timeout.ms = 1000#key 反序列化
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer#value 反序列化
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer#消费端消费的topic
kafka.consumer.topic = testTopic

3. 工具类度内容

添加 PropertiesUtils 工具类,来读取 properties文件内容

package com.demo.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;public class PropertiesUtils {private static Logger log = LoggerFactory.getLogger(PropertiesUtils.class);/*** 根据文件名获取Properties对象* @param fileName* @return*/public static Properties read(String fileName){InputStream in = null;try{Properties prop = new Properties();//InputStream in = Object.class.getResourceAsStream("/"+fileName);in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);if(in == null){return null;}prop.load(in);return prop;}catch(Exception e){e.printStackTrace();}finally{try {if(in != null){in.close();}} catch (IOException e) {e.printStackTrace();}}return null;}/*** 根据文件名和键名获取值* @param fileName* @param key* @return*/public static String readKeyValue(String fileName, String key){Properties prop = read(fileName);if(prop != null){return prop.getProperty(key);}return null;}/*** 根据键名获取值* @param prop* @param key* @return*/public static String readKeyValue(Properties prop, String key){if(prop != null){return prop.getProperty(key);}return null;}/*** 写入* @param fileName* @param key* @param value*/public static void writeValueByKey(String fileName, String key, String value){Map<String, String> properties = new HashMap<String, String>();properties.put(key, value);writeValues(fileName, properties);}/*** 写入* @param fileName* @param properties*/public static void writeValues(String fileName, Map<String, String> properties){InputStream in = null;OutputStream out = null;try {in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);if(in == null){throw new RuntimeException("读取的文件("+fileName+")不存在,请确认!");               }Properties prop = new Properties();prop.load(in);String path = PropertiesUtils.class.getResource("/"+fileName).getPath();out = new FileOutputStream(path);if(properties != null){Set<String> set = properties.keySet();for (String string : set) {prop.setProperty(string, properties.get(string));log.info("更新"+fileName+"的键("+string+")值为:"+properties.get(string));}}prop.store(out, "update properties");} catch (Exception e) {e.printStackTrace();} finally{try {if(in != null){in.close();}if(out != null){out.flush();out.close();}} catch (Exception e2) {e2.printStackTrace();}}}public static void main(String[] args) throws Exception {//System.out.println("read="+read("config.properties"));//System.out.println("readKeyValue="+readKeyValue("config.properties","superAdmin"));//writeValueByKey(CC.WEIXI_PROPERTIES, "access_token", "ddd");Map<String, String> properties = new HashMap<String, String>();properties.put("access_token", "ddd2");properties.put("access_token1", "ee2");properties.put("bbbb", "bbbb");}
}

4. Producer 消息生产者配置

package com.demo.kafka;import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;/*** Kafka 消息生产者配置*/
@Configurable
@Component
@EnableKafka
public class KafkaProducerConfig {Properties properties = PropertiesUtils.read("kafka.properties");public KafkaProducerConfig() {System.out.println("kafka 生产者配置加载...");}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory(producerProperties());}public Map<String, Object> producerProperties() {Map<String, Object> props = new HashMap<String, Object>();//Kafka服务地址props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.producer.bootstrap.servers"));//设置当前客户端idprops.put(ProducerConfig.CLIENT_ID_CONFIG, properties.getProperty("kafka.producer.client.id"));//设置消费端确认机制props.put(ProducerConfig.ACKS_CONFIG, properties.getProperty("kafka.producer.acks"));//发送失败重试次数props.put(ProducerConfig.RETRIES_CONFIG, properties.getProperty("kafka.producer.retries"));//批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送props.put(ProducerConfig.BATCH_SIZE_CONFIG, properties.getProperty("kafka.producer.batch.size"));//与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至brokerprops.put(ProducerConfig.LINGER_MS_CONFIG,properties.getProperty("kafka.producer.linger.ms"));//Key序列化props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.key.serializer"));//Value序列化props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.value.serializer"));return props;}@Beanpublic KafkaTemplate<String,String> kafkaTemplate(){KafkaTemplate<String,String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory(),true);//设置默认的topic(此处可做一些具体设置)kafkaTemplate.setDefaultTopic(properties.getProperty("kafka.producer.defaultTopic"));return kafkaTemplate;}
}

5. Consumer 消息消费者配置

package com.demo.kafka;import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@Configurable
@Component
@EnableKafka
public class KafkaConsumerConfig {Properties properties = PropertiesUtils.read("kafka.properties");public KafkaConsumerConfig() {System.out.println("kafka消费者配置加载...");}public Map<String, Object> consumerProperties() {Map<String, Object> props = new HashMap<String, Object>();//Kafka服务地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.consumer.bootstrap.servers"));//消费组props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getProperty("kafka.consumer.group.id"));//设置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getProperty("kafka.consumer.enable.auto.commit"));//设置间隔时间props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getProperty("kafka.consumer.auto.commit.interval.ms"));//Key反序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.key.deserializer"));//Value反序列化props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.value.deserializer"));//从头开始消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getProperty("kafka.consumer.auto.offset.reset"));return props;}public ConsumerFactory<String, String> consumerFactory() {return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());}@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic KafkaConsumerListener kafkaConsumerListener() {return new KafkaConsumerListener();}
}

6. 使用注解监听消息

通过 @KafkaListener 注解的形式,消费topic中的消息

public class KafkaConsumerListener {@KafkaListener(topics = "testTopic01")public void listen01(ConsumerRecord<String,String> consumerRecord){System.out.println("开始消费testTopic01的消息");System.out.println("消费者线程:"+Thread.currentThread().getName()+"[ 消息 来自kafkatopic:"+consumerRecord.topic()+",分区:"+consumerRecord.partition() +" ,委托时间:"+consumerRecord.timestamp()+"]消息内容如下:");System.out.println(consumerRecord.value());}@KafkaListener(topics = "testTopic02")public void listen02(ConsumerRecord<String,String> consumerRecord){System.out.println("开始消费testTopic02的消息");System.out.println(consumerRecord.value());}/*** 消费 某个topic 下指定分区的消息*/@KafkaListener(topicPartitions = {@TopicPartition(topic = "liuzebiao",partitions = {"1"})})public void topicMessage(ConsumerRecord<?, ?> record,String content){System.out.println("消息:"+ content);System.out.println("消息被消费------>Topic:"+ record.topic() + ",------>Value:" + record.value() +",------>Partition:" + record.partition());}
}

7. 请求测试

通过Spring Controller的形式,开始测试

@Controller
@RequestMapping("kafka")
public class KafkaController {@AutowiredKafkaTemplate kafkaTemplate;/*** 消息发送*/@RequestMapping("producer")@ResponseBodypublic void producer(){kafkaTemplate.send("testTopic01","producer发送消息01");kafkaTemplate.send("testTopic02","producer发送消息02");}}

8. 测试结果

通过 localhost:8080/kafka/producer 调用接口,使用 kafkaTemplate 发送消息。

消息发送成功后,在控制台会收到如下信息:

开始消费testTopic01的消息
消费者线程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1[ 消息 来自kafkatopic:testTopic01,分区:0 ,委托时间:1568107936693]消息内容如下:
producer发送消息01开始消费testTopic02的消息
producer发送消息02

项目链接:
https://github.com/gb-heima/spring-annotataion-kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/521125.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云PyODPS 0.7.18发布,针对聚合函数进行优化同时新增对Python 3.7支持

近日&#xff0c;阿里云发布PyODPS 0.7.18&#xff0c;主要是针对聚合函数进行优化同时新增对Python 3.7支持。 PyODPS是MaxCompute的Python版本的SDK&#xff0c;SDK的意思非常广泛&#xff0c;辅助开发某一类软件的相关文档、范例和工具的集合都可以叫做“SDK”。 PyODPS在这…

centos安装rabbitmq_【SpringBoot MQ系列教程】RabbitMq 初体验

SpringBoot 系列教程之 RabbitMq 初体验​mp.weixin.qq.commq 在异步解耦削峰的优势非常突出&#xff0c;现在很多的项目都会用到&#xff0c;掌握 mq 的知识点&#xff0c;了解如何顺畅的使用 mq&#xff0c;可以说是一个必备的职业技能点了接下来我们进入 rabbitmq 的学习过程…

java rpc与webservice_RPC体系,RPC和WebService的区别详解

RPC和WebService的关系RPC(Remote Procedure Call)— 远程过程调用&#xff0c;是一个很大的概念, 它是一种通过网络从远程计算机程序上跨语言跨平台的请求服务&#xff0c;rpc能省略部分接口代码的开发&#xff0c;可以跨机器之间访问对象(java rmi)&#xff0c;可以有更方便的…

免费公测中-GPU数据库SQream DB正式上线云市场

业内领先的GPU 数据库服务SQream DB在阿里云云市场正式开启免费公测&#xff01;SQream DB是一款由阿里战略投资的以色列SQream公司提供&#xff0c;能够支撑海量数据高速分析的业内领先的GPU数据库。通过将计算密集型操作卸载到GPU上&#xff0c;与业界的解决方案相比&#xf…

唏嘘!2019榜单出炉:铁打的Python连续3年第一,它居然跌出前十?

IEEE Spectrum2019年度编程语言排行榜最近刚刚出炉&#xff0c;Python不出意外的又拿了个第一&#xff0c;但是意料之外的是&#xff0c;曾经大火的PHP&#xff0c;居然跌出了前十&#xff01;PHP曾被大家称为“世界上最好的编程语言”&#xff0c;去年排名第六&#xff0c;前年…

centos7无法使用epel的解决方法

使用如下代码在centos7安装epel源&#xff0c;却无法使用。 yum -y install epel-release网络没问题&#xff0c;可以ping通epel源的地址&#xff0c;但是就是连不上repo&#xff0c;报错。 解决方法&#xff1a;编辑/etc/yum.repos.d/epel.repo&#xff0c;将epel配置信息中…

阿里云ECS家族再添新成员,推出密集计算型实例规格族ic5

去年&#xff0c;阿里云正式发布云服务器ECS企业级产品家族&#xff0c;推出面向173种企业应用场景的19款实例。适合在复杂的企业计算环境下&#xff0c;满足对于高性能、高可靠的计算需求。 时隔近一年&#xff0c;回看ECS企业级产品家族已经发展到30款实例&#xff0c;近日再…

findbugs插件_Intellij静态代码扫描插件SpotBugs

最近要做Java静态扫描的部分工作&#xff0c;之前是在Jenkins上使用findbugs插件完成的&#xff0c;但是由于现在Jenkins权限收回和Java代码权限的放开(我也搞不懂这两者的关联性)&#xff0c;目前打算在本地完成静态代码扫描工作。选来选取还是选择在Intellij中用插件来完成&a…

一张图看懂智联车管理云平台

智联车管理云平台&#xff08;IoV Command Center&#xff0c;简称IoV CC&#xff09;是阿里云面向智联车领域&#xff0c;专门推出的车辆全生命周期云端管理平台&#xff0c;旨在赋能车厂转型出行服务商&#xff0c;提高运营效率、降低自建成本。 传统模式下&#xff0c;车辆…

刨根问底 | 红遍全网的SD-WAN,到底是个啥?

戳蓝字“CSDN云计算”关注我们哦&#xff01;作者 | 小枣君责编 | 阿秃作为一个热门概念&#xff0c;SD-WAN近年以来频繁地出现在我们的视野当中。很多人说&#xff0c;它是未来最具发展潜力的通信技术之一&#xff0c;极具商业价值。行业里的老牌通信设备商和运营商对它一致看…

centos7安装rabbitmq简单方式

安装rabbitmq前要准备的基础环境 yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers需下载的安装文件如下 ## erlang wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.…

java socket数据传输_Java Socket编程(一) Socket传输模式

Java Socket编程(一) Socket传输模式文章来源&#xff1a;ASPCN 作者&#xff1a;孙雯Socket传输模式Sockets有两种主要的操作方式:面向连接的和无连接的.面向连接的sockets操作就像一部电话,他们必须建立一个连接和一人呼叫.所有的事情在到达时的顺序与它们出发时的顺序时一样…

车联网上云最佳实践(一)

一、车联网行业特性讲解 最近两年车联网发展受到政府部门、科研院以及各大互联网巨头的广泛关注和积极推动。从应用来看&#xff0c;主要包括两种模式&#xff1a;一是前装模式&#xff08;即车辆出厂前安装&#xff09;&#xff0c;是乘用车厂主导或者与有相关能力的公司合作&…

python3 网站状态监控_基于python3监控服务器状态进行邮件报警

在正式的生产环境中&#xff0c;我们常常会需要监控服务器的状态&#xff0c;以保证公司整个业务的正常运转&#xff0c;常常我们会用到像nagios、zabbix这类工具进行实时监控&#xff0c;那么用python我们怎么进行监控呢&#xff1f;这里我们利用了python3调用psutil和yagmail…

Centos7 安装Rabbitmq-server和Erlang 仓库汇总

新版本链接rabbitmq-serverhttps://github.com/rabbitmq/rabbitmq-server/releaseserlanghttps://github.com/rabbitmq/erlang-rpm/releasesrabbitmq-server和erlang版本对照https://www.rabbitmq.com/which-erlang.html 旧版本链接erlanghttps://www.rabbitmq.com/releases/e…

车联网上云最佳实践(二)

云上对标架构及技术详解 我们对传统IDC应用架构进行分析之后&#xff0c;我们发现之前的系统架构存在一些不合理的地方导致了很多的痛点&#xff0c;为了解决这些痛点我们最终考虑上云。开始思考怎样利用云上产品来解决目前遇到的痛点。例如 为了解决我们自建IDC底层基础设施可…

java list 字段去重_java list 根据对象一个字段去重

1.主要思路就是根据从写equals 以及 hashCode 方法。代码如下&#xff1a;package com.bfd.unibase.modules.dataview.entity;import org.hibernate.validator.constraints.Length;import java.util.ArrayList;import java.util.Date;import java.util.HashSet;import java.uti…

Dubbo Mesh | 阿里巴巴中间件团队在 Service Mesh 的实践和探索(附PPT)

精彩观点导读&#xff1a; 我们去探索一项技术&#xff0c;并不会仅仅因为其先进性&#xff0c;而是因为我们目前遇到了一些无法解决的问题&#xff0c;而这项技术正好能解决这个问题。 所有软件最重要的使命不是满足功能要求&#xff0c;而是演进&#xff0c;从而持续成长。…

flutter 自定义键盘_掘金 AMA:听闲鱼客户端架构师邬吉风聊 Flutter 和移动端开发那些事...

第二十一期 AMA 掘金团队请来了闲鱼客户端架构师&#xff0c;《Fish-Redux》作者-- 邬吉风做了为期三天的 Ask Me Anything (AMA) 活动(活动已结束)。我们在此精选了一些来自用户的提问及邬吉风的回答。关于 邬吉风阿里花名吉丰&#xff0c; 《Fish-Redux》作者。现任闲鱼客户端…

linux CentOS7 erlang安装

linux CentOS7 erlang安装 RabbitMQ官网方法&#xff08;安装比较快&#xff0c;几分钟搞定&#xff09; 如果只是使用RabbitMQ&#xff0c;个人推荐使用RabbitMQ公司维护的erlang版本&#xff0c;该版本只保留了与RabbltMQ相关的功能&#xff0c; 地址是https://dl.bintray.co…