kafka(三)springboot集成kafka(1)介绍

一、相关组件介绍

1、pom:
<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>
2、kafkaProducer

produce的发送主要流程概述如下:

  1. 拦截器对发送的消息拦截处理;

  2. 获取元数据信息;

  3. 序列化处理;

  4. 分区处理;

  5. 批次添加处理;

  6. 发送消息。

 

3、 KafkaConsumer

二、生产者发送消息类型

1、同步发送消息

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {// 默认为异步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));// 末尾加get为同步发送kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();}// 5. 关闭资源kafkaProducer.close();}
}
2、异步发送消息
2.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducer {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));}// 5. 关闭资源kafkaProducer.close();}
}
2.2、带回调函数的异步发送

 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class CustomProducerCallBack {public static void main(String[] args) {// 1. 创建kafka生产者的配置对象Properties properties = new Properties();// 2. 给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// key,value序列化(必须):// 序列化器的serialization是一个接口,找到他的实现类// 我们一般都是使用Stringproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 3. 创建kafka生产者对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);// 4. 调用send方法,发送消息for (int i = 0; i < 10; i++) {kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {//(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法//(2)消息发送失败  exception != null  也会调用该方法if (exception == null) {System.out.println(metadata);//使用打印演示}else{exception.printStackTrace();//打印异常信息}}});}// 5. 关闭资源kafkaProducer.close();}
}
3、发送顺序消息 

三、消费者接收消息

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {// 1. 创建消费者配置对象Properties properties = new Properties();// 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数)//  broker的ip地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");// 配置  反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//配置消费者组(组名必须)properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");// 3. 创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);// 注册消费主题ArrayList<String> topics = new ArrayList<>();topics.add("first");consumer.subscribe(topics);// 4.调用方法消费数据// 如果kafka集群没有新数据会造成空转// 填写参数为时间,如果没有拉取数据,线程睡眠一会while (true) {// 设置1s中消费的一批数据// Duration.ofSeconds(1)不会导致空转,拉取不到的时候睡眠1sConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 打印消费数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());}}//5.关闭资源
//        consumer.close();不使用的原因是,已关闭进程,就不会再消费数据了,进程停止就以为着JVM为断电了,不再工作}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/721403.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vulhub中Wordpress 4.6 任意命令执行漏洞复现

由于Mysql初始化需要一段时间&#xff0c;所以请等待。成功运行后&#xff0c;访问http://your-ip:8080/打开站点&#xff0c;初始化管理员用户名和密码后即可使用&#xff08;数据库等已经配置好&#xff0c;且不会自动更新&#xff09;。 发送如下数据包&#xff0c;可见/tmp…

计网面试题整理上

1. 计算机网络的各层协议及作用&#xff1f; 计算机网络体系可以大致分为一下三种&#xff0c;OSI七层模型、TCP/IP四层模型和五层模型。 OSI七层模型&#xff1a;大而全&#xff0c;但是比较复杂、而且是先有了理论模型&#xff0c;没有实际应用。TCP/IP四层模型&#xff1a…

【三维重建】相移法+格雷码

本篇文章介绍一种稠密点云的获取方式——条纹结构光三维重建算法。 在学习此算法前&#xff0c;我们需要对基于视觉的三维重建算法有一定了解。 需要了解什么是相机模型、相机标定以及三角化的相关知识。 【三维重建】摄像机几何-CSDN博客 【三维重建】摄像机标定&#xff…

微信小程序-2

数据绑定 index.js Page({data: {info: hello world,randomNumber: Math.random() * 10,imgSrc:http://www.itheima.com/images/logo.png} })index.wxml <view>{{ info }}</view><view>{{ randomNumber > 5 ? 随机数大于等于5 : 随机数小于5 }}</v…

低密度奇偶校验码LDPC(七)——SPA和积译码算法的简化

往期博文 低密度奇偶校验码LDPC&#xff08;一&#xff09;——概述_什么是gallager构造-CSDN博客 低密度奇偶校验码LDPC&#xff08;二&#xff09;——LDPC编码方法-CSDN博客 低密度奇偶校验码LDPC&#xff08;三&#xff09;——QC-LDPC码概述-CSDN博客 低密度奇偶校验码…

神经网络推理优化方法总结

&#x1f380;个人主页&#xff1a; https://zhangxiaoshu.blog.csdn.net &#x1f4e2;欢迎大家&#xff1a;关注&#x1f50d;点赞&#x1f44d;评论&#x1f4dd;收藏⭐️&#xff0c;如有错误敬请指正! &#x1f495;未来很长&#xff0c;值得我们全力奔赴更美好的生活&…

Jenkins 的安装(详细教程)

文章目录 一、简介二、安装前准备三、windows 安装与启动1. 方式一2. 方式二3. 方式三 四、创建管理员用户五、常用设置1. 配置镜像地址2. 更改工作目录3. 开启可注册用户4. 全局变量配置 一、简介 官网&#xff1a;https://www.jenkins.io 中文文档&#xff1a;https://www.j…

【MGR】MySQL Group Replication快速开始

目录 17.2 Getting Started 17.2.1 Deploying Group Replication in Single-Primary Mode 17.2.1.1 Deploying Instances for Group Replication 17.2.1.2 Configuring an Instance for Group Replication Storage Engines Replication Framework Group Replication Sett…

Java基础概念 1-6注释关键字字面量变量-基本用法变量-使用方式和注意事项变量练习-计算公交车的人数

Java基础概念 1-注释 单行注释 // 多行注释 /* */ 文档注释 /** */ --暂时不用 例: public class HelloWorld{ //main方法,表示程序的主入口.public static void main (String[] args){/*输出语句(打印语句)会把小括号内的内容进行输出打印.*/System.out.…

Ethersacn的交易数据是什么样的(2)

分析 Raw Transanction RLP&#xff08;Recursive Length Prefix&#xff09;是一种以太坊中用于序列化数据的编码方式。它被用于将各种数据结构转换为二进制格式&#xff0c;以便在以太坊中传输和存储。RLP 是一种递归的编码方式&#xff0c;允许对复杂的数据结构进行编码。所…

鸿蒙实战应用开发:【拨打电话】功能

概述 本示例通过输入电话&#xff0c;进行电话拨打&#xff0c;及电话相关信息的显示。 样例展示 涉及OpenHarmony技术特性 网络通信 基础信息 拨打电话 介绍 本示例使用call相关接口实现了拨打电话并显示电话相关信息的功能 效果预览 使用说明 1.输入电话号码后&#…

EIP-1559

EIP EIP是以太坊改进提案&#xff08;Ethereum Improvement Proposal&#xff09;的缩写。它是一种标准化的提案制度&#xff0c;用于描述和讨论对以太坊区块链网络的改进和升级。EIP的目的是提供一个开放的、透明的过程&#xff0c;让社区成员、开发者和其他利益相关者能够共同…

paypal绑卡教程

绑定信用卡到PayPal账户的流程可能会有轻微变化&#xff0c;具体步骤可能根据您所在的地区和PayPal的最新政策而有所不同。以下是一般的流程&#xff1a; 登录PayPal账户&#xff1a; 打开PayPal的官方网站或应用程序&#xff0c;使用您的账户登录凭据登录。 导航至钱包&#…

Kafka面经

1.Kafka如何保证消息不丢失 生产者&#xff1a; 1.Producer 默认是异步发送消息&#xff0c;这种情况下要确保消息发送成功&#xff0c;有两个方法 a. 把异步发送改成同步发送&#xff0c;这样 producer 就能实时知道消息发送的结果。 b. 添加异步回调函数来监听消息发送的结…

redis02 安装

官网下载 传送门https://redis.io/download/#redis-downloads 安装Redis mac m1安装 下载你需要版本的软件包放到指定的目录下进行解压 cd 到解压好的redis目录 运行下面的命令进行编译测试 sudo make test 中途可能会提示你安装make工具&#xff0c;按提示安装即可&…

JWT身份验证

在实际项目中一般会使用jwt鉴权方式。 JWT知识点 jwt&#xff0c;全称json web token &#xff0c;JSON Web令牌是一种开放的行业标准RFC 7519方法&#xff0c;用于在两方安全地表示声明。具体网上有许多文章介绍&#xff0c;这里做简单的使用。 1.数据结构 JSON Web Token…

Unity 动态加载音频和音效

想要加载音效和音频需要两个组件&#xff1a; 听&#xff1a; 播&#xff1a; 一收一发 在层级中&#xff0c;右键创建 音频源 &#xff0c;放入物体的子物体中。 播放 方式一 拖动需要播放的音频文件到&#xff0c;音频源组件中。 using System.Collections; using Syst…

Guitar Pro 8.1中文版永久许可证激活2024最新24位注册激活码生成器

Guitar Pro是一款非常受欢迎的音乐制作软件&#xff0c;它可以帮助用户创建和编辑各种音乐曲谱。从其诞生以来就送专门为了编写吉他谱而研发迭代的。 尽管这款产品可能已经成为全球最受欢迎的吉他打谱软件&#xff0c;在编写吉他六线谱和乐队总谱中始终处于行业领先地位&#…

Java求职技能清单(2024版)

一、Java基础扎实&#xff08;反射、集合、IO、NIO、多线程、设计模式、通信协议等基础技术&#xff09; &#xff08;一&#xff09;Java &#xff08;二&#xff09;网络IO &#xff08;三&#xff09;NIO模型 &#xff08;…

释放数据湖潜力:小红书如何实现数仓效率与成本的双重优化

在当今以数据为核心的商业环境中&#xff0c;企业正面临着海量数据的处理和分析挑战。为克服传统数据仓库在处理速度、灵活性和成本效率方面的局限&#xff0c;小红书数据仓库团队引入如 Apache Iceberg 等数据湖技术&#xff0c;将其与数仓架构相结合&#xff0c;以释放数据湖…