Spring Boot教程之五十七:在 Apache Kafka 上发布 JSON 消息

Spring Boot | 如何在 Apache Kafka 上发布 JSON 消息

Apache Kafka是一个发布-订阅消息系统。消息队列允许您在进程、应用程序和服务器之间发送消息。在本文中,我们将了解如何在 Spring Boot 应用程序中向 Apache Kafka 发送 JSON 消息。

为了了解如何创建 Spring Boot 项目,请参阅本文。

JSON 的全称是 JavaScript Object Notation。JSON 是一种用于数据交换的轻量级数据格式,人类可以轻松读取和编写,机器也可以轻松解析和生成。虽然它源自 JavaScript 的一个子集,但它与语言无关。它是一种完全独立于语言的文本格式。可以按照以下步骤将 JSON 消息发布到 Apache Kafka:

  1. 转到spring initializr并创建具有以下依赖项的启动项目:
    • Spring Web
    • Spring for Apache Kafka
  2. 在 IDE 中打开项目并同步依赖项。在本文中,我们将创建一个学生模型,我们将在其中发布学生详细信息。因此,创建一个模型类Student。添加数据成员并创建构造函数并创建 getter 和 setter。以下是学生类的实现:

// Java program to implement a

// student class

  

// Creating a student class

public class Student {

  

    // Data members of the

    // student class

    int id;

    String firstName;

    String lastName;

  

    // Constructor of the student

    // class

    public Student(int id, String firstName,

                   String lastName)

    {

        this.id = id;

        this.firstName = firstName;

        this.lastName = lastName;

    }

  

    // Implementing the getters

    // and setters

    public int getId()

    {

        return id;

    }

  

    public void setId(int id)

    {

        this.id = id;

    }

  

    public String getFirstName()

    {

        return firstName;

    }

  

    public void setFirstName(String firstName)

    {

        this.firstName = firstName;

    }

  

    public String getLastName()

    {

        return lastName;

    }

  

    public void setLastName(String lastName)

    {

        this.lastName = lastName;

    }

}

 3. 现在,创建一个带有注释@RestController的新类Controller。创建一个GET API并使用参数作为字符串和模型类对象初始化KafkaTemplate。以下是控制器的实现:

// Java program to implement a

// controller

  

@RestController

@RequestMapping("gfg")

public class UserResource {

  

    @Autowired

    private KafkaTemplate<String, Student>

        kafkaTemplate;

  

    private static final String TOPIC

        = "StudentExample";

  

    @GetMapping("/publish/{id}/"

                + "{firstName}/{lastName}")

  

    public String post(

        @PathVariable("id") final int id,

        @PathVariable("firstName") final

            String firstName,

        @PathVariable("lastName") final

            String lastName)

    {

  

        kafkaTemplate.send(

            TOPIC,

            new Student(

                id, firstName,

                lastName));

  

        return "Published successfully";

    }

}

4. 创建一个带有注释@Configuration的StudentConfig类。在这个类中,我们将序列化模型类的对象。

// Java program to serialize the

// object of the model class

  

@Configuration

public class StudentConfig {

  

    @Bean

    public ProducerFactory<String, Student>

    producerFactory()

    {

        // Create a map of a string

        // and object

        Map<String, Object> config

            = new HashMap<>();

  

        config.put(

            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

            "127.0.0.1:9092");

  

        config.put(

            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

            StringSerializer.class);

  

        config.put(

            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

            JsonSerializer.class);

  

        return new DefaultKafkaProducerFactory<>(config);

    }

  

    @Bean

    public KafkaTemplate<String, Student>

    kafkaTemplate()

    {

        return new KafkaTemplate<>(

            producerFactory());

    }

}

5. 现在,启动 zookeeper 和 Kafka 服务器。我们需要创建一个名为StudentExample 的新主题。为此,打开一个新的命令提示符窗口并将目录更改为 Kafka 文件夹。

6. 现在,使用以下命令创建一个新主题:

对于 Mac 和 Linux:bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name

对于 Windows: .\bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic topic_name

7.现在要实时查看 Kafka 服务器上的消息,请使用以下命令:

