Kafka3.1部署和Topic主题数据生产与消费

文章目录

  • 前言
  • 一、Kafka3.1X版本在Windows11主机部署
  • 二、Kafk生产Topic主题数据
    • 1.kafka生产数据
    • 2.JAVA kafka客户端消费数据
  • 总结


前言

本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:

一、Kafka3.1X版本在Windows11主机部署

1.安装JDK配置环境变量

2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1

3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka
在这里插入图片描述
3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2

3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs

4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

  .\zkServer.cmd;

在这里插入图片描述
5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述

二、Kafk生产Topic主题数据

1.kafka生产数据

创建Topic主题heima

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.

查看Topic主题heima

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092  --topic heima

在这里插入图片描述
Topic主题heima生产数据

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima

在 > 符号后输入数据:

{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

在这里插入图片描述

2.JAVA kafka客户端消费数据

2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本

        <!-- kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.1</version></dependency>

2.2 JAVA数据读取文件

package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** Kafka服务器操作与数据读取*/
public class KafkaUtilDemo {public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);public static final Properties props = new Properties();
//    protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);public static void init(String kafakservers) {// 配置Kafka消费者属性props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");}/*** 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作* @param kafaktopic* @return*/public static String poll(String kafaktopic) {String msg = "";try {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(kafaktopic));log.info("Kafka消费者订阅指定主题,持续监听并处理消息");while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));for (ConsumerRecord<String, String> record : records) {log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());msg = record.value();if (!StringUtils.isBlank(record.value())) {JSONObject jsonObject = JSONObject.parseObject(record.value());String mobilePhone = jsonObject.getString("mobilePhone");if (StringUtils.isBlank(mobilePhone)) {log.error("Kafka消费者手机号mobilePhone为空");} else {KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();kafkaUtil.syncSystemInfoTask(jsonObject);}}}}} catch (Exception e) {log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());}return msg;}public boolean syncSystemInfoTask(JSONObject jsonObject) {boolean repsBln = true;try {String mobilePhone = jsonObject.getString("mobilePhone");String roleType = jsonObject.getString("roleType");String roleCode = jsonObject.getString("roleCode");log.info("业务数据同步操作................");} catch (Exception e) {repsBln = false;log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());}return repsBln;}public static void main(String[] args) {try {String kafakservers = "localhost:9092";String kafaktopic = "heima";init(kafakservers);poll(kafaktopic);} catch (Exception e) {log.error("error msg=" + e.getMessage());}}}

3 执行KafkaUtilDemo 文件,查看消费数据。
在这里插入图片描述

总结

pom.xml文件在引入spring-kafka 会由于版本问题出现


org.apache.kafka
kafka-clients
2.0.1

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.8.RELEASE</version></dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/76573.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微服务·架构组件之服务注册与发现-Nacos

微服务组件架构之服务注册与发现之Nacos Nacos服务注册与发现流程 服务注册&#xff1a;Nacos 客户端会通过发送REST请求的方式向Nacos Server注册自己的服务&#xff0c;提供自身的元数据&#xff0c;比如ip地址、端口等信息。 Nacos Server接收到注册请求后&#xff0c;就会…

四川百幕晟科技有限公司:抖音名称最多多少字?

在抖音上&#xff0c;用户可以为其帐户选择昵称&#xff0c;该昵称显示在用户的个人资料中。不过&#xff0c;很多人好奇&#xff0c;一个抖音昵称到底能有多少个字&#xff1f;本文将深入探讨抖音昵称长度限制以及一些最吸引人的昵称示例。 1、抖音昵称长度限制 抖音昵称的长度…

虚拟机的ubuntu 22.04无法联网问题解决

问题&#xff1a;虚拟机的ubuntu 22.04无法联网 解决&#xff1a; 找到一种配置的方式&#xff0c;使用命令&#xff1a;sudo dhclient -v

Python中的包管理方法

在Python开发中&#xff0c;包管理与依赖问题一直是开发者头疼的问题之一。随着项目规模的增大和复杂性的增加&#xff0c;管理各种依赖包和解决版本冲突变得尤为重要。本文将分享解决Python中的包管理与依赖问题的方法和工具&#xff0c;帮助开发者更好地管理项目中的依赖关系…

基于python+txt的学生成绩管理系统

基于pythontxt的学生成绩管理系统 一、系统介绍二、效果展示三、其他系统实现四、获取源码 一、系统介绍 录入学生信息查找学生信息删除学生信息修改学生信息排序统计学生信息显示所有学生信息 基于python的学生成绩管理系统&#xff0c;具备基本的增删改查功能&#xff0c;包…

python经典百题之特殊图形打印

以下是几个使用Python语言打印特殊图形的示例。 打印三角形 n 5 for i in range(n):for j in range(i1):print("*", end"")print()输出&#xff1a; * ** *** **** *****打印正方形 n 5 for i in range(n):for j in range(n):print("*", e…

微信小程序——使用插槽slot快捷开发

微信小程序的插槽&#xff08;slot&#xff09;是一种组件化的技术&#xff0c;用于在父组件中插入子组件的内容。通过插槽&#xff0c;可以将父组件中的一部分内容替换为子组件的内容&#xff0c;实现更灵活的组件复用和定制。 插槽的使用步骤如下&#xff1a; 在父组件的wx…

宠物行业如何进行软文营销

如今&#xff0c;宠物已经成为了人们生活中不可或缺的一部分&#xff0c;大众对于萌宠的喜爱与日俱增&#xff0c;随着“萌宠经济”升温&#xff0c;越来越多的商机开始出现&#xff0c;伴随着宠物市场竞争的日益激烈&#xff0c;宠物行业的营销光靠硬广告很难吸引受众&#xf…

Gin学习记录1——认识与下载Gin

认识与下载Gin 前言一. 安装二. 工程三. 学习资源 前言 我为什么要学Gin&#xff1a; Python做后台可能效率不太高&#xff0c;用Go可能更合适一点Gin性能够强&#xff0c;安装方便&#xff0c;资料算比较丰富&#xff0c;用的人多&#xff0c;github ☆多更适合前后端分离开…

基于FPGA的图像二值化处理,包括tb测试文件和MATLAB辅助验证

1.算法运行效果图预览 将FPGA的数据导入到matlab进行显示 2.算法运行软件版本 Vivado2019.2 matlab2022a 3.部分核心程序 timescale 1ns / 1ps ............................................................................. module test_image;reg i_clk; reg i_rst; r…

使用亚马逊云科技人工智能内容审核服务,打造安全的图像生成和扩散模型

生成式人工智能技术发展日新月异&#xff0c;现在已经能够根据文本输入生成文本和图像。Stable Diffusion 是一种文本转图像模型&#xff0c;可让您创建栩栩如生的图像应用。您可以通过 Amazon SageMaker JumpStart&#xff0c;使用 Stable Diffusion 模型轻松地从文本生成图像…

C# Unity FSM 状态机

C# Unity FSM 状态机 使用状态机可以降低代码耦合性&#xff0c;并且可以优化代码可读性&#xff0c;方便团队协作等。 对于游戏开发内容来讲游戏开发的流程控制玩家动画都可以使用FSM有限状态机来实现。 1.FsmState 每个状态的基类&#xff0c;泛型参数表示所拥有者 publi…

轻松搭建本地知识库的ChatGLM2-6B

近期发现了一个项目&#xff0c;它的前身是ChatGLM&#xff0c;在我之前的博客中有关于ChatGLM的部署过程&#xff0c;本项目在前者基础上进行了优化&#xff0c;可以基于当前主流的LLM模型和庞大的知识库&#xff0c;实现本地部署自己的ChatGPT&#xff0c;并可结合自己的知识…

GC 算法与种类

对于垃圾收集&#xff08;GC&#xff09;, 我们需要考虑三件事情&#xff1a;哪些内存需要回收&#xff1f;如何判断是垃圾对象&#xff1f;垃圾回收算法有哪些&#xff1f; 一、GC的工作区域 1、不是GC的工作区域 (1)程序计数器、虚拟机栈和本地方法栈三个区域是线程私有的&…

python excel 读取及写入固定格式

import xlrd import xlwt import re import pandas as pd from datetime import date,datetimefile_path "C:\\Users\\function_model.xls" def readexcel():df pd.read_excel(file_path ,"配置")# e_id# id# expression# name# freq# column_data df[e…

yolov8 模型部署--TensorRT部署-c++服务化部署

写目录 yolov8 模型部署--TensorRT部署1、模型导出为onnx格式2、模型onnx格式转engine 部署 yolov8 模型部署–TensorRT部署 1、模型导出为onnx格式 如果要用TensorRT部署YOLOv8&#xff0c;需要先使用下面的命令将模型导出为onnx格式&#xff1a; yolo export modelyolov8n.p…

FPGA时序分析与约束(2)——时序电路时序

一、前言 在之前的内容中&#xff0c;我们介绍了组合电路的时序问题和可能导致的毛刺&#xff0c;强烈推荐在阅读前文的基础上再继续阅读本文&#xff0c; 前文链接&#xff1a;FPGA时序分析与约束&#xff08;1&#xff09;——组合电路时序 这篇文章中&#xff0c;我们将继续…

解决微信小程序动态生成表单情况下,遍历picker,并无法修改列表里面下标key的解决访方案

需求描述&#xff1a; 通过请求后端接口&#xff0c;动态获取表单的数据源&#xff08;json格式&#xff0c;数组&#xff09;&#xff0c;使用小程序wx:for循环遍历&#xff0c;对接口返回的数组中的一个字段存为picker组件选择器&#xff0c;并允许用户进行选择修改该值。 …

MySQL表空间

MySQL表空间 文章目录 MySQL表空间1. MySQL中的表1.1 IOT表1.2 InnoDB逻辑存储结构2. 独立表空间2.1 段 segment2.1.1 段的概念2.1.2 段的分类2.1.2.1 叶子节点段主要结构2.1.2.2 非叶子节点段2.1.3 碎片区2.2 区2.2.1 区的概念2.2.2 区的结构2.2.2.1 XDES Entry结构2.3 页2.3.…

tpwe兼容微擎小程序前端请求写法

当使用TPWe兼容微擎小程序前端时&#xff0c;可以按照以下方式编写前端请求&#xff1a; 修改siteinfo.js下面的配置文件。 var e {uniacid: "10001",acid: "10001",multiid: "0",version: "1.0",siteroot: "https://域名/inde…