Spring Boot教程之五十五:Spring Boot Kafka 消费者示例

Spring Boot Kafka 消费者示例

Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。因此,下面列出了 Spring boot 的一些主要功能。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“启动器”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第三方库。
  • 提供可用于生产的功能,例如健康检查、指标和外部化配置。
  • 几乎不需要代码生成,也不需要 XML 配置。

Apache Kafka是一个发布-订阅消息系统。消息系统允许您在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义和进一步处理主题(主题可能是类别)的软件。应用程序可以连接到此系统并将消息传输到主题上。消息可以包含任何类型的信息,来自您的个人博客上的任何事件,也可以是触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用来自 Kafka 主题的消息并使用 Spring Boot 将它们显示在控制台中,其中Kafka 是先决条件。 

例子:

先决条件:确保您已经在本地机器上安装了 Apache Kafka,因此您应该知道如何在 Windows 上安装和运行 Apache Kafka?

步骤 1:转到此链接并创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

步骤2:创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

  • Java

// Java Program to Illustrate Kafka Configuration

  

package com.amiya.kafka.apachekafkaconsumer.config;

  

// Importing required classes

import java.util.HashMap;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

  

// Annotations

@EnableKafka

@Configuration

  

// Class

public class KafkaConfig {

  

    @Bean

    public ConsumerFactory<String, String> consumerFactory()

    {

  

        // Creating a Map of string-object pairs

        Map<String, Object> config = new HashMap<>();

  

        // Adding the Configuration

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                   "127.0.0.1:9092");

        config.put(ConsumerConfig.GROUP_ID_CONFIG,

                   "group_id");

        config.put(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

        config.put(

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

  

        return new DefaultKafkaConsumerFactory<>(config);

    }

  

    // Creating a Listener

    public ConcurrentKafkaListenerContainerFactory

    concurrentKafkaListenerContainerFactory()

    {

        ConcurrentKafkaListenerContainerFactory<

            String, String> factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        return factory;

    }

}

步骤 3:创建名为KafkaConsumer的消费者文件

  • Java

// Java Program to Illustrate Kafka Consumer

  

package com.amiya.kafka.apachekafkaconsumer.consumer;

  

// Importing required classes

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

  

@Component

  

// Class

public class KafkaConsumer {

  

    @KafkaListener(topics = "NewTopic",

                   groupId = "group_id")

  

    // Method

    public void

    consume(String message)

