Kafka-Java一:Spring实现kafka消息的简单发送

目录

写在前面

一、创建maven项目

二、引入依赖

2.1、maven项目创建完成后,需要引入以下依赖

2.2、创建工程目录

三、创建生产者

3.1、创建生产者,同步发送消息

3.2、创建生产者,异步发送消息

四、同步发送消息和异步发送消息的区别

五、报错处理思路


写在前面

        该文章通过spring只实现消息的简单发送,不实现消息的监听。

一、创建maven项目

        创建maven过程不再赘述。

二、引入依赖

2.1、maven项目创建完成后,需要引入以下依赖

    // kafka 依赖<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>//  json依赖,demo中可能会用到该依赖,与kafka依赖无关<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.10</version></dependency>

2.2、创建工程目录

三、创建生产者

3.1、创建生产者,同步发送消息

        3.1.1、在MyProducer中实现如下代码

package com.demo.lxb.kafka;import com.alibaba.fastjson.JSON;
import com.demo.lxb.entiry.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;/*** @Description:* @Author: lvxiaobu* @Date: 2023-10-23 17:06**/
public class MyProducer {private final  static String TOPIC_NAME = "topic0921";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();// 一、设置参数// 配置kafka地址//props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.151.28:9092"); // 单机配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.151.28:9092,192.168.151.28:9092,192.168.151.28:9092"); // 集群配置// 配置消息 键值的序列化规则props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 二、声明消息对象// 未指定发送分区,具体撒送分区的计算公式: hash(key)%PartitionNum// 创建发送的消息: producerRecord// 参数1: 要发送的主题// 参数2: 消息的key值,可有可无,如果存在的话,该字段用来带入分区的计算公式// 参数3: value,具体的消息的内容,json格式的字符串ProducerRecord<String,String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME,"mykey","hello-kafka");// 三、声明消息发送者Producer<String,String> producer = new KafkaProducer<String,String>(props);// 开发发送,并返回结果和元数据RecordMetadata recordMetadata = producer.send(producerRecord).get();System.out.println("发送消息结果: " + "topic-" + recordMetadata.topic() + " | partition-"+ recordMetadata.partition() + " | offset-" + recordMetadata.offset());}
}

        执行main方法,结果如下:

        如果多次执行main方法,会发现offset偏移量的数字会发生变化。 

3.2、创建生产者,异步发送消息

        3.2.1、在MyProducer2中实现如下代码

package com.demo.lxb.kafka;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @Description: kafka 异步发送消息* @Author: lvxiaobu* @Date: 2023-10-23 17:06**/
public class MyProducer2 {private final  static String TOPIC_NAME = "topic0921";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 二、声明消息对象// 未指定发送分区,具体撒送分区的计算公式: hash(key)%PartitionNum// 创建发送的消息: producerRecord// 参数1: 要发送的主题// 参数2: 消息的key值,可有可无,如果存在的话,该字段用来带入分区的计算公式// 参数3: value,具体的消息的内容,json格式的字符串ProducerRecord<String,String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME,"mykey","hello-kafka2");// 三、声明消息发送者Producer<String,String> producer = new KafkaProducer<String,String>(props);// 异步发送消息,通过callback回调函数获取发送结果producer.send(producerRecord, new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e != null){System.out.println("消息发送失败:" + e);}if(recordMetadata != null){System.out.println("发送消息结果: " + "topic-" + recordMetadata.topic() + " | partition-"+ recordMetadata.partition() + " | offset-" + recordMetadata.offset());}}});Thread.sleep(50000L);}
}

执行 Main方法,会产生和同步发送消息一样的结果。

说明:Thread.sleep(50000L)是让主线程休眠50s,否则主线程在异步发送了消息以后就直接结束了,不会再输出callback中的输出语句

四、同步发送消息和异步发送消息的区别

