springboot 整合 kafka demo 顺便看一下源码

大家好,我是烤鸭:

    今天分享下 springboot 整合 kafka。

1.  环境参数:

      windows + kafka_2.11-2.3.0 + zookeeper-3.5.6 + springboot 2.3.0 

2.  下载安装zookeeper + kafka
 

zookeeper:

https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz

复制 zoo_sample.cfg ,改名为 zoo.cfg,增加日志路径:

dataDir=D:\xxx\env\apache-zookeeper-3.5.6-bin\data
dataLogDir=D:\xxx\env\apache-zookeeper-3.5.6-bin\log

启动zk,zkServer.cmd

kafka:

https://kafka.apache.org/downloads

 Binary downloads 下载
https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz

修改 config/server.properties,由于zk用的默认端口 2181,所以不需要改

log.dirs=D:\\xxx\\env\\kafka\\logs

启动kafka

 D:\xxx\env\kafka\bin\windows\kafka-server-start.bat D:\xxx\env\kafka\config\server.properties

3.  springboot 接入

pom.xml

 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.0.RELEASE</version><scope>compile</scope></dependency></dependencies>

application.yml

spring:kafka:# 指定kafka server的地址,集群配多个,中间,逗号隔开bootstrap-servers: 127.0.0.1:9092# 生产者producer:# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。retries: 0# 每次批量发送消息的数量,produce积累到一定数据,一次发送batch-size: 16384# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据buffer-memory: 33554432# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:linger.ms: 1# 消费者consumer:enable-auto-commit: falseauto-commit-interval: 100mskey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000group-id: group
server:port: 8081

 KafkaDemoController.java

package com.mys.mys.demo.kafka.web;import com.mys.mys.demo.kafka.service.KafkaSendService;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;@RestController
public class KafkaDemoController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@AutowiredKafkaSendService kafkaSendService;@GetMapping("/message/send")public boolean send(@RequestParam String message) {//默认自动创建,消费者端 allow.auto.create.topics = true//createTopic();kafkaTemplate.send("testTopic-xxx15", message);return true;}//同步@GetMapping("/message/sendSync")public boolean sendSync(@RequestParam String message){kafkaSendService.sendSync("synctopic",message);return  true;}//异步示例@GetMapping("/message/sendAnsyc")public boolean sendAnsys(@RequestParam String message){kafkaSendService.sendAnsyc("ansyctopic",message);return  true;}/*** @Author* @Description 创建主题* @Date 2020/5/23 19:03* @Param []* @return void**/private void createTopic() {Map<String, Object> configs = new HashMap<>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");KafkaAdmin admin = new KafkaAdmin(configs);NewTopic newTopic = new NewTopic("testTopic-xxx15",1,(short)1);AdminClient adminClient = AdminClient.create(admin.getConfigurationProperties());adminClient.createTopics(Arrays.asList(newTopic));}
}

KafkaSendService.java

package com.mys.mys.demo.kafka.service;import com.mys.mys.demo.kafka.handler.KafkaSendResultHandler;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;@Service
public class KafkaSendService {@Autowiredprivate KafkaTemplate<String,Object> kafkaTemplate;@Autowiredprivate KafkaSendResultHandler producerListener;/*** 异步示例* */public void sendAnsyc(final String topic,final String message){//统一监听处理kafkaTemplate.setProducerListener(producerListener);ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic,message);//具体业务的写自己的监听逻辑future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onSuccess(SendResult<String, Object> result) {System.out.println("发送消息成功:" + result);}@Overridepublic void onFailure(Throwable ex) {System.out.println("发送消息失败:"+ ex.getMessage());}});}/*** 同步示例* */public void sendSync(final String topic,final String message){ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, message);try {kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);System.out.println("发送成功");}catch (ExecutionException e) {System.out.println("发送消息失败:"+ e.getMessage());}catch (TimeoutException | InterruptedException e) {System.out.println("发送消息失败:"+ e.getMessage());}}
}

CustomerListener.java

package com.mys.mys.demo.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class CustomerListener {@KafkaListener(topics="testTopic")public void onMessage(String message){System.out.println("消费="+message);}@KafkaListener(topics="testTopic-xxx14")public void onMessage1(String message){System.out.println("消费="+message);}@KafkaListener(topics="testTopic-xxx15")public void onMessage15(String message){System.out.println("消费="+message);}
}

KafkaSendResultHandler.java(用于接收异步的返回值)

package com.mys.mys.demo.kafka.handler;import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;@Component
public class KafkaSendResultHandler implements ProducerListener {private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {log.info("Message send success : " + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, Exception exception) {log.info("Message send error : " + producerRecord.toString());}
}

4.  效果和部分源码分析

