Kafka 3.x.x 入门到精通(07)——Java应用场景——SpringBoot集成

Kafka 3.x.x 入门到精通(07)——Java应用场景——SpringBoot集成

  • 4. Java应用场景——SpringBoot集成
    • 4.1 创建SpringBoot项目
      • 4.1.1 创建SpringBoot项目
      • 4.1.2 修改pom.xml文件
      • 4.1.3 在resources中增加application.yml文件
    • 4.2 编写功能代码
      • 4.2.1 创建配置类:SpringBootKafkaConfig
      • 4.2.2 创建生产者控制器:KafkaProducerController
      • 4.2.3 创建消费者:KafkaDataConsumer
    • 4.3 集成测试
      • 4.3.1 启动ZooKeeper
      • 4.3.2 启动Kafka
      • 4.3.3 启动应用程序
      • 4.3.4 生产数据测试

在这里插入图片描述

在这里插入图片描述

本文档参看的视频是:

  • 尚硅谷Kafka教程,2024新版kafka视频,零基础入门到实战
  • 黑马程序员Kafka视频教程,大数据企业级消息队列kafka入门到精通
  • 小朋友也可以懂的Kafka入门教程,还不快来学

本文档参看的文档是:

  • 尚硅谷官方文档,并在基础上修改 完善!非常感谢尚硅谷团队!!!!

在这之前大家可以看我以下几篇文章,循序渐进:

❤️Kafka 3.x.x 入门到精通(01)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(02)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(03)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(04)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(05)——对标尚硅谷Kafka教程

❤️Kafka 3.x.x 入门到精通(06)——对标尚硅谷Kafka教程

在这里插入图片描述

4. Java应用场景——SpringBoot集成

Spring Boot帮助您创建可以运行的、独立的、生产级的基于Spring的应用程序。您可以使用Spring Boot创建Java应用程序,这些应用程序可以通过使用java-jar或更传统的war部署启动。
我们的目标是:

  • 为所有Spring开发提供从根本上更快、广泛访问的入门体验。
  • 开箱即用,但随着要求开始偏离默认值,请迅速离开。
  • 提供大型项目(如嵌入式服务器、安全性、指标、健康检查和外部化配置)常见的一系列非功能性功能。
  • 绝对没有代码生成(当不针对原生图像时),也不需要XML配置。

在这里插入图片描述

在这里插入图片描述

4.1 创建SpringBoot项目

4.1.1 创建SpringBoot项目

在这里插入图片描述

在这里插入图片描述

4.1.2 修改pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu</groupId><artifactId>springboot-kafka</artifactId><version>0.0.1</version><name>springboot-kafka</name><description>Kafka project for Spring Boot</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.1</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-json</artifactId><version>5.8.11</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-db</artifactId><version>5.8.11</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

在这里插入图片描述

4.1.3 在resources中增加application.yml文件

spring:kafka:bootstrap-servers: localhost:9092producer:acks: allbatch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerretries: 0consumer:group-id: test#消费者组#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,从头开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliestenable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交的频率key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 2properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡max.poll.interval.ms: 300000#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#max.partition.fetch.bytes=1048576  #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediatemissing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batchconcurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲template:default-topic: "test"
server:port: 9999

4.2 编写功能代码

4.2.1 创建配置类:SpringBootKafkaConfig

package com.atguigu.springkafka.config;public class SpringBootKafkaConfig {public static final String TOPIC_TEST = "test";public static final String GROUP_ID = "test";
}

在这里插入图片描述

4.2.2 创建生产者控制器:KafkaProducerController

package com.atguigu.springkafka.controller;import com.atguigu.springkafka.config.SpringBootKafkaConfig;
import lombok.extern.slf4j.Slf4j;
import cn.hutool.json.JSONUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.*;import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@ResponseBody@PostMapping(value = "/produce", produces = "application/json")public String produce(@RequestBody Object obj) {try {String obj2String = JSONUtil.toJsonStr(obj);kafkaTemplate.send(SpringBootKafkaConfig.TOPIC_TEST, obj2String);return "success";} catch (Exception e) {e.getMessage();}return "success";}
}

在这里插入图片描述

