Spring Boot教程之五十六:用 Apache Kafka 消费 JSON 消息

Spring Boot | 如何使用 Apache Kafka 消费 JSON 消息

Apache Kafka 是一个流处理系统,可让您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何使用 Apache Kafka 在 Spring Boot 应用程序的控制台上发布 JSON 消息。

为了了解如何创建 Spring Boot 项目,请参阅本文。

工作步骤 

步骤 1:

转到Spring 初始化程序并创建具有以下依赖项的启动项目: 
Spring for Apache Kafka

步骤 2:

在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并重写toString方法以查看 JSON 格式的消息。以下是学生类的实现:

  • 学生模型

// Java program to implement a

// student class

 

// Creating a student class

public class Student {

 

    // Data members of the class

    int id;

    String firstName;

    String lastName;

 

    // Constructor of the student

    // Class

    public Student()

    {

    }

 

    // Parameterized constructor of

    // the student class

    public Student(int id, String firstName,

                   String lastName)

    {

        this.id = id;

        this.firstName = firstName;

        this.lastName = lastName;

    }

 

    @Override

    public String toString()

    {

        return "Student{"

            + "id = " + id

            + ", firstName = '" + firstName + "'"

            + ", lastName = '" + lastName + "'"

            + "}";

    }

}

步骤 3:

创建一个新的类Config并添加注释@Configuration@EnableKafka。现在使用 Student 类对象创建 Bean ConsumerFactoryConcurrentKafkaListenerContainerFactory 。

  • 配置类

@EnableKafka

@Configuration

public class Config {

 

    // Function to establish a connection

    // between Spring application

    // and Kafka server

    @Bean

    public ConsumerFactory<String, Student>

    studentConsumer()

    {

 

        // HashMap to store the configurations

        Map<String, Object> map

            = new HashMap<>();

 

        // put the host IP in the map

        map.put(ConsumerConfig

                    .BOOTSTRAP_SERVERS_CONFIG,

                "127.0.0.1:9092");

 

        // put the group ID of consumer in the map

        map.put(ConsumerConfig

                    .GROUP_ID_CONFIG,

                "id");

        map.put(ConsumerConfig

                    .KEY_DESERIALIZER_CLASS_CONFIG,

                StringDeserializer.class);

        map.put(ConsumerConfig

                    .VALUE_DESERIALIZER_CLASS_CONFIG,

                JsonDeserializer.class);

 

        // return message in JSON formate

        return new DefaultKafkaConsumerFactory<>(

            map, new StringDeserializer(),

            new JsonDeserializer<>(Student.class));

    }

 

    @Bean

    public ConcurrentKafkaListenerContainerFactory<String,

                                                   Student>

    studentListner()

    {

        ConcurrentKafkaListenerContainerFactory<String,

                                                Student>

            factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(studentConsumer());

        return factory;

    }

}

步骤 4:

创建一个带有@Service注释的KafkaService类。此类将包含用于在控制台上发布消息的侦听器方法。 

  • KafkaService 类

@Service

public class KafkaService {

 

    // Annotation required to listen

    // the message from Kafka server

    @KafkaListener(topics = "JsonTopic",

                   groupId = "id", containerFactory

                                   = "studentListner")

    public void

    publish(Student student)

    {

        System.out.println("New Entry: "

                           + student);

    }

}

步骤 5:

启动 zookeeper 和 Kafka 服务器。现在我们需要创建一个名为JsonTopic的新主题。为此,打开一个新的命令提示符窗口并将目录更改为 Kafka 目录。

步骤6:

现在使用下面给出的命令创建一个新主题: 

bin/Kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 mac 和 linux
.\bin\windows\Kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name // 用于 windows 

步骤 7:

现在运行 Kafka 生产者控制台,使用以下命令: 