消息的同步发送
  如果生产者发送的消息没有收到kafka的ack通知,生产者会产生阻塞,如果阻塞了3s仍然没有收到消息反馈,会进行消息发送的重试操作,重试的次数是3次。如果三次以后还不行,代码将抛出异常
消息的异步发送
  生产者发送消息后,会提供一个callback的回调方法,callback会获取消息是否发送成功的结果。但是需要注意,异步发送消息会出现消息的丢失。

五、报错处理思路

        3.2.1、检查Props配置Kafka地址是否正确

        3.2.2、检查Linux是否关闭防火墙

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/118516.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FLStudio21汉化破解激活版下载,Fl Studio 2024中文破解版激活补丁

最新版本FL Studio 21官方中文汉化激破解版是比利时Image-Line公司开发的DAW。在去年DTM站的DAW调查中&#xff0c;在世界上很受欢迎&#xff0c;特别是作为EDM制作工具被广泛使用。从1997年以FruityLoops的名字发行的时候开始&#xff0c;FL Studio 21就一直作为Windows专用的…

Whisper 整体架构图

Attention 注意力机制模块&#xff0c;兼容自注意力和交叉注意力。 AttentionBlock Transformer 模块&#xff0c;包含一个自注意力&#xff0c;一个交叉注意力&#xff08;可选&#xff09;和一个 MLP 模块。 AudioEncoderTextDecoder 音频编码器和文本解码器。编码器的 Tr…

python 桌面软件开发-matplotlib画图鼠标缩放拖动

继上一篇在 Java 中缩放拖动图片后&#xff0c;在python matplotlib中也来实现一个自由缩放拖动的例子&#xff1a; python matplotlib 中缩放&#xff0c;较为简单&#xff0c;只需要通过设置要显示的 x y坐标的显示范围即可。基于此&#xff0c;实现一个鼠标监听回调&#xf…

Docker镜像仓库

Docker镜像仓库 一、Docker镜像的创建1.1、基于已有镜像创建1.2、基于本地模板创建1.3、基于Dockerfile创建&#xff08;使用最广泛&#xff09;1.3.1、联合文件系统&#xff08;UnionFS&#xff09;1.3.2、镜像加载原理1.3.3、Dockerfile1.3.4、Docker 镜像结构的分层 二、如何…

【带头学C++】----- 1.基础知识 ---- 1.21.23.9 位运算符的综合应用

最近做任务&#xff0c;公司项目比较重&#xff0c;赶上1024的活动流量券任务&#xff0c;内容治疗略微有一些杂乱&#xff0c;后期会把专栏目录重新搞一下&#xff0c;内容我是融合了很多课程和书籍包含ai的一些理解&#xff0c;我整理和增加了自己的见解和代码贴图&#xff0…

newstar week3 pwn

newstar week3 pwn 巩固知识&#xff0c;如有错误记得纠正&#xff0c;感谢师傅们的评阅 puts or system? Arch: amd64-64-little RELRO: Partial RELRO Stack: Canary found NX: NX enabled PIE: No PIE (0x400000)int __cdecl main(int argc, const…

uni-app:解决异步请求返回值问题

可以使用 Promise 或者回调函数来处理异步请求的返回值。 方法一&#xff1a; Promise处理异步请求的返回值 使用 Promise 可以将异步请求的结果通过 resolve 和 reject 返回&#xff0c;然后通过 .then() 方法获取成功的结果&#xff0c;通过 .catch() 方法获取错误信息。 …

鸿鹄工程项目管理系统 Spring Cloud+Spring Boot+Mybatis+Vue+ElementUI+前后端分离构建工程项目管理系统项目背景

鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部工程管…

统计文本词频的几种方法(Python)

目录 1. 单句的词频统计 2. 文章的词频统计 方法一&#xff1a;运用集合去重方法 方法二&#xff1a;运用字典统计 方法三&#xff1a;使用计数器 词频统计是自然语言处理的基本任务&#xff0c;针对一段句子、一篇文章或一组文章&#xff0c;统计文章中每个单词出现的次数…