4.2.3 创建消费者:KafkaDataConsumer

package com.atguigu.springkafka.component;import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import com.atguigu.springkafka.config.SpringBootKafkaConfig;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.Optional;@Component
@Slf4j
public class KafkaDataConsumer {@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("data"));//ack.acknowledge();}}
}

在这里插入图片描述

4.3 集成测试

4.3.1 启动ZooKeeper

在这里插入图片描述

在这里插入图片描述

4.3.2 启动Kafka

4.3.3 启动应用程序

在这里插入图片描述

在这里插入图片描述

4.3.4 生产数据测试

可以采用任何的工具进行测试,我们这里采用postman发送POST数据

在这里插入图片描述

消费者消费数据

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/4242.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器人-轨迹规划

旋转矩阵 旋转矩阵--R--一个3*3的矩阵&#xff0c;其每列的值时B坐标系在A坐标系上的投影值。 代表B坐标系相对于A坐标系的姿态。 旋转矩阵的转置矩阵 其实A相对于B的旋转矩阵就相当于把B的列放到行上就行。 视频 &#xff08;将矩阵的行列互换得到的新矩阵称为转置矩阵。&…

SQLite尽如此轻量

众所周知&#xff0c;SQLite是个轻量级数据库&#xff0c;适用于中小型服务应用等&#xff0c;在我真正使用的时候才发现&#xff0c;它虽然轻量&#xff0c;但不知道它却如此轻量。 下载 官网&#xff1a; SQLite Download Page 安装 1、将下载好的两个压缩包同时解压到一个…

【PG-2】PostgreSQL存储管理器

2. PostgreSQL存储管理器 src/backend/storage (base) torrestorresの机革:~/codes/postgresql-16.2/src/backend/storage$ ls Makefile buffer file freespace ipc large_object lmgr meson.build objfiles.txt page smgr sync存储管理器—smgr 通用存储管理器 …

航拍图像拼接 | 使用C++实现的无人机航拍图像拼接

项目应用场景 面向无人机航拍图像拼接场景&#xff0c;项目使用 C 实现&#xff0c;使用 harris 角点查找特征点 非极大值抑制&#xff0c;由于航拍图像没有严重的尺度旋转变化&#xff0c;使用了 berief 描述子&#xff0c;然后使用 RANSAC 求 H&#xff0c;最后进行图像拼接…

linux 中 make 和 gmake的关系

1. 关系 gmake特指GNU make。 make是指系统默认的make实现; 在大多数Linux发行版中&#xff0c;make就是GNU make&#xff0c;但是在其他unix中&#xff0c;gmake可以指代make的某些其他实现&#xff0c;例如BSD make或各种商业unix的make实现。 gmake是GNU Make的缩写。 Linux…

【算法一则】【贪心】数组中的数可以拼装成的最大数

题目 给定一组非负整数 nums&#xff0c;重新排列每个数的顺序&#xff08;每个数不可拆分&#xff09;使之组成一个最大的整数。 注意&#xff1a;输出结果可能非常大&#xff0c;所以你需要返回一个字符串而不是整数。 示例 1&#xff1a; 输入&#xff1a;nums [10,2] …

【UnityRPG游戏制作】RPG项目的背包系统商城系统和BOSS大界面

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

二分查找-在排序数组中查找元素的第一个和最后一个位置

给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返回 [-1, -1]。 你必须设计并实现时间复杂度为 O(log n) 的算法解决此问题。 示例 输入&#xf…

程序链接步骤2:重定位

一、链接步骤 链接步骤1“符号解析”&#xff1a; 将符号引用和符号定义建立关联后&#xff1b; 链接步骤2“重定位”&#xff1a; 将引用符号的地址“重定位”为相关联的符号定义的地址。 二、原文链接&#xff1a;https://www.jianshu.com/p/7d13ec4735ba 符号解析完成后&…

ZooKeeper 搭建详细步骤之二(伪集群模式)

ZooKeeper 搭建详细步骤之一&#xff08;单机模式&#xff09; ZooKeeper 及相关概念简介 伪集群搭建 ZooKeeper 伪集群是指在一个单一的物理或虚拟机环境中模拟出一个由多个 ZooKeeper 节点构成的集群。尽管这些节点实际上运行在同一台机器上&#xff0c;但它们通过配置不同的…

