SpringBoot集成系列--Kakfa

文章目录

  • 一、代码
    • 1、添加依赖
    • 2、配置kafka
    • 3、创建生产者
    • 4、创建消费者
    • 5、测试
  • 二、遇到问题
    • 1、could not be established. Broker may not be available
    • 2、Error while fetching metadata with correlation id xxx

一、代码

1、添加依赖

在pom.xml文件中添加Kafka的依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2、配置kafka

生产者项目的application.properties文件中配置kafka

spring.kafka.bootstrap-servers=192.168.56.100:9092
spring.kafka.producer.client-id=forlan-client-id
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

参数说明:

  • spring.kafka.bootstrap-servers:指定Kafka集群的地址。这是一个以逗号分隔的服务器列表,每个服务器都以IP地址和端口号的形式表示。
  • spring.kafka.producer.client-id:指定Kafka生产者的客户端ID。这是一个用于标识生产者的唯一字符串。在诊断和监控时,这个ID可以帮助识别发送给Kafka的哪些消息是由哪个生产者发送的。
  • spring.kafka.producer.key-serializer:定义Kafka的消息的键的序列化器。
  • spring.kafka.producer.value-serializer:定义Kafka的消息的值的序列化器。

消费者项目的application.properties文件中配置kafka消费者

spring.kafka.bootstrap-servers=192.168.56.100:9092
spring.kafka.consumer.client-id=forlan-client-id
spring.kafka.consumer.group-id=forlan-group-id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

参数说明:

  • spring.kafka.bootstrap-servers:指定Kafka集群的地址。这是一个以逗号分隔的服务器列表,每个服务器都以IP地址和端口号的形式表示。
  • spring.kafka.consumer.client-id:指定Kafka消费者的客户端ID。这是一个用于标识消费者的唯一字符串。在诊断和监控时,这个ID可以帮助识别从Kafka接收到的哪些消息是由哪个消费者接收的。
  • spring.kafka.consumer.group-id:指定Kafka消费者的组ID。在消费者组中,成员之间可以共享消息消费进度,以便在需要时进行重新消费或进行故障转移。
  • spring.kafka.consumer.key-deserializer:指定用于反序列化从Kafka接收的消息的键的类。
  • spring.kafka.consumer.value-deserializer:指定用于反序列化从Kafka接收的消息的值的类。

3、创建生产者

设置发送消息服务类

@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void testSend(String topic, String message) throws ExecutionException, InterruptedException {SendResult<String, String> stringStringSendResult = kafkaTemplate.send(topic, message).get();System.out.println(stringStringSendResult);}
}

4、创建消费者

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumer {@KafkaListener(topics = "forlan_topic")public void listen(String message) {System.out.println("Kafka收到消息:" + message);}
}

5、测试

编写测试类,发送消息

@Autowired
private KafkaProducer kafkaProducer;@Test
public void testKafka() throws ExecutionException, InterruptedException {kafkaProducer.testSend("forlan_topic", "Forlan测试发送Kafka消息"+ LocalDateTime.now());
}

执行测试类,效果如下:
在这里插入图片描述
消费者后,会收到消息,如下:
在这里插入图片描述

二、遇到问题

1、could not be established. Broker may not be available

[Consumer clientId=consumer-forlan-group-id-1, groupId=forlan-group-id] Connection to node 1001 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

由于我们的kafka是装在容器内,使用下面这种方式,localhost指的是容器的ip,会有问题

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

正确的做法,应该是改为我们宿主机的ip,如下:

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.56.100:9092

其实就是把localhost设置为具体的ip,宿主机的ip。

2、Error while fetching metadata with correlation id xxx

 [Producer clientId=forlan-client-id-1] Error while fetching metadata with correlation id 3 : {forlan-topic=LEADER_NOT_AVAILABLE}

重启kafka即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/216643.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

docker---资源控制

docker的资源控制 对容器使用宿主机的资源进行限制。 三种控制方向&#xff1a;CPU 内存 磁盘I/O docker使用linux自带的功能cgroup&#xff1b;control groups是linux内核系统提供的一种可以限制记录&#xff0c;隔离进程所使用的物理资源机制。 docker借助此…

excel数据重复率怎么计算【保姆教程】

大家好&#xff0c;今天来聊聊excel数据重复率怎么计算&#xff0c;希望能给大家提供一点参考。 以下是针对论文重复率高的情况&#xff0c;提供一些修改建议和技巧&#xff1a; excel数据重复率怎么计算 在Excel中计算数据重复率可以通过以下步骤实现&#xff1a; 1. 确定重复…

redis的深度理解

上篇博客我们说到了redis的基本概念和基本操作&#xff0c;本篇我们就更深入去了解一些redis的操作和概念&#xff0c;我们就从red的主从同步、redis哨兵模式和redis集群三个方面来了解redis数据库 一、主从同步 像MySQL一样&#xff0c;redis是支持主从同步的&#xff0c;而…

排序的简单理解(下)

4.交换排序 基本思想&#xff1a;所谓交换&#xff0c;就是根据序列中两个记录键值的比较结果来对换这两个记录在序列中的位置 交换排序的特点是&#xff1a;将键值较大的记录向序列的尾部移动&#xff0c;键值较小的记录向序列的前部移动。 4.1 冒泡排序 冒泡排序&#xff08…

vue3若依框架,在页面中点击新增按钮跳转到新的页面,不是弹框,如何实现

在router文件中的动态路由数组中新增一个路由配置&#xff0c;这个配置的就是新的页面。 注意path不要和菜单配置中的路径一样&#xff0c;会不显示内容。 在菜单配置中要写权限标识就是permissions:[]里的内容 在children里的path要写占位符info/:data 点击新增按钮&#x…