对于 Mac 和 Linux:bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic topic_name –from-beginning

对于 Windows: .\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic topic_name –from-beginning

8.运行应用程序并调用 API 如下:

localhost:8080/gfg/publish/{id}/{first name}/{last name}

注意:如果使用了不同的端口,则将端口替换为 8080。

输出:

  • 调用 API:
  • 实时查看消息:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/892839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络 (44)电子邮件

一、概述 电子邮件&#xff08;Electronic Mail&#xff0c;简称E-mail&#xff09;是因特网上最早流行的应用之一&#xff0c;并且至今仍然是因特网上最重要、最实用的应用之一。它利用计算机技术和互联网&#xff0c;实现了信息的快速、便捷传递。与传统的邮政系统相比&#…

代码随想录算法训练营day02| 977.有序数组的平方、209.长度最小的子数组、59.螺旋矩阵II

977. 有序数组的平方 双指针&#xff0c;新数组用k&#xff1b; 由于已经排序&#xff0c;所以比较两侧数据即可&#xff1b; class Solution { public:vector<int> sortedSquares(vector<int>& nums) {vector<int> ans(nums.size());int k nums.siz…

Unity 语音转文字 Vosk 离线库

市场有很多语音库&#xff0c;这里介绍Vosk SDK 除了支持untiy外还有原生开发服务器等 目录 安装unity示例demo下载语音训练文件运行demo结尾一键三联 注意事项 有可能debug出来的文本是空的&#xff0c;&#xff08;确保麦克风正常&#xff0c;且索引正确&#xff09;分大…

网络网络层ICMP协议

网络网络层ICMP协议 1. ICMP 协议介绍 ICMP&#xff08;Internet Control Message Protocol&#xff09;是 TCP/IP 协议簇中的网络层控制报文协议。用于在 IP 主机、路由器之间传递控制消息&#xff0c;提供可能有关通信问题的反馈信息。 以及用于网络诊断或调试&#xff08;…

Lianwei 安全周报|2025.1.13

新的一周又开始了&#xff0c;以下是本周「Lianwei周报」&#xff0c;我们总结推荐了本周的政策/标准/指南最新动态、热点资讯和安全事件&#xff0c;保证大家不错过本周的每一个重点&#xff01; 政策/标准/指南最新动态 01 美国国土安全部发布《公共部门生成式人工智能部署手…

计算机网络(五)运输层

5.1、运输层概述 概念 进程之间的通信 从通信和信息处理的角度看&#xff0c;运输层向它上面的应用层提供通信服务&#xff0c;它属于面向通信部分的最高层&#xff0c;同时也是用户功能中的最低层。 当网络的边缘部分中的两个主机使用网络的核心部分的功能进行端到端的通信时…

卷积神经02-CUDA+Pytorch环境安装

卷积神经02-CUDAPytorch环境安装 在使用Python进行pytorch的使用过程中遇到各种各样的版本冲突问题&#xff0c;在此进行记录 0-核心知识脉络 1&#xff09;根据自己电脑的CUDA版本安装对应版本的Pytorch&#xff0c;充分的使用GPU性能2&#xff09;电脑要先安装【CUDA ToolKi…

shell 脚本基本练习

一、shell 脚本写出检测 /tmp/size.log 文件如果存在显示它的内容&#xff0c;不存在则创建一个文件将创建时间写入。 1. 创建ex1.sh文件 [rootopenEuler mnt]# vim ex1.sh创建如下&#xff1a; 2. 根据题目编写脚本 n"/tmp/siz.log"if [ -f "$n" ] thenc…

CNN-GRU-MATT加入贝叶斯超参数优化,多输入单输出回归模型

CNN-GRU-MATT加入贝叶斯超参数优化&#xff0c;多输入单输出回归模型 目录 CNN-GRU-MATT加入贝叶斯超参数优化&#xff0c;多输入单输出回归模型预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现贝叶斯优化CNN-GRU融合多头注意力机制多变量回归预测&#xff…

Oracle 分区索引简介

