kafka 快速上手

 下载 Apache Kafka

  演示window 安装

   编写启动脚本,脚本的路径根据自己实际的来

启动说明

先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper

巧记: 铲屎官(zookeeper)总是第一个到,最后一个走

启动zookeeper

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

启动kafka  

call bin/windows/kafka-server-start.bat config/server.properties

 测试脚本,主要用于创建主题 ‘test-topic’

# 创建主题(窗口1)
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic  --create# 查看主题
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --list
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe# 修改某主题的分区
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2# 生产消息(窗口2)向test-topic主题发送消息
bin/window> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic
>hello kafka# 消费消息(窗口3)消费test-topic主题的消息
bin/window> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
package com.ldj.kafka.admin;import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;import java.util.*;/*** User: ldj* Date: 2024/6/13* Time: 0:00* Description: 创建主题*/
public class AdminTopic {public static void main(String[] args) {Map<String, Object> adminConfigMap = new HashMap<>();adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(adminConfigMap);/*** 使用kafka默认的分区算法创建分区*/NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1);NewTopic topic2 = new NewTopic("topic-02", 2, (short) 2);CreateTopicsResult addResult1 = adminClient.createTopics(Arrays.asList(topic1, topic2));/*** 手动为主题(topic-03)分配分区* topic-03主题下的0号分区有2个副本,它们中的一个在节点id=1中,一个在节点id=2中;* list里第一个副本就是leader(主写),后面都是follower(主备份)* 例如:0分区,nodeId=1的节点里的副本是主写、2分区,nodeId=3的节点里的副本是主写*/Map<Integer, List<Integer>> partition = new HashMap<>();partition.put(0, Arrays.asList(1, 2));partition.put(1, Arrays.asList(2, 3));partition.put(2, Arrays.asList(3, 1));NewTopic topic3 = new NewTopic("topic-03", partition);CreateTopicsResult addResult2 = adminClient.createTopics(Collections.singletonList(topic3));//DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02"));adminClient.close();}}
package com.ldj.kafka.producer;import com.alibaba.fastjson.JSON;
import com.ldj.kafka.model.UserEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;/*** User: ldj* Date: 2024/6/12* Time: 21:08* Description: 生产者*/
public class KfkProducer {public static void main(String[] args) throws Exception {//生产者配置Map<String, Object> producerConfigMap = new HashMap<>();producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 2);//消息传输应答安全级别 0-消息到达broker(效率高,但不安全)  1-消息在leader副本持久化(折中方案)  -1/all -消息在leader和flower副本都持久化(安全,但效率低)producerConfigMap.put(ProducerConfig.ACKS_CONFIG, "all");//ProducerState 缓存5条数据,重试数据会与5条数据做比较,结论只能保证一个分区的数据幂等性,跨会话幂等性需要通过事务操作解决(重启后全局消息id的随机id会发生改变)//消息发送失败重试次数,重试会导致消息重复!!(考虑幂等性),消息乱序(判断偏移量是否连续,错乱消息回到在缓冲区重新排序)!!producerConfigMap.put(ProducerConfig.RETRIES_CONFIG, 3);//kafka有消息幂等性处理(全局唯一消息id/随机id-分区-偏移量),默认false-不开启producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//解决跨会话幂等性,还需结合事务操作,忽略//producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigMap);//TODO 事务初始化方法//producer.initTransactions();//构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)try {//TODO 开启事务//producer.beginTransaction();for (int i = 0; i < 10; i++) {UserEntity userEntity = new UserEntity().setUserId(2436687942335620L + i).setUsername("lisi").setGender(1).setAge(18);ProducerRecord<String, String> record = new ProducerRecord<>("test-topic",userEntity.getUserId().toString(),JSON.toJSONString(userEntity));//发送数据到BrokerFuture<RecordMetadata> future = producer.send(record, (RecordMetadata var1, Exception var2) -> {if (Objects.isNull(var2)) {System.out.printf("[%s]消息发送成功!", userEntity.getUserId());} else {System.out.printf("[%s]消息发送失败!err:%s", userEntity.getUserId(), var2.getCause());}});//TODO 提交事务//producer.commitTransaction();//注意没有下面这行代码,是异步线程从缓冲区读取数据异步发送消息,反之是同步发送,必须等待回调消息返回才会往下执行System.out.printf("发送消息[%s]----", userEntity.getUserId());RecordMetadata recordMetadata = future.get();System.out.println(recordMetadata.offset());}} finally {//TODO 终止事务//producer.abortTransaction();//关闭通道producer.close();}}}
package com.ldj.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** User: ldj* Date: 2024/6/12* Time: 21:10* Description: 消费者*/
public class KfkConsumer {public static void main(String[] args) {//消费者配置Map<String, Object> consumerConfigMap = new HashMap<>();consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//所属消费组consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigMap);//消费主题的消息  ConsumerRebalanceListenerconsumer.subscribe(Collections.singletonList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));//数据存储结构:Map<TopicPartition, List<ConsumerRecord<K, V>>> records;for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}} finally {//关闭消费者consumer.close();}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/852320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024 Idea最新激活码

idea的激活与安装 操作如下&#xff1a; ① 打开网站&#xff1a;https://web.52shizhan.cn 切换到&#xff1a;激活码&#xff0c;点击获取 ② 这个时候就跳转到现成账号页面&#xff0c;点击获取体验号&#xff0c;如图 ③ 来到了获取现成账号的页面了。输入你的邮箱账号即…

Flutter笔记:关于WebView插件的用法(上)

Flutter笔记 关于WebView插件的用法&#xff08;上&#xff09; - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:htt…

官宣!2024影响因子即将公布,或将迎来这些重大变化!

【SciencePub学术】IF是Impact Factor&#xff0c;即我们俗称的“影响因子”&#xff0c;是衡量学术期刊一个重要性的指标。它通过计算期刊上发表的文章在特定时间内被引用的平均次数来评估期刊的影响力。 影响因子计算公式 影响因子&#xff08;IF&#xff09;&#xff08;期…

vue3实战练习之红包雨,抢红包案例

抢红包案例 每当618消费节到来时&#xff0c;某宝、某多&#xff0c;等购物网站都会退出各种活动&#xff0c;其中抢红包&#xff0c;优惠券等红包雨活动很是火热&#xff0c;于是就通过vue的知识来做一个红包雨&#xff0c;抢红包加分活动&#xff01;代码中红包的路径改成自己…

2024年人工智能与云计算国际会议(ICAICC 2024)

2024 International Conference on Artificial Intelligence and Cloud Computing 【1】大会信息 大会时间&#xff1a;2024-07-19 大会地点&#xff1a;中国长沙 截稿时间&#xff1a;2024-07-05(以官网为准&#xff09; 审稿通知&#xff1a;投稿后2-3日内通知 会议官网&am…

看完“土猪拱白菜“的张锡峰,我明白计算机有多难了

计算机有多难&#xff1f; 今天无意中&#xff0c;看到一篇关于「"土猪拱白菜"学霸后悔报考浙大计算机」的文章。 或许会有不少和我刚开始一样懵圈的同学&#xff1a;张锡峰是谁&#xff1f;"土猪拱白菜"又是什么梗&#xff1f; 带着疑惑&#xff0c;我打开…

Tita 360评估:有效 360度反馈流程的 10 大步骤

宣传过程 如果你的公司首次引入多方位反馈或 360 度反馈&#xff0c;那么向所有利益相关者描述这一流程至关重要。由于流程太新&#xff0c;很多人还不了解。确保参与该流程的每个人都了解其目的&#xff0c;以及将如何实施该流程和使用其结果。花时间在一对一会议、小组会议和…

python的a[:2]、a[:] 和a [::] 的区别

一、a[:2] 数据准备 import numpy as np X np.array([[0,1],[2,3],[4,5],[6,7],[8,9],[10,11],[12,13],[14,15],[16,17],[18,19]]) print(X)形成矩阵 print (“X[: 2]:”, X[: 2]) ### :表示索引 0至1行&#xff1b; 二、a[:]和a [::] 在 Python 中&#xff0c;[:] 和 [::…

SQL Server 安装后,服务器再改名,造成名称不一致,查询并修改数据库服务器真实名称

SELECT SERVERNAME -- 1.查询旧服务器名称 SELECT serverproperty(servername) AS new --2.查询新服务器名称 -- 3.更新服务器名称 IF SERVERPROPERTY(servername) <> 新服务器名称替换 BEGIN DECLARE server_name NVARCHAR(128) SET server_name 新服务器…

Linux部署项目

手动部署 1.在IDEA写一个有关springboot项目 在windows客户端可以通过localhost:8080/hello 访问 2.用packge 命令将该springboot项目打包 并在target目录下找到打包的jar包 3.上传到linux上 个人习惯在usr/local/app 下上传该项目 创建切换到app目录下 mkdir /usr/local/ap…

无文件落地分离拆分-将shellcode从文本中提取-file

马子分为shellcode和执行代码. --将shellcode单独拿出,放在txt中---等待被读取执行 1-cs生成python的payload. 2-将shellcode进行base64编码 import base64code b en_code base64.b64encode(code) print(en_code) 3-将编码后的shellcode放入文件内 4-读取shellcod…

记录pytest中场景执行的token异常处理问题

前言中写了一个conftest钩子函数用于处理重复调用token的方法&#xff0c;http://t.csdnimg.cn/N4rCK&#xff0c;每个用例单独执行都很正常&#xff0c;但是批量执行时一直报错&#xff0c;token缓存处理也不生效。 所有的用例都报获取不到token&#xff0c;方法改了又改&…

C++和C语言到底有什么区别?

引言&#xff1a;C和C语言是两种非常常见的编程语言&#xff0c;由于其广泛的应用和灵活性&#xff0c;它们在计算机科学领域内受到了广泛的关注。虽然C是从C语言发展而来的&#xff0c;但是这两种语言在许多方面都有所不同。本文将对C和C语言进行比较和分析&#xff0c;以便更…

Modbus通信协议(1)--基础知识

一、基础知识 1.信息的划分 2.基本概念 3.机器数和真值 4.原码、反码与补码 5.存储单位 6.基本类型数据 7.数的浮点表示 8.各种进位制的对比 9.十进制 10.二进制 11.十六进制 12.不同进制的换算 13.位的标记 二、常用的信息编码 1.西文字符的计算机表示 2.汉字处理过程 3.汉字…

如何用ai写文案?分享方法和软件!

在当今数字化时代&#xff0c;内容创作已经成为各行各业不可或缺的一部分。然而&#xff0c;对于许多创作者来说&#xff0c;如何写出既具有吸引力又符合平台特点的文案&#xff0c;却是一项不小的挑战。幸运的是&#xff0c;人工智能&#xff08;AI&#xff09;技术的快速发展…

解决使用elmessage 没有样式的问题

错误情况 这里使用了一个消息提示&#xff0c;但是没有出现正确的样式&#xff0c; 错误原因和解决方法 出现这种情况是因为&#xff0c;在全局使用了按需导入&#xff0c;而又在局部组件中导入了ElMessage组件&#xff0c;我们只需要将局部组件的import删除就可以了 import…

uniapp 仿写弹窗

页面 <template><view click"close" class"mask"><view click.stop"onClick" class"content"><text class"text">点击蒙层关闭</text></view></view> </template><scr…

江协科技51单片机学习-0 购买套件

前言&#xff1a; 本文是根据哔哩哔哩网站上“江协科技51单片机”视频的学习笔记&#xff0c;在这里会记录下江协科技51单片机开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了江协科技51单片机教学视频和链接中的内容。 引用&#xff1a; 51单片机入门教程-2…

Blender雕刻建模_UV展开

UV展开的标准&#xff1a;展平&#xff0c;不重叠&#xff0c;均匀展开 ZenUV插件 切到边模式 -Mark&#xff0c;标记缝合边 -Unmark&#xff0c;取消标记 -Unmark All&#xff0c;全部取消标记 -Mirror Seams&#xff0c;镜像缝合边 -Zen Unwrap&#xff0c;全部展开 纹…

web端使用高德地图

web端使用高德地图 一、申请高德key和秘钥二、在项目中引入所需功能js、css文件三、实现地图选点、回显选点四、自定义地图 一、申请高德key和秘钥 申请高德key 申请成功后可以得到key 二、在项目中引入所需功能js、css文件 <script src"https://webapi.amap.com/m…