40.查找练习题(王道2023数据结构第7章)

试题1&#xff08;王道7.2.4节综合练习5&#xff09;&#xff1a; 写出折半查找的递归算法。 #include<stdio.h> #include<stdlib.h> #include<string.h>#define MAXSIZE 10 #define ElemType int #define Status inttypedef struct{int data[MAXSIZE]; /…

GD32_定时器输入捕获波形频率

GD32_定时器输入捕获波形频率&#xff08;多通道轮询&#xff09; 之前项目上用到一个使用定时器捕获输入采集风扇波形频率得到风扇转速的模块&#xff0c;作为笔记简单记录以下当时的逻辑结构和遇到的问题&#xff0c;有需要参考源码、有疑问或需要提供帮助的可以留言告知 。…

Spring Event

前言 ApplicationEvent 与 ApplicationListener 应用 实现 基于注解 事件过滤 异步事件监听 好处及应用场景 源码阅读 总结 1前言 ApplicationContext 中的事件处理是通过 ApplicationEvent 类和 ApplicationListener 接口提供的。如果将实现了 ApplicationListener …

【AI视野·今日Robot 机器人论文速览 第五十八期】Thu, 19 Oct 2023

AI视野今日CS.Robotics 机器人学论文速览 Thu, 19 Oct 2023 Totally 25 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Robotics Papers InViG: Benchmarking Interactive Visual Grounding with 500K Human-Robot Interactions Authors Hanbo Zhang, Jie Xu, Yuch…

Qt生成PDF报告

文章目录 一、示意图二、实现部分代码总结 一、示意图 二、实现部分代码 //! 生成测试报告 void MainWindow::createPdf(QString filename, _pdf_msg_& msg, const QMap<QString, int>& ok, const QMap<QString, int>& err) {//QDir dir;if(!dir.exis…

Vue快速入门

一、概述 1.是一套前端框架&#xff0c;可免除原生JavaScript中的DOM操作&#xff0c;基于MVVM思想&#xff0c;实现数据双向绑定。 实现由MVC——>MVVM的转换 二、入门 1.新建HTML页面&#xff0c;引入Vue.js文件 2.在JS代码区&#xff0c;创建Vue核心对象&#xff0c;进行…

嵌入式软件工程师面试题——2025校招专题(三)

说明&#xff1a; 面试题来源于网络书籍&#xff0c;公司题目以及博主原创或修改&#xff08;题目大部分来源于各种公司&#xff09;&#xff1b;文中很多题目&#xff0c;或许大家直接编译器写完&#xff0c;1分钟就出结果了。但在这里博主希望每一个题目&#xff0c;大家都要…

生产环境元空间内存溢出(OOM)的问题排查

一、现象 2023.10.17下午收到业务反馈&#xff0c;说是接口调用超时&#xff0c;进件系统和核心系统调用外数系统接口时等待过久&#xff0c;引起系统异常。然后我们看了下接口调用的日志&#xff0c;确实接口的响应时间在五十秒左右。我们自己测试了下&#xff0c;发现也是这…

leetcode 503. 下一个更大元素 II、42. 接雨水

下一个更大元素 II 给定一个循环数组 nums &#xff08; nums[nums.length - 1] 的下一个元素是 nums[0] &#xff09;&#xff0c;返回 nums 中每个元素的 下一个更大元素 。 数字 x 的 下一个更大的元素 是按数组遍历顺序&#xff0c;这个数字之后的第一个比它更大的数&…

【c#】2022创建WEB API接口教程demo

c#创建WEB API接口 创建WEB API接口结果图涉及到的技术设计流程创建WEB API接口 结果图 涉及到的技术 设计流程 1、创建WEB api项目,使用控制器和penapi勾选上,第一次创建项目时没有勾选,因为感觉没啥用。后面跑项目的时候,要把接口用swagger去直接生成的时候,还是需要…