Kafka的安装及接入SpringBoot

环境:windows、jdk1.8、springboot2

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/

1.概述

        Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

        详细概述见Kafka概述:

1.1 Kafka的作用

  • 发布和订阅记录流
  • 持久存储记录流,Kafka中的数据即使消费后也不会消失
  • 在系统或应用之间构建可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • Kafka可以处理源源不断产生的数据

1.2 Kafka的一些概念

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2.Kafka下载安装

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/downloads        选择最新版就可以

2.1 配置kafka

        解压下载的文件,修改 config 文件夹下的 zookeeper.properties

        修改 config 文件夹下的 server.properties

        当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

 

2.2 启动 zookeeper

        Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

        可以本地访问看一下:http://localhost:2181/ 

2.3 启动Kafka 

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

kafka-server-start.sh ../../config/server.properties

        访问路径: http://localhost:9092/ 

2.4 便捷启动脚本

        两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

cd bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

cd bin\windows

kafka-server-start.bat ../../config/server.properties

3.springboot集成Kafka

3.1 环境搭建

(1)添加pom依赖

<!-- 继承Spring boot工程 -->
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.8.RELEASE</version>
</parent>
<properties><fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>

(2)配置类application.yml

        生产者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092producer:retries: 0key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

        消费者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092consumer:group-id: kafka-demo-kafka-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApp {public static void main(String[] args) {SpringApplication.run(KafkaApp.class, args);}
}

3.2 消息生产者

        junit测试,新建消息发送方

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
​
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误
​@Testpublic void sendMsg(){String topic = "spring_test";kafkaTemplate.send(topic,"hello spring boot kafka!");System.out.println("发送成功.");while (true){ //保存加载ioc容器
​}}
}

3.3 消息消费者

        新建监听类:

