springboot集成kafka详解

文章目录

        • 1、kafka部署:
          • (1)先创建一个网络:
          • (2)安装zookeeper,kafka依赖zookeeper所以需要先安装zookeeper:
          • (3)安装Kafka:
            • 参数解释:
          • (4)部署kafka图形化管理工具,选择kafka-map或kafka-manager:
            • kafka-map(推荐)
            • kafka-manager(不好用,不推荐)
        • 2、springboot集成kafka:
          • (1)pom文件里引入kafka依赖
          • (2)application.yml配置文件:
          • (3)在你的项目里新建文件结构如下:
            • 生产者,KafkaProducer.java文件内容如下:
            • 消费者,KafkaConsumer.java文件内容如下:
            • controller文件内容如下:
          • (4)调用 /kafka/test 接口,打印结果:

1、kafka部署:
(1)先创建一个网络:
docker network create app-tier --driver bridge
(2)安装zookeeper,kafka依赖zookeeper所以需要先安装zookeeper:
docker run -d --name zookeeper-server --network app-tier -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
(3)安装Kafka:
docker run -d --name kafka-server --network app-tier -p 9092:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.202.211:9092 bitnami/kafka:latest
参数解释:
ALLOW_PLAINTEXT_LISTENER		# 任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECT		# 链接的zookeeper
KAFKA_CFG_ADVERTISED_LISTENERS		# 当前主机IP或地址(我的主机IP为:192.168.202.211)
(4)部署kafka图形化管理工具,选择kafka-map或kafka-manager:
kafka-map(推荐)
docker run -d --name kafka-map --network app-tier -p 9090:8080 -v /root/kafka-map/data:/usr/local/kafka-map/data -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --restart always dushixiang/kafka-map:latest访问地址:192.168.202.211:9090
账号密码:admin/admin
kafka-manager(不好用,不推荐)
docker run --name kafka-manager -d --network app-tier -p 9091:9000 -e ZK_HOSTS="zookeeper-server:2181" sheepkiller/kafka-manager访问地址:192.168.202.211:9091
2、springboot集成kafka:
(1)pom文件里引入kafka依赖
<!-- kafka -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.1</version>
</dependency>
(2)application.yml配置文件:
spring:kafka:bootstrap-servers: 192.168.202.211:9092          # Kafka服务器地址和端口consumer:group-id: my-group-id                          # 消费者组ID,与部署的Kafka服务中的组ID保持一致auto-offset-reset: earliest                    # 从最早的消息开始消费,可选值:earliest, latest, none
(3)在你的项目里新建文件结构如下:
生产者,KafkaProducer.java文件内容如下:
package com.heurd.intellidigital.service.modular.data.kafka.producer;import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;/*** @Author: goggle* @Date: 2023-11-10 14:19* @Desperation: kafka生产者*/
@Component
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topicName, String message) {System.out.printf("topicName: %s, message: %s%n", topicName, message);kafkaTemplate.send(topicName, message);}}
消费者,KafkaConsumer.java文件内容如下:
package com.heurd.intellidigital.service.modular.data.kafka.consumer;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Author: goggle* @Date: 2023-11-10 14:19* @Desperation: kafka消费者*/
@Component
public class KafkaConsumer {@KafkaListener(topics = "my-topic", groupId = "my-group-id") // 指定监听的主题和组IDpublic void listen(String message) {System.out.println("Received message: " + message);// 处理接收到的消息}}
controller文件内容如下:
package com.heurd.intellidigital.service.modular.data.controller;import com.heurd.intellidigital.service.modular.data.kafka.consumer.KafkaConsumer;
import com.heurd.intellidigital.service.modular.data.kafka.producer.KafkaProducer;
import com.heurd.intellimes.core.annotion.BusinessLog;
import com.heurd.intellimes.core.enums.LogAnnotionOpTypeEnum;
import com.heurd.intellimes.core.pojo.response.ResponseData;
import com.heurd.intellimes.core.pojo.response.SuccessResponseData;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** @Author: goggle* @Date: 2023-11-10 14:19* @Desperation: kafka控制器*/
@RestController
@RequestMapping("/kafka")
public class KafkaController {@Resourceprivate KafkaProducer kafkaProducer;@Resourceprivate KafkaConsumer kafkaConsumer;/*** kafka测试** @author heurd* @date 2023-11-10 15:29:19*/@ApiOperation("kafka测试")@GetMapping(value = "/test")public ResponseData<Object> kafka() {String message = "Hello, Kafka!"; 			// 要发送的消息内容kafkaProducer.sendMessage("my-topic", message); 	// 发送消息到指定主题(这里为"my-topic")// 此时,Kafka消费者会自动监听到该消息并调用listen()方法进行处理。注意:消费者的监听是异步的,因此不会立即收到响应。如果您需要等待响应,请根据实际情况进行修改。return new SuccessResponseData<>(message);}}
(4)调用 /kafka/test 接口,打印结果:
生产者:topicName: my-topic, message: Hello, Kafka!消费者:Received message: Hello, Kafka!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/141690.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++ jthread 使用详解

c jthread 使用详解 std::jthread c20 头文件 #include <thread>。对 std::thread 的改进&#xff0c;旨在提供更好的线程管理和异常处理&#xff0c;除了拥有和 std::thread 完全相同的功能以及 API 之外&#xff0c;还增加了以下功能&#xff1a; 可停止&#xff1a;…

无人地磅称重系统|自助过磅 料仓联动 自助卸料

上海思伟无人地磅系统 自助过磅、 自助卸料 、料仓联动 智能、省人、安全 无人监管过磅 对地磅及其相关的所有硬件进行配置和管理&#xff1b; 支持红外、道闸、车牌识别、AI分析、拍照存档、LED语音播报一体机等设备&#xff1b; 实现稳定可靠的无人监管称重功能&#xf…

云服务器哪家强?阿里云双十一2核2G配置3M带宽仅99元/年!

阿里云作为国内知名的云计算服务提供商&#xff0c;每年的双11都会推出各种优惠活动和促销策略。在今年的双11期间&#xff0c;阿里云推出了多种选择的云服务器&#xff0c;其中两款备受用户关注&#xff1a;轻量服务器2核2G3M带宽优惠价87元一年和经济型e实例2核2G配置3M带宽9…

Vue3 ref函数和reactive函数

一、ref函数 我们在setup函数中导出的属性和方法虽然能够在模板上展示出来&#xff0c;但是并没有给属性添加响应式&#xff0c;因此&#xff0c;我们需要使用ref函数来为我们的数据提供响应式。 &#xff08;一&#xff09;引入ref函数 import { ref } from "vue"…

数据结构:红黑树的原理和实现

文章目录 红黑树的概念红黑树的性质红黑树的模拟实现红黑树的平衡问题 整体实现和测试 本篇用于进行红黑树的拆解和模拟实现&#xff0c;为之后的map和set的封装奠定基础 红黑树的概念 红黑树也是一种二叉搜索树&#xff0c;但是在每一个节点的内部新增了一个用以表示该节点颜…

IDEA的优化配置教程

前言 IDEA 全称 IntelliJ IDEA&#xff0c;是java编程语言开发的集成环境。IntelliJ在业界被公认为最好的java开发工具&#xff0c;尤其在智能代码助手、代码自动提示、重构、JavaEE支持、各类版本工具(git、svn等)、JUnit、CVS整合、代码分析、 创新的GUI设计等方面的功能可以…

Day 项目介绍与 SSM 环境搭建

介绍一个使用Java语言开发的项目&#xff0c;演示并如何搭建一个基于SSM&#xff08;SpringSpring MVCMyBatis&#xff09;框架的开发环境。SSM是一种常用的Java Web开发框架&#xff0c;它集成了Spring、Spring MVC和MyBatis&#xff0c;提供了一套完整的开发架构&#xff0c;…

OOM汇总

1. 堆内存溢出 堆内存溢出通常是由于创建了过多的对象&#xff0c;而导致堆内存耗尽而发生的。以下是导致堆内存溢出的一些常见情况&#xff1a; 内存泄漏&#xff1a; 如果程序中存在内存泄漏&#xff0c;即一些对象不再被引用&#xff0c;但仍然存活于堆内存中&#xff0c;…

thinkphp5 连接多个服务器数据库

如果你的database.php 是这样&#xff0c; 这是默认的db连接配置 如果还想连接其他服务器&#xff0c;或数据库 在config.php中追加数据库配置&#xff0c; 在使用的地方调用&#xff1a; use think\Db;public function test(){$db3Db::connect(config(db3));$result $db3…

使用gitflow时如何合并hotfix

前言 在使用 git flow 流程时, 对于项目型的部署项目经常会遇到一个问题, 就是现场项目在使用历史版本时发现的一些问题需要修复, 但升级可能会有很大的风险或客户不愿意升级, 这时就要求基于历史版本进行 hotfix 修复. 基于历史发布版本的缺陷修复方式不同于最新发布版本的补…

CSS 实现文字两端对齐效果

直接上代码&#xff1a; .text{text-align: justify;text-align-last: justify;// 兼容ie (未测试)text-justify: distribute-all-lines; }注意&#xff1a; text-align-last: justify; 只对中文文字起效果。数字和英文字母则需要使用空格间隔开&#xff0c;然后使用上述方法即…

自然语言处理实战项目21-两段文本的查重功能,返回最相似的文本字符串,可应用于文本查重与论文查重

大家好,我是微学AI,今天给大家介绍一下自然语言处理实战项目21-两段文本的查重功能,返回最相似的文本字符串,可应用于论文查重。本文想实现一种文本查重功能,通过输入两段文本,从中找出这两段文本中最相似的句子。这项技术有助于检测抄袭、抄袭的论文和文章,提高知识创新…

Ubuntu22.04 FTP 搭建以及挂载

软件安装 sudo apt-get update 服务端nfs-kernel-server 客户端nfs-common sudo apt-get install -y nfs-kernel-server nfs-common创建NFS共享目录 sudo mkdir -p /nfssudo chown -R nobody:nogroup /nfs sudo chmod -R 777 /nfs配置文件 sudo vim /etc/exports# [共享目录…

08.oracle的表

oracle的表 一、与表相关的几个概念二、表的几种类型包括&#xff1a;三、对表的一些基本操作 一、与表相关的几个概念 高水位线&#xff08;High Water Mark&#xff09;&#xff1a;是指表中已经被使用的空间的最高位置。当表中的数据被删除或更新时&#xff0c;高水位线不会…

Go使用命令行输出二维码

引言 二维码&#xff08;QR code&#xff09;是一种矩阵条码的标准&#xff0c;广泛应用于商业、移动支付和数据存储等领域。在开发过程中&#xff0c;我们可能需要在命令行中显示二维码&#xff0c;这可以帮助我们快速生成和分享二维码信息。本文将介绍如何使用Go语言生成二维…

【SpringBoot】SpringBoot自动配置底层源码解析

概述 EnableAutoConfiguration源码解析SpringBoot常用条件注解源码解析SpringBoot之Mybatis自动配置源码解析SpringBoot之AOP自动配置源码解析SpringBoot Jar包启动过程源码解析 DeferredImportSelector接口 DeferredImportSelector和ImportSelector的区别在于&#xff1a; …

Ubuntu2204 搭建TFTP 服务

安装软件 sudo apt-get install tftp-hpa tftpd-hpa xinetd配置服务 配置tftp sudo vim /etc/xinetd.d/tftp 填入以下参数&#xff0c;/home/tftp 换成自己的共享目录 server tftp {socket_type dgramprotocol udp wait yes user rootserver /usr/sbin/in.tftpdserver_…

windows远程桌面登录ubuntu,黑屏闪退,

首先需要安装xrdp以后才能远程登录&#xff0c;安装命令 sudo apt-get install xrdp 这里需要注意一个问题&#xff0c;如果在ubuntu 本地登录的情况下&#xff0c;windows远程登录会出现黑屏甚至闪退的问题。原因是本地登录和远程登陆是互斥的&#xff0c;本地登录了就不能远…

Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks - 翻译学习

知识密集型NLP任务的检索增强生成 - 论文学习 文章目录 Abstract1 Introduction2 Methods2.1 Models2.2 Retriever: DPR2.3 Generator: BART2.4 Training2.5 Decoding 3 Experiments3.1 Open-domain Question Answering3.2 Abstractive Question Answering3.3 Jeopardy Questio…

java内部类初探

引言 内部类是定义在另一个类中的类&#xff0c;它曾经对于简洁地实现回调非常重要&#xff0c;不过今天lambda表达式在这方面可以做的更好。但内部类对于某些结构的构建还是很有用的。 普通内部类 常规的内部类有以下两个特点&#xff1a; 1.内部类可以对除了自己的外部类以…