 看一下项目启动的日志,消费者监听到的分区和队列名称。另外如果kafka没有这个队列,在调用send方法时自动创建,看以下这个配置。
auto.create.topics.enable ,默认为 true。

访问路径:http://localhost:8081/message/send?message=1234
输出结果。

可以看下 ProducerRecord 这个类,方法先不贴了,看这几个属性。

public class ProducerRecord<K, V> {//队列名称private final String topic;//分区名称,如果没有指定,会按照key的hash值分配。如果key也没有,按照循环的方式分配。private final Integer partition;//请求头,用来存放k、v以外的信息,默认是只读的private final Headers headers;//key-valueprivate final K key;private final V value;//时间戳,如果不传,默认按服务器时间来private final Long timestamp;
}

再看下 Producer,重点看下 send方法,kafka支持同步或异步接收消息发送的结果,实现都是靠Future,只是异步的时候future执行了回调方法,支持拦截器方式。

/*** The interface for the {@link KafkaProducer}* @see KafkaProducer* @see MockProducer*/
public interface Producer<K, V> extends Closeable {/*** See {@link KafkaProducer#send(ProducerRecord)}*/Future<RecordMetadata> send(ProducerRecord<K, V> record);/*** See {@link KafkaProducer#send(ProducerRecord, Callback)}*/Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
}

更详细的看这篇文章说的很好:

https://www.cnblogs.com/dingwpmz/p/12153036.html

简单总结一下:
Producer的send方法并不会直接像broker发送数据,而是计算消息长度是否超限,是否开启事务,如果当前缓存区已写满或创建了一个新的缓存区,则唤醒 Sender(消息发送线程),将缓存区中的消息发送到 broker 服务器,以队列的形式(每个topic+每个partition维护一个双端队列),即 ArrayDeque,内部存放的元素为 ProducerBatch,即代表一个批次,即 Kafka 消息发送是按批发送的。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/412660.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2018ACM/ICPC亚洲区域赛(焦作)F. Honeycomb

目录 F. Honeycomb (2018-ACM/ICPC焦作)F. Honeycomb (2018-ACM/ICPC焦作) Problem F. Honeycomb Input file: standard input Output file: standard output A honeycomb is a mass wax cells built by honey bees, which can be described as a regular tiling of the Euclid…

[css]怎么改变选中文本的文字颜色和背景色?

[css]怎么改变选中文本的文字颜色和背景色&#xff1f; ::selection { background-color: #222; color: white; }个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

goal org.mybatis.generator:mybatis-generator-maven-plugin:1.3.6:generate failed: Index: 0, Size: 0

大家好&#xff0c;我是烤鸭&#xff1a; 报错信息如下&#xff1a; Failed to execute goal org.mybatis.generator:mybatis-generator-maven-plugin:1.3.6:generate (default-cli) on project etc-bosc-repository: Execution default-cli of goal org.mybatis.generator:m…

.NETCore_生成实体

先安装以下三个包&#xff0c;或者使用Nuget引用 不要问我为什么&#xff0c;按哥说的做吧&#xff1a; Install-Package Microsoft.EntityFrameworkCore.SqlServer Install-Package Microsoft.EntityFrameworkCore.Tools Install-Package Microsoft.VisualStudio.Web.CodeGene…

[css] 你对响应式设计的理解是什么?知道它基本的原理是吗?要想兼容低版本的IE怎么做呢?

[css] 你对响应式设计的理解是什么&#xff1f;知道它基本的原理是吗&#xff1f;要想兼容低版本的IE怎么做呢&#xff1f; 理解&#xff1a;在不同系统&#xff0c;不同设备&#xff0c;不同尺寸的界面&#xff0c;有良好的用户体验&#xff0c;舒适的阅读体验&#xff0c;交…

php rabbitmq demo

composer安装php rabbitmq包 新建composer.json文件&#xff0c;composer install 安装 {"require": {"php-amqplib/php-amqplib": ">2.6.1"} } 创建config.php文件 <?php return [vendor > [path > ./vendor],rabbitmq > [host…

[css] 你有使用过哪些栅格系统?都有什么区别呢?

[css] 你有使用过哪些栅格系统&#xff1f;都有什么区别呢&#xff1f; bootstrap3 float完成的栅格 bootstrap4 flex完成的栅格个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通…

dubbo源码解析(二)

大家好&#xff0c;我是烤鸭&#xff1a; dubbo 源码解析&#xff1a; 1.服务导出 介绍: Dubbo 服务导出过程始于 Spring 容器发布刷新事件&#xff0c;Dubbo 在接收到事件后&#xff0c;会立即执行服务导出逻辑。整个逻辑大致可分为三个部分&#xff0c;第一部分是前置工作&am…

[css] 请说说*{box-sizing: border-box;}的作用及好处有哪些?

[css] 请说说*{box-sizing: border-box;}的作用及好处有哪些&#xff1f; 还是喜欢用默认的content-box 不考虑老版ie 比较通配符的性能较差 第三方的UI库的盒模型也都是标准盒模型个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持…

执行mongod其他实例出现的问题

windows环境下&#xff0c;配置其他mongo实例&#xff0c;会出现一些问题 1、配置路径不对&#xff0c;执行bat文件时出现闪屏 根据提示创建C:\data\db\ 目录&#xff08;因为mongodb默认在/data/db下创建数据库&#xff09;&#xff0c;重新执行mongod实例&#xff0c;就OK&am…

从 class 文件 看 synchronize 锁膨胀过程(偏向锁 轻量级锁 自旋锁 重量级锁)

大家好&#xff0c;我是烤鸭: 前几天看马士兵老师的并发的课&#xff0c;里边讲到了 synchronize 锁的膨胀过程&#xff0c;今天想用代码演示一下。 1. 简单介绍 关于synchronize jdk 1.5 以后的优化&#xff0c;由重量级锁调整为膨胀过程。分别是 偏向锁 轻量级锁&#xff0…

[css] 说说你对jpg、png、gif的理解,分别在什么场景下使用?有使用过webp吗?

[css] 说说你对jpg、png、gif的理解&#xff0c;分别在什么场景下使用&#xff1f;有使用过webp吗&#xff1f; jpg, 色彩复杂图片 png, 色彩简单图片 gif, 动图, 或者色彩极简的icon等 webp, 判断能使用webp的浏览器就是用webp个人简介 我是歌谣&#xff0c;欢迎和大家一起交…

GC算法与收集器

一.判断对象是否存活 1.引用计数算法 2.可达性分析算法 二.垃圾收集算法 1.标记-清除算法&#xff1a;效率低&#xff0c;内存碎片 2.复制算法&#xff1a;适用于对象存活率低 3.标记-整理算法&#xff1a;没有内存碎片 4.分代收集算法&#xff1a;新生代用复制算法 老年代用标…

[css] 如何消除transition闪屏?

[css] 如何消除transition闪屏&#xff1f; 题目越简单越有含量。 看题意不知道在问什么&#xff0c;说明这个问题自己没注意或不熟悉&#xff0c;而不是去怀疑题目出的有问题。这个问题自己没有遇到过&#xff0c;或者说没有注意过这个问题&#xff0c;网上搜索了下答案&…

php opcache 详解

PHP性能提升之OPcache相关参数详解 工具 memory 发布于December 15, 2016 标签: PHPOPcache 通过将 PHP 脚本预编译的字节码存储到共享内存中来提升 PHP 的性能&#xff0c; 存储预编译字节码的好处就是 省去了每次加载和解析 PHP 脚本的开销。 PHP 5.5.0 及后续版本中已经绑定…

es elasticsearch 几种常见查询场景 二次分组 java读取es的查询json文件

大家好&#xff0c;我是烤鸭&#xff1a; es中几种常见的查询场景,使用java读取es的json文件进行查询。 es 中文使用手册。https://www.elastic.co/guide/cn/elasticsearch/guide/current/foreword_id.html 1. 从最简单的查询开始 GET /_search {"hits" : {&qu…

[css] 元素竖向的百分比设置是相对容器的高度吗?

[css] 元素竖向的百分比设置是相对容器的高度吗&#xff1f; 父级非 auto 的 height 时&#xff0c;子级百分比的 height 才有效。 即使父级有 min-height 或其他子级撑起的高度&#xff0c;子级百分比 height 依旧无效。个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后…

阿里云服务器邮件发送

一个邮件发送的功能&#xff0c;本机调试无问题&#xff0c;但发布到阿里云服务器后邮件发送功能失败。 网上查了下大概是说阿里云把发送邮件的25端口禁用掉了 那么解决方式一就是向阿里云申请开放25端口&#xff0c;但具体如何申请&#xff0c;并未深入操作。 解决方式二&…

全链路追踪竟然如此简单? bytebuddy搭建全链路追踪的demo 附代码

大家好&#xff0c;我是烤鸭&#xff1a; 最近一直在研究全链路追踪&#xff0c;比如cat、skywalking、zipkin等。 发现 skywalking 是基于bytebuddy 实现的&#xff0c;想自己试着写一下demo。 demo的git地址,感兴趣的可以自己试下。代码在idea中可以跑,至于其他场…

[css] 用CSS绘制一个红色的爱心

[css] 用CSS绘制一个红色的爱心 // 用过 就给贴过来了.heart {position: relative;width: 100px;height: 90px;}.heart:before,.heart:after {position: absolute;content: "";left: 50px;top: 0;width: 50px;height: 80px;background: red;border-radius: 50px 50p…