目录 一. 什么是分区索引二. 分区索引的种类2.1 局部分区索引&#xff08;Local Partitioned Index&#xff09;2.2 全局分区索引&#xff08;Global Partitioned Index&#xff09; 三. 分区索引的创建四. 分区索引查看4.1 USER_IND_COLUMNS 表4.2 USER_INDEXES 表 五. 分区索…

​HPM6700——以太网通信lwip_udpecho_freertos_socket

1. 概述 本示例展示在FreeRTOS系统下的UDP回送通讯 PC 通过以太网发送UDP数据帧至MCU&#xff0c;MCU将接收的数据帧回发至PC 2. 硬件设置 使用USB Type-C线缆连接PC USB端口和PWR DEBUG端口 使用以太网线缆连接PC以太网端口和开发板RGMII或RMII端口 3. 工程配置 以太网端…

STC的51单片机LED点灯基于KEIL

前言&#xff1a; 该文源于回答一个朋友的问题&#xff0c;代码为该朋友上传&#xff0c;略作修改&#xff0c;在此说明问题以及解决问题的思路&#xff0c;以减少新手错误。 电路图&#xff1a; 该位朋友未上传电路图&#xff0c;说明如下&#xff1a; stc8g1k08a-sop8控制…

[leetcode]链表基础回顾

一.创建带头节点的链表 #include <iostream> #include <string> #include <algorithm> using namespace std; typedef struct Node { char ch; Node* next; }*LinkList,ListNode; void printLinkList(LinkList& head) { LinkList p head…

Cesium在vue3中的简单使用

目录 一、介绍 二、创建和基础配置 2.1 使用vite创建vue3项目 2.2 安装所需依赖 2.3 修改配置文件vite.config.js 2.4 配置路由文件 2.5 使用路由文件 三、使用cesium原本的进行渲染 3.1 渲染效果 3.2 代码实现 3.2.1 默认效果代码 3.2.2 空白地图效果代码 3.2.3 修改默…

【Uniapp-Vue3】使用defineExpose暴露子组件的属性及方法

如果我们想要让父组件访问到子组件中的变量和方法&#xff0c;就需要使用defineExpose暴露&#xff1a; defineExpose({ 变量 }) 子组件配置 父组件配置 父组件要通过onMounted获取到子组件的DOM 传递多个属性和方法 子组件 父组件

小米vela系统(基于开源nuttx内核)——openvela开源项目

前言 在 2024 年 12 月 27 日的小米「人车家全生态」合作伙伴大会上&#xff0c;小米宣布全面开源 Vela 操作系统。同时&#xff0c;OpenVela 项目正式上线 GitHub 和 Gitee&#xff0c;采用的是比较宽松的 Apache 2.0 协议&#xff0c;这意味着全球的开发者都可以参与到 Vela…

1.15寒假作业

web&#xff1a;nss靶场ez_ez_php 打开环境&#xff0c;理解代码 使用个体传参的方法&#xff0c;首先代码会检查file参数的前三个字符是不是php&#xff0c;如果是就输出nice&#xff0c;然后用include函数包含file&#xff0c;绕过不是则输出hacker&#xff0c;如果没有file…

openharmony display

https://github.com/openharmony/drivers_peripheral/blob/master/display/README_zh.md 源码路径&#xff0c;这里是对rk3588的display层适配 device/soc/rockchip/rk3588/hardware/display ├── include └── src ├── display_device &#xff08;代码量最大的部分&…

如何在linux系统上完成定时任务

任务背景 1.需要每小时更新一次github的host端口&#xff1b; 2.需要每天早上七点半准时启动电脑。 更新github的host端口 在/past/to/路径里新建一个host_update.sh文件&#xff0c;运行以下命令获得访问&#xff0c;运行和修改这个文件路径的权限&#xff1a; sudo chmod…

osg中实现模型的大小、颜色、透明度的动态变化

以博饼状模型为对象,实现了模型大小、颜色、透明度的动态变化。 需要注意的是一点: // 创建材质对象osg::ref_ptr<osg::Material> material = new osg::Material;material->setDiffuse(osg::Material::FRONT_AND_BACK, osg::Vec4(0.0, 1.0, 0.0, 0.5));// 获取模型的…