​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyKafkaListener {
​//    以下两种方法都行// 指定监听的主题
//    @KafkaListener(topics = "spring_test")
//    public void receiveMsg(String message){
//        System.out.println("接收到的消息:"+message);
//    }
​@KafkaListener(topics = "spring_test")public void handleMessage(ConsumerRecord<String, String> record) {System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/10521.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言-课程管理系统-大作业

C语言编写课程管理系统 1 需求分析2 需要查的知识点3 数据结构和基础函数功能3.1 课程数据结构3.2 菜单和选择项3.3 从文件中加载课程信息到内存3.4 将内存中的课程信息保存到文件3.5 将输入的字符串格式化为课程信息结构体 4 主要功能函数4.1 录入课程信息函数4.2 浏览课程信息…

2024全新小狐狸AI免授权源码

源码安装说明&#xff1a; 下 载 地 址 &#xff1a; runruncode.com/php/19757.html 1. 在宝塔新建一个站点&#xff0c;选择 PHP 版本为 7.2、7.3 或 7.4。将压缩包上传到站点的根目录&#xff0c;并设置运行目录为 /public。 2. 导入数据库文件&#xff0c;该文件位于 …

Python装饰器带括号和不带括号的理解

装饰器是 Python 中一个强大且灵活的特性&#xff0c;允许用户在不修改原有函数或类定义的基础上&#xff0c;为其增加额外功能。 今天在尝试自定义 Python 装饰器的时候遇到了一个问题&#xff0c;因为以前一直是使用装饰器&#xff0c;基本没有自定义过装饰器&#xff0c;所…

vue3点击添加小狗图片,vue3拆分脚本

我悄悄蒙上你的眼睛 模板和样式 <template><div class"XueXi_Hooks"><img v-for"(dog, index) in dog1List" :src"dog" :key"index" /><button click"addDog1">点我添加狗1</button><hr …

WPF之工具栏菜单栏功能区。

1&#xff0c;菜单栏&#xff0c;工具栏&#xff0c;状态栏。 1.1&#xff0c;Menu中可添加菜单分隔条<Separator></Separator>作为分割线&#xff0c;使用Separator可以通过改变其template来自定义&#xff0c;Separator是无焦点的&#xff0c;如果简单的在MenuIt…

品鉴中的平衡之美:如何欣赏红酒的口感与风格和谐

品鉴云仓酒庄雷盛红酒的过程&#xff0c;是对其口感与风格和谐的追求和欣赏。平衡是红酒品质的重要标志之一&#xff0c;它体现在红酒的色泽、香气、口感和余味等多个方面。通过欣赏红酒的平衡之美&#xff0c;我们可以更好地领略其精妙之处&#xff0c;感受其带来的美妙滋味。…

【Unity Animation 2D】Unity Animation 2D骨骼绑定与动画制作

一、图片格式为png格式&#xff0c;并且角色各部分分离 图片参数设置 需要将Sprite Mode设置为Single&#xff0c;否则图片不能作为一个整体 1、创建骨骼 1.1 旋转Create Bone&#xff0c;点击鼠标左键确定骨骼位置&#xff0c;移动鼠标再次点击鼠标左键确定骨骼&#xff0c…

大数据面试题第一期*4

题1、HDFS存储机制 &#xff08;1&#xff09;客户端向namenode请求上传文件 &#xff0c;namenode检查目标文件是否已存在 &#xff0c;父目录是否存在。 &#xff08;2&#xff09;namenode返回是否可以上传。 &#xff08;3&#xff09;客户端请求第一个 block上传到哪几个d…

嵌入式C语言高级教程:实现基于STM32的智能健康监测手环

智能健康监测手环能够实时监控用户的生理参数&#xff0c;如心率、体温和活动量&#xff0c;对于健康管理和疾病预防非常有帮助。本教程将指导您如何在STM32微控制器上实现一个基本的智能健康监测手环。 一、开发环境准备 硬件要求 微控制器&#xff1a;STM32L476RG&#xf…

RS3236-3.3YUTDN4功能和参数介绍及PDF资料

RS3236-3.3YUTDN4功能和参数介绍及PDF资料-公司新闻-配芯易-深圳市亚泰盈科电子有限公司 品牌: RUNIC(润石) 封装: XDFN-4-EP(1x1) 描述: 带过温保护 输出类型: 固定 最大输入电压: 7.5V 输出电压: 3.3V 最大输出电流: 500mA RS3236-3.3YUTDN4 是一款低压差线性稳压器&#x…

PX4FMU和PX4IO最底层启动过程分析(下)

PX4FMU和PX4IO最底层启动过程分析&#xff08;下&#xff09; PX4FMU的系统启动函数为nash_main(int argc,char *argv[]) PX4IO的系统启动函数为nash_start(int argc,char *argv[]) PX4FMU启动函数nash_main(int argc,char *argv[]) 首先分析一下nash_main(int argc,char *a…

高效视频剪辑:视频批量调色,如何利用色调调整提升效率

在视频剪辑的后期处理中&#xff0c;调色是一个至关重要的环节。它不仅能够改变视频的整体氛围和风格&#xff0c;还能够突出视频的重点&#xff0c;增强观众的视觉体验。然而&#xff0c;对于大量的视频素材进行逐个调色处理&#xff0c;无疑会耗费大量的时间和精力。我们可以…

软件安装及YOLOv8环境配置及验证

先附上本章中所用到的软件及环境安装包&#xff0c;还有YOLOv8各任务权重&#xff1a; 软件及环境配置链接&#xff1a;https://pan.baidu.com/s/1-n2HJybicA6vW1YXfGRtcA 提取码&#xff1a;6vh8 YOLOv8各权重&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1ApYUrJ_s…

C++相关概念和易错语法(12)(迭代器、string容量调整)

1.迭代器&#xff08;以string为例&#xff09; &#xff08;1&#xff09;基本理解&#xff1a;在我们刚接触迭代器的时候&#xff0c;我们可以将迭代器理解为改造过的“指针”&#xff0c;这是一个新的类型&#xff0c;指向对应容器中的各个元素。我们可以像指针那样对迭代器…

Lombok介绍、使用方法和安装

目录 1 Lombok背景介绍 2 Lombok使用方法 2.1 Data 2.2 Getter/Setter 2.3 NonNull 2.4 Cleanup 2.5 EqualsAndHashCode 2.6 ToString 2.7 NoArgsConstructor, RequiredArgsConstructor and AllArgsConstructor 3 Lombok工作原理分析 4. Lombok的优缺点 5. 总结 1 …

Idea入门:一分钟创建一个Java工程

一&#xff0c;新建一个Java工程 1&#xff0c;启动Idea后&#xff0c;选择 [New Project] 2&#xff0c;完善工程信息 填写工程名称&#xff0c;根据实际用途取有意义的英文名称选择Java语言&#xff0c;可以看到还支持Kotlin、Javascript等语言选择包管理和项目构建工具Mav…

LVS的三种工作模式---(DR/TUN/NAT)

目录 一、NAT模式&#xff08;LVS-NAT&#xff09; 二、IP隧道模式&#xff08;LVS-TUN&#xff09; 三、DR模型--直接路由模式&#xff08;LVS-DR&#xff09; LVS/DR模式ARP抑制 原因&#xff1a; LVS的DR工作模式及配置&#xff1a; LVS的NAT工作模式及配置&#xff1…

PyQt6--Python桌面开发(7.QTextEdit多行富文本框控件)

QTextEdit多行富文本框控件 保存文件到本地QLine多行文本框.ui import sys import time from PyQt6.QtGui import QValidator,QIntValidator from PyQt6.QtWidgets import QApplication,QLabel,QLineEdit,QTextEdit from PyQt6 import uic,QtGuiif __name__ __main__:appQApp…

二叉树进阶 --- 上

目录 1. 二叉搜索树的概念及结构 1.1. 二叉搜索树的概念 1.2. 二叉搜索树的结构样例 2. 二叉搜索树的实现 2.1. insert 的非递归实现 2.2. find 的非递归实现 2.3. erase 的非递归实现 2.3.1. 第一种情况&#xff1a;所删除的节点的左孩子为空 2.3.1.1. 错误的代码 2…

基本QinQ

拓扑图 配置 开启LLDP功能&#xff0c;查看是否能通过QinQ隧道透传 sysname AR1 # lldp enable # interface GigabitEthernet0/0/0.10dot1q termination vid 10ip address 12.1.1.1 255.255.255.0 arp broadcast enable # sysname AR2 # lldp enable # interface GigabitE…