文章目录
- 前言
- 一、Kafka3.1X版本在Windows11主机部署
- 二、Kafk生产Topic主题数据
- 1.kafka生产数据
- 2.JAVA kafka客户端消费数据
- 总结
前言
本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:
一、Kafka3.1X版本在Windows11主机部署
1.安装JDK配置环境变量
2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1
3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka
3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2
3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs
4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行
.\zkServer.cmd;
5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行
.\bin\windows\kafka-server-start.bat .\config\server.properties
二、Kafk生产Topic主题数据
1.kafka生产数据
创建Topic主题heima
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.
查看Topic主题heima
.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic heima
Topic主题heima生产数据
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima
在 > 符号后输入数据:
{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}
2.JAVA kafka客户端消费数据
2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本
<!-- kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.1</version></dependency>
2.2 JAVA数据读取文件
package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** Kafka服务器操作与数据读取*/
public class KafkaUtilDemo {public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);public static final Properties props = new Properties();
// protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);public static void init(String kafakservers) {// 配置Kafka消费者属性props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");}/*** 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作* @param kafaktopic* @return*/public static String poll(String kafaktopic) {String msg = "";try {KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(kafaktopic));log.info("Kafka消费者订阅指定主题,持续监听并处理消息");while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));for (ConsumerRecord<String, String> record : records) {log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());msg = record.value();if (!StringUtils.isBlank(record.value())) {JSONObject jsonObject = JSONObject.parseObject(record.value());String mobilePhone = jsonObject.getString("mobilePhone");if (StringUtils.isBlank(mobilePhone)) {log.error("Kafka消费者手机号mobilePhone为空");} else {KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();kafkaUtil.syncSystemInfoTask(jsonObject);}}}}} catch (Exception e) {log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());}return msg;}public boolean syncSystemInfoTask(JSONObject jsonObject) {boolean repsBln = true;try {String mobilePhone = jsonObject.getString("mobilePhone");String roleType = jsonObject.getString("roleType");String roleCode = jsonObject.getString("roleCode");log.info("业务数据同步操作................");} catch (Exception e) {repsBln = false;log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());}return repsBln;}public static void main(String[] args) {try {String kafakservers = "localhost:9092";String kafaktopic = "heima";init(kafakservers);poll(kafaktopic);} catch (Exception e) {log.error("error msg=" + e.getMessage());}}}
3 执行KafkaUtilDemo 文件,查看消费数据。
总结
pom.xml文件在引入spring-kafka 会由于版本问题出现
org.apache.kafka
kafka-clients
2.0.1
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.8.RELEASE</version></dependency>