字节秋招高频算法汇总(高级篇)

更多大厂面试内容可见 -> http://11come.cn 字节秋招高频算法汇总 接下来讲一下 字节秋招 中的高频算法题&#xff0c;分为三个部分&#xff1a; 基础篇 、 中级篇 、 进阶篇 目的就是为了应对秋招中的算法题&#xff0c;其实过算法题的诀窍就在于 理解的基础上 背会 看…

与Apollo共创生态:我和Apollo七周年大会的心路历程

一、写在前面 前几天观看了Apollo七周年大会&#xff0c;给我带来了超多的惊喜&#xff0c;博主想将这份惊喜分享给大家&#xff01; 二、Apollo开放平台 Apollo开放平台秉承其核心理念——开放能力、共享资源、加速创新、持续共赢&#xff0c;致力于推动自动驾驶技术的革新…

《HCIP-openEuler实验指导手册》1.2Apache主页面配置

一、配置服务器监听IP及端口 注释主配置文件“监听IP及端口”部分 cd /etc/httpd/conf cp httpd.conf httpd.conf.bak vim httpd.conf可以在普通模式下搜索Listen关键字 :/Listen按n键继续向后搜索 在/etc/httpd/conf.d中新建子配置文件port.conf&#xff1a; touch /etc…

观成科技:蔓灵花组织加密通信研究分析总结

1.概述 蔓灵花&#xff0c;又名"Bitter"&#xff0c;常对南亚周边及孟加拉湾海域的相关国家发起网络攻击&#xff0c;主要针对巴基斯坦和中国两国。其攻击目标主要包括政府部门、核工业、能源、国防、军工、船舶工业、航空工业以及海运等行业&#xff0c;其主要意图…

后端每日一题 2:DNS 解析过程

本文首发于公众号&#xff1a;腐烂的橘子 本文梗概&#xff1a; DNS 是什么&#xff0c;有什么作用一条 DNS 记录是什么样的DNS 域名解析原理DNS 服务器如何抵御攻击 DNS 是什么&#xff0c;有什么作用 DNS&#xff08;Domain Name System&#xff09;是一种应用层协议&…

使用和配置:超秀的 MySQL 客户端工具 MyCli

1.安装一个更人性化的一个mysql客户端、终端: sudo apt install mycli 登录方式 mycli -uroot password 2. 功能&#xff1a;界面更好看&#xff0c;且支持自动补全&#xff0c;按tab可以补全 3.mycli使用帮助说明文档 mycli --help用法: mycli [OPTIONS] [DATABASE]例如:…

西门子程序专业备份软件BUDdy for S7和使用说明

西门子程序专业备份软件BUDdy for S7和使用说明

虹科Pico汽车示波器 | 免拆诊断案例 | 2006 款林肯领航员车发动机怠速抖动

故障现象 一辆2006款林肯领航员车&#xff0c;搭载5.4 L发动机&#xff0c;累计行驶里程约为26万km。该车因发动机怠速抖动故障进厂维修&#xff0c;维修人员更换了火花塞、点火线圈及凸轮轴位置传感器&#xff0c;清洗了积炭和喷油器&#xff0c;故障依旧&#xff0c;于是向笔…

SpringBoot学习之Redis下载安装启动【Windows版本】(三十六)

一、下载Redis for Windows Redis 官方网站没有提供 Windows 版的安装包,但可以通过 GitHub 来下载安装包,下载地址:https://github.com/tporadowski/redis/releases 1、网站提供了安装包和免安装版本,这里我们直接选择下面的免安装版本 2、下载后的压缩包解压以后,如下…

C#基础之结构体

结构体 文章目录 1、概念2、基本语法3、示例4、结构体的使用5、访问修饰符6、结构体的构造函数思考1 描述矩形信息思考2 职业名字释放了技能思考3 小怪兽思考4 多个小怪兽思考5 奥特曼打小怪兽 1、概念 结构体是一种一定义变量类型 它是数据和函数的集合&#xff0c;可以在结…