HTML+CSS高频面试题

面试题目录 前言1.讲一下盒模型&#xff0c;普通盒模型和怪异盒模型有什么区别2.CSS如何实现居中3.讲一下flex弹性盒布局4.CSS常见的选择器有哪些&#xff1f;优先级5.长度单位px 、em、rem的区别6.position属性的值有哪些7.display属性的值有哪些&#xff0c;分别有什么作用8.…

std::map

一 emplace() emplace_hint() try_emplace()区别 1. emplace template< class... Args >std::pair<iterator, bool> emplace( Args&&... args ); 若容器中没有拥有该键的元素&#xff0c;则向容器插入以给定的 args 原位构造的新元素。 细心地使用 em…

20231211-DISM++安装win10-22h2-oct

20231211-DISM安装win10-22h2-oct 一、软件环境 zh-cn_windows_10_consumer_editions_version_22h2_updated_oct_2023_x64_dvd_eb811ccc.isowepe x64 v2.3标签&#xff1a;win10 22h2 wepe dism分栏&#xff1a;WINDOWS 二、硬件环境 8G或以上的有PE功能的启动U盘一个台式机…

Python常用文件操作库详解与示例

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 文件操作是编程中常见的任务之一&#xff0c;而Python提供了丰富的文件操作库&#xff0c;使得文件的读取、写入、复制、移动等操作变得非常便捷。本文将深入介绍一些Python中常用的文件操作库&#xff0c;以及它…

原型图都可以用什么软件制作?推荐这9款

对于设计师来说&#xff0c;一个有用的原型设计工具可以大大提高他们的工作效率&#xff0c;节省很多时间。当然&#xff0c;不同的原型设计工具有一定的差异&#xff01;那么哪个原型设计工具更好呢&#xff1f;以下是一些有用的原型设计软件&#xff0c;有需要的朋友可以根据…

红队攻防实战之DEATHNOTE

难道向上攀爬的那条路&#xff0c;不是比站在顶峰更让人热血澎湃吗 渗透过程 获取ip 使用Kali中的arp-scan工具扫描探测 端口扫描 可以看到开放了22和80端口。 访问80端口&#xff0c;重定向到 修改hosts文件&#xff0c;将该域名解析到ip 如图 修改完再次访问&#xff0…

如何在pytest接口自动化框架中扩展JSON数据解析功能?

开篇 上期内容简单说到了。params类类型参数的解析方法。相较于简单。本期内容就json格式的数据解析&#xff0c;来进行阐述。 在MeterSphere中&#xff0c;有两种方式可以进行json格式的数据维护。一种是使用他们自带的JsonSchema来填写key-value表单。另一种就是手写json。…

总线一:I2C简介(介绍看这一篇就够啦)

本节主要介绍以下内容&#xff1a; I2C协议简介 STM32的I2C特性及架构 I2C初始化结构体详解 一、I2C协议简介 I2C 通讯协议(Inter&#xff0d;Integrated Circuit)是由Phiilps公司开发的&#xff0c;由于它引脚少&#xff0c;硬件实现简单&#xff0c;可扩展性强&#xff…

Java判断字符串是不是数字

描述&#xff1a;通过Java判断一个字符串&#xff0c;是不是数字。这里包括正数、负数、浮点数、科学计数法 代码&#xff1a; import java.util.regex.Pattern;public class Test {public static void main(String[] args) {System.out.println(isNumeric("12.23")…

数据结构二维数组计算题,以行为主?以列为主?

1.假设以行序为主序存储二维数组Aarray[1..100,1..100]&#xff0c;设每个数据元素占2个存储单元&#xff0c;基地址为10&#xff0c;则LOC[5,5]&#xff08; &#xff09;。 A&#xff0e;808 B&#xff0e;818 C&#xff0e;1010 D&…

【LeetCode-树】-- 109.有序链表转换二叉搜索树

109.有序链表转换二叉搜索树 方法&#xff1a;找到链表的中点&#xff0c;将其作为根节点 /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* ListNo…

python中import mysql.connector出错无模块,且是已经pip install mysql-connector情况下

已经安装了mysql-connector和mysql-connector-python&#xff0c;使用python连接数据库&#xff0c;导入import mysql.connector仍报错&#xff1a; import mysql.connector# Connect to server cnx mysql.connector.connect(host"127.0.0.1",port3306,user"a…

视频剪辑进阶指南:批量置入视频封面,增加视频吸引力

在视频剪辑的进阶阶段&#xff0c;除了掌握基本的剪辑技巧和特效处理&#xff0c;还要尝试一些创新的方法来增加视频的吸引力。批量置入视频封面就是一种有效的方式。通过置入吸引的封面&#xff0c;能吸引观众点击视频并提高观看量。下面详细介绍云炫AI智剪如何批量置入视频封…

pandas按行值筛选

之前都没有意识到这个问题&#xff0c;就是pandas取某一行的值的问题 测试代码如下 import pandas as pd import numpy as np df pd.DataFrame({A: foo bar foo bar foo bar foo foo.split(),B: one one two three two two one three.split(),C: np.arange(8), D: np.arange…

GO闭包实现原理(汇编级讲解)

go语言闭包实现原理(汇编层解析) 1.起因 今天开始学习go语言,在学到go闭包时候,原本以为go闭包的实现方式就是类似于如下cpp lambda value通过值传递,mutable修饰可以让value可以修改,但是地址不可能一样value通过引用传递,但是在其他地方调用时,这个value局部变量早就释放,…