bin/Kafka-console-producer.sh –broker-list localhost:9092 –topic Kafka_Example // 适用于 mac 和 linux
.\bin\windows\Kafka-console-producer.bat –broker-list localhost:9092 –topic Kafka_Example // 适用于 windows 

步骤 8:

运行应用程序并在 Kafka 生产器上输入消息并按回车键。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/67214.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python+playwright自动化测试(一):安装及简单使用,截图录屏

目录 基本使用 浏览器调用 启用浏览器 创建窗口对象 访问URL 页面的刷新、返回、前进 关闭 截图、录屏、保存pdf 截图 录屏 保存为pdf 设置窗口大小 调试模式 手机模式及new_context的更多参数 手机模式 new_context的其他参数 设置语言和时区 设置和修改位置…

初识C++(二)

六、引用 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空间&#xff0c;它和它引用的变量共用同一块内存空间。 通俗地讲&#xff0c;可以理解为一个人能够拥有多个称呼&#xff0c;这些所有的称呼都是表示这一…

【RedisStack】Linux安装指南

【RedisStack】Linux安装指南.md 前言下载解压创建启动文件设置密码把密码设置到环境变量启动/停止相关命令测试&验证官网资料参考资料 前言 Redis Stack是使用Redis的最佳起点。我们将我们必须提供的最好的技术捆绑在一起&#xff0c;形成一个易于使用的软件包。Redis St…

达梦8-DMSQL程序设计学习笔记1-DMSQL程序简介

1、DMSQL程序简介 DMSQL程序是达梦数据库对标准SQL语言的扩展&#xff0c;是一种过程化SQL语言。在DMSQL程序中&#xff0c;包括一整套数据类型、条件结构、循环结构和异常处理结构等&#xff0c;DMSQL程序中可以执行SQL语句&#xff0c;SQL语句中也可以使用DMSQL函数。 DMSQ…

STM32 FreeRTOS 基础知识

多任务处理 内核是操作系统的核心组件。诸如 Linux 这样的操作系统采用的内核&#xff0c; 看似允许用户同时访问计算机。很明显&#xff0c;多个用户可以同时执行多个程序。 每个执行程序都是受操作系统控制的任务&#xff08;或线程&#xff09;。如果一个操作系统能够以这…

T-SQL编程

目录 1、T-SQL的元素 1.1 标识符 1. 常规标识符 2. 分隔标识符 1.2 变量 1. 全局变量 2. 局部变量 1.3 运算符 1. 算数运算符 2. 赋值运算符 3. 位运算符 4. 比较运算符 5. 逻辑运算符 6. 字符串连接运算符 7. 一元运算符 8. 运算符的优先级和结合性 1.4 批处…

js中的Object.defineProperty()详解

文章目录 一、Object.defineProperty()二、descriptor属性描述符2.1、数据描述符2.2、访问器描述符2.3、descriptor属性2.3.1、value2.3.2、writable2.3.3、enumerable &#xff08;可遍历性&#xff09;2.3.4、configurable &#xff08;可配置性&#xff09; 三、注意事项 一…

【搭建JavaEE】(2)Tomcat安装配置和第一个JavaEE程序

Tomcat–容器(Container) 下载 Apache Tomcat - Welcome! 下载完成 请求/响应 结构 测试 查看Jdk版本 改端口号localhost8080–>8099 学学人家以后牛逼了可以用自己名字当文件夹名 配置端口8099 找到server文件 用记事本打开 再打开另一个logging文件 ”乱码解决“步骤&…

centos7.6 安装nginx 1.21.3与配置ssl

1 安装依赖 yum -y install gcc zlib zlib-devel pcre-devel openssl openssl-devel2 下载Nginx wget http://nginx.org/download/nginx-1.21.3.tar.gz3 安装目录 mkdir -p /data/apps/nginx4 安装 4.1 创建用户 创建用户nginx使用的nginx用户。 #添加www组 # groupa…

高级软件工程-复习