    {

        // Print statement

        System.out.println("message = " + message);

    }

}

步骤 4:现在我们必须做以下事情才能使用 Spring Boot 从 Kafka 主题消费消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

类似地,使用此命令运行 Apache Kafka 服务器

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令从 Kafka 主题发送消息

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

步骤 5:现在运行你的 Spring Boot 应用程序。确保已在application.properties文件中更改了端口号

server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring Boot 应用程序

输出:在输出中,您可以看到当您从 Kafka 主题发送消息时,它会实时显示在控制台上。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/67173.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

金融项目实战 04|JMeter实现自动化脚本接口测试及持续集成

目录 一、⾃动化测试理论 二、自动化脚本 1、添加断言 1️⃣注册、登录 2️⃣认证、充值、开户、投资 2、可重复执行&#xff1a;清除测试数据脚本按指定顺序执行 1️⃣如何可以做到可重复执⾏&#xff1f; 2️⃣清除测试数据&#xff1a;连接数据库setup线程组 ①明确…

linux下中文输入法ibus

在Linux系统中&#xff0c;现在都自带ibus框架&#xff0c;可以选择中文输入法&#xff0c;如果使用IBus输入法时无法输入中文&#xff0c;可以尝试以下几种方法来解决这个问题&#xff1a; 1. 检查环境变量 确保环境变量配置正确。可以在终端中运行以下命令来设置环境变量&a…

【Uniapp-Vue3】@import导入css样式及scss变量用法与static目录

一、import导入css样式 在项目文件中创建一个common文件夹&#xff0c;下面创建一个css文件夹&#xff0c;里面放上style.css文件&#xff0c;编写的是公共样式&#xff0c;我们现在要在App.vue中引入该样式。 在App.vue中引入该样式&#xff0c;这样就会使样式全局生效&#…

大疆机场及无人机上云

最近基于大疆上云api进行二次开发&#xff0c;后面将按照开发步骤对其进行说明&#xff01;

WINFORM - DevExpress -> gridcontrol---->主从模式

1.gridview设置 //不显示分组的面板gridView1.OptionsView.ShowGroupPanel = false;//自动改变行高适应内容 gridView1.OptionsView.RowAutoHeight = true;//允许自动合并单元格 gridView1.OptionsView.AllowCellMerge = true;//如果主从表中,没有找到从表内容也要显示(默认是…

Golang笔记——hashmap

本文详细介绍golang的哈希表的底层实现、扩容机制、插入查询过程以及并发安全性。 文章目录 定义Key无序性Key唯一性Key可比性 基本使用底层实现哈希表实现hmapbucket 数据结构bmap链地址法哈希冲突负载因子 扩容增量扩容等量扩容 查找过程插入过程删除流程非并发安全map 的线程…

【网络】:网络编程套接字

目录 源IP地址和目的IP地址 源MAC地址和目的MAC地址 源端口号和目的端口号 端口号 VS 进程ID TCP协议和UDP协议 网络字节序 字符串IP和整数IP相互转换 查看当前网络的状态 socket编程接口 socket常见API 创建套接字&#xff08;socket&#xff09; 绑定端口号&…

el-descriptions-item使用span占行不生效

需要实现的效果是客户状态单独占满一行 错误代码&#xff1a; <el-descriptions title"基本信息" :column"3"> <el-descriptions-item label"公司电话:">Suzhou</el-descriptions-item><el-descriptions-item label"…

vue城市道路交通流量预测可视化系统

文章结尾部分有CSDN官方提供的学长 联系方式名片 文章结尾部分有CSDN官方提供的学长 联系方式名片 关注B站、收藏、不迷路&#xff01; 项目亮点 编号&#xff1a;R09 &#x1f687; 网站大屏管理三大前端、vuespringbootmysql、前后端分离架构 &#x1f687; 流量预测道路查询…

Elasticsearch:使用 Playground 与你的 PDF 聊天

LLMs作者&#xff1a;来自 Elastic Toms Mura 了解如何将 PDF 文件上传到 Kibana 并使用 Elastic Playground 与它们交互。本博客展示了在 Playground 中与 PDF 聊天的实用示例。 Elasticsearch 8.16 具有一项新功能&#xff0c;可让你将 PDF 文件直接上传到 Kibana 并使用 Pla…

[免费]SpringBoot+Vue新能源汽车充电桩管理系统【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的SpringBootVue新能源汽车充电桩管理系统&#xff0c;分享下哈。 项目视频演示 【免费】SpringBootVue新能源汽车充电桩管理系统 Java毕业设计_哔哩哔哩_bilibili 项目介绍 随着信息化时代的到来&#xff0…

《拉依达的嵌入式\驱动面试宝典》—操作系统篇(二)

《拉依达的嵌入式\驱动面试宝典》—操作系统篇(二) 你好,我是拉依达。 感谢所有阅读关注我的同学支持,目前博客累计阅读 27w,关注1.5w人。其中博客《最全Linux驱动开发全流程详细解析(持续更新)-CSDN博客》已经是 Linux驱动 相关内容搜索的推荐首位,感谢大家支持。 《拉…

ffmpeg7.0 aac转pcm

#pragma once #define __STDC_CONSTANT_MACROS #define _CRT_SECURE_NO_WARNINGSextern "C" { #include "libavcodec/avcodec.h" }//缓冲区大小&#xff08;缓存5帧数据&#xff09; #define AUDIO_INBUF_SIZE 40960 /*name depthu8 8s16 …

【Uniapp-Vue3】pages.json页面路由globalStyle的属性

项目的全局配置在pages.json中。 一、导航栏设置 二、下拉刷新设置 下拉就可以看到设置的样式 三、上拉触底 这个页面中&#xff0c;向下滑动页面到底部就会输出“到底了” 现在将触底距离设置为500 走到半路就会输出“到底了”

openGauss 6.0 LTS 实现高可用性部署

openGauss 6.0 LTS是华为开发的企业级分布式数据库&#xff1a;高性能、高可用性、强扩展性&#xff0c;基于PostgreSQL&#xff0c;支持SQL和JSON数据类型&#xff0c;提供高并发、高吞吐量的处理能力&#xff0c;适合金融、电信、政府等行业使用&#xff0c;6.0版本主要增强了…

对比学习 (Contrastive Learning) 算法详解与PyTorch实现

对比学习 (Contrastive Learning) 算法详解与PyTorch实现 目录 对比学习 (Contrastive Learning) 算法详解与PyTorch实现1. 对比学习 (Contrastive Learning) 算法概述1.1 自监督学习1.2 对比学习的优势2. 对比学习的核心技术2.1 正样本对与负样本对2.2 对比损失函数2.3 数据增…

前端开发:Web前端和HTML

一、解释 1.Web前端开发&#xff1f; Web前端开发是指创建用户在浏览器中直接交互和体验的部分的过程。‌前端开发主要涉及HTML、CSS和JavaScript这三大核心技术。HTML用于构建网页内容的骨架&#xff0c;CSS用于设置和控制网页的外观和布局&#xff0c;而JavaScript则赋予网…

LeetCode 热题 100_腐烂的橘子(52_994_中等_C++)(图;广度优先遍历(队列))

LeetCode 热题 100_腐烂的橘子&#xff08;52_994&#xff09; 题目描述&#xff1a;输入输出样例&#xff1a;题解&#xff1a;解题思路&#xff1a;思路一&#xff08;广度优先遍历&#xff08;队列&#xff09;&#xff09;&#xff1a; 代码实现代码实现&#xff08;思路一…

C#,图论与图算法,输出无向图“欧拉路径”的弗勒里(Fleury Algorithm)算法和源程序

1 欧拉路径 欧拉路径是图中每一条边只访问一次的路径。欧拉回路是在同一顶点上开始和结束的欧拉路径。 这里展示一种输出欧拉路径或回路的算法。 以下是Fleury用于打印欧拉轨迹或循环的算法&#xff08;源&#xff09;。 1、确保图形有0个或2个奇数顶点。2、如果有0个奇数顶…

[文献精汇]使用 LSTM Networks 的均值回归交易策略

Backtrader 策略实例 [Backtrader]实例:均线策略[Backtrader] 实例:MACD策略[Backtrader] 实例:KDJ 策略[Backtrader] 实例:RSI 与 EMA 结合[Backtrader] 实例:SMA自定义数据源[Backtrader] 实例:海龟策略[Backtrader] 实例:网格交易[Backtrader] 实例: 配对交[Backtrader] 机…