高级软件工程复习 坐标国科大&#xff0c;下面是老师说的考试重点。 Ruby编程语言的一些特征需要了解要能读得懂Ruby程序Git的基本命令操作知道Rails的MVC工作机理需要清楚&#xff0c;Model, Controller, View各司什么职责明白BDD的User Story需要会写&#xff0c;SMART要求能…

TrollFools 2.10-22 插件注入工具 官方版

《TrollFools巨魔设备专用插件注入工具》这是一款专为巨魔设备打造的插件注入神器&#xff0c;功能强大且操作便捷。它能够轻松地将插件注入通过AppStore商店下载的任意APP中&#xff0c;同时也能随时卸载&#xff0c;丝毫不影响APP的正常使用。注入后的APP仍可正常更新&#x…

30分钟内搭建一个全能轻量级springboot 3.4 + 脚手架 <1> 5分钟快速创建一个springboot web项目

快速导航 <1> 5分钟快速创建一个springboot web项目 <2> 5分钟集成好最新版本的开源swagger ui&#xff0c;并使用ui操作调用接口 <3> 5分钟集成好druid并使用druid自带监控工具监控sql请求 <4> 5分钟集成好mybatisplus并使用mybatisplus generator自…

arcgis中生成格网矢量带高度

效果 1、数据准备 (1)矢量边界(miain.shp) (2)DEM(用于提取格网标高) (3)DSM(用于提取格网最高点) 2、根据矢量范围生成格网 模板范围选择矢量边界,像元宽度和高度根据坐标系来输入,我这边是4326的,所以输入的是弧度,输出格网矢量gewang.shp 3、分区统计 …

海豚调度DolphinScheduler-3.1.9配置windows本地开发环境

源代码下载地址https://dolphinscheduler.apache.org/zh-cn/docs/3.1.9 1.Zookeeper安装与使用 如图下载解压zookeeper安装包&#xff0c;并创建data和log目录 下载地址 https://archive.apache.org/dist/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz 进入…

P1图文解析:初识算法和数据结构

文章目录 前言1、算法例子1.1、查字典&#xff08;二分查找算法&#xff09;1.2、整理扑克&#xff08;插入排序算法&#xff09;1.3、货币找零&#xff08;贪心算法&#xff09; 2、算法与数据结构2.1、算法定义2.2、数据结构定义2.3、数据结构与算法的关系2.4、独立于编程语言…

校园跑腿小程序---轮播图,导航栏开发

hello hello~ &#xff0c;这里是 code袁~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生…

UE材质节点Fresnel

Fresnel节点 ExponentIn 控制边缘透明度 BaseReflectFractionIn 控制中心透明度

浅谈云计算07 | 云安全机制

浅谈云计算安全机制&#xff1a;全方位守护云端世界 一、引言二、加密技术&#xff1a;数据的隐形护盾三、散列机制&#xff1a;数据完整性的忠诚卫士四、数字签名&#xff1a;数据来源与真伪的鉴定专家五、公钥基础设施&#xff08;PKI&#xff09;&#xff1a;信任的基石六、…

Notepad++上NppFTP插件的安装和使用教程

一、NppFTP插件下载 图示是已经安装好了插件。 在搜索框里面搜NppFTP&#xff0c;一般情况下&#xff0c;自带的下载地址容易下载失败。这里准备了一个下载连接&#xff1a;Release v0.29.10 ashkulz/NppFTP GitHub 这里我下载的是x86版本 下载好后在nodepad的插件里面选择打…

高级运维:源码编译安装httpd 2.4,提供系统服务管理脚本并测试

1.下载httpd 2.4 源码 wget https://archive.apache.org/dist/httpd/httpd-2.4.54.tar.gz 2.解压下载压缩包 tar -zxvf httpd-2.4.54.tar.gz cd httpd-2.4.54 3.安装httpd需要的依赖包 sudo yum groupinstall "Development Tools" -y sudo yum install gcc glibc ap…