kafka 快速上手

 下载 Apache Kafka

  演示window 安装

   编写启动脚本,脚本的路径根据自己实际的来

启动说明

先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper

巧记: 铲屎官(zookeeper)总是第一个到,最后一个走

启动zookeeper

call bin/windows/zookeeper-server-start.bat config/zookeeper.properties

启动kafka  

call bin/windows/kafka-server-start.bat config/server.properties

 测试脚本,主要用于创建主题 ‘test-topic’

# 创建主题(窗口1)
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic  --create# 查看主题
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --list
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe# 修改某主题的分区
bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2# 生产消息(窗口2)向test-topic主题发送消息
bin/window> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic
>hello kafka# 消费消息(窗口3)消费test-topic主题的消息
bin/window> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
package com.ldj.kafka.admin;import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;import java.util.*;/*** User: ldj* Date: 2024/6/13* Time: 0:00* Description: 创建主题*/
public class AdminTopic {public static void main(String[] args) {Map<String, Object> adminConfigMap = new HashMap<>();adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");AdminClient adminClient = AdminClient.create(adminConfigMap);/*** 使用kafka默认的分区算法创建分区*/NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1);NewTopic topic2 = new NewTopic("topic-02", 2, (short) 2);CreateTopicsResult addResult1 = adminClient.createTopics(Arrays.asList(topic1, topic2));/*** 手动为主题(topic-03)分配分区* topic-03主题下的0号分区有2个副本,它们中的一个在节点id=1中,一个在节点id=2中;* list里第一个副本就是leader(主写),后面都是follower(主备份)* 例如:0分区,nodeId=1的节点里的副本是主写、2分区,nodeId=3的节点里的副本是主写*/Map<Integer, List<Integer>> partition = new HashMap<>();partition.put(0, Arrays.asList(1, 2));partition.put(1, Arrays.asList(2, 3));partition.put(2, Arrays.asList(3, 1));NewTopic topic3 = new NewTopic("topic-03", partition);CreateTopicsResult addResult2 = adminClient.createTopics(Collections.singletonList(topic3));//DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02"));adminClient.close();}}
package com.ldj.kafka.producer;import com.alibaba.fastjson.JSON;
import com.ldj.kafka.model.UserEntity;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;/*** User: ldj* Date: 2024/6/12* Time: 21:08* Description: 生产者*/
public class KfkProducer {public static void main(String[] args) throws Exception {//生产者配置Map<String, Object> producerConfigMap = new HashMap<>();producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);//批量发送producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 2);//消息传输应答安全级别 0-消息到达broker(效率高,但不安全)  1-消息在leader副本持久化(折中方案)  -1/all -消息在leader和flower副本都持久化(安全,但效率低)producerConfigMap.put(ProducerConfig.ACKS_CONFIG, "all");//ProducerState 缓存5条数据,重试数据会与5条数据做比较,结论只能保证一个分区的数据幂等性,跨会话幂等性需要通过事务操作解决(重启后全局消息id的随机id会发生改变)//消息发送失败重试次数,重试会导致消息重复!!(考虑幂等性),消息乱序(判断偏移量是否连续,错乱消息回到在缓冲区重新排序)!!producerConfigMap.put(ProducerConfig.RETRIES_CONFIG, 3);//kafka有消息幂等性处理(全局唯一消息id/随机id-分区-偏移量),默认false-不开启producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//解决跨会话幂等性,还需结合事务操作,忽略//producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id");//创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfigMap);//TODO 事务初始化方法//producer.initTransactions();//构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)try {//TODO 开启事务//producer.beginTransaction();for (int i = 0; i < 10; i++) {UserEntity userEntity = new UserEntity().setUserId(2436687942335620L + i).setUsername("lisi").setGender(1).setAge(18);ProducerRecord<String, String> record = new ProducerRecord<>("test-topic",userEntity.getUserId().toString(),JSON.toJSONString(userEntity));//发送数据到BrokerFuture<RecordMetadata> future = producer.send(record, (RecordMetadata var1, Exception var2) -> {if (Objects.isNull(var2)) {System.out.printf("[%s]消息发送成功!", userEntity.getUserId());} else {System.out.printf("[%s]消息发送失败!err:%s", userEntity.getUserId(), var2.getCause());}});//TODO 提交事务//producer.commitTransaction();//注意没有下面这行代码,是异步线程从缓冲区读取数据异步发送消息,反之是同步发送,必须等待回调消息返回才会往下执行System.out.printf("发送消息[%s]----", userEntity.getUserId());RecordMetadata recordMetadata = future.get();System.out.println(recordMetadata.offset());}} finally {//TODO 终止事务//producer.abortTransaction();//关闭通道producer.close();}}}
package com.ldj.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;/*** User: ldj* Date: 2024/6/12* Time: 21:10* Description: 消费者*/
public class KfkConsumer {public static void main(String[] args) {//消费者配置Map<String, Object> consumerConfigMap = new HashMap<>();consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//所属消费组consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfigMap);//消费主题的消息  ConsumerRebalanceListenerconsumer.subscribe(Collections.singletonList("test-topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));//数据存储结构:Map<TopicPartition, List<ConsumerRecord<K, V>>> records;for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}} finally {//关闭消费者consumer.close();}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/852320.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024 Idea最新激活码

idea的激活与安装 操作如下&#xff1a; ① 打开网站&#xff1a;https://web.52shizhan.cn 切换到&#xff1a;激活码&#xff0c;点击获取 ② 这个时候就跳转到现成账号页面&#xff0c;点击获取体验号&#xff0c;如图 ③ 来到了获取现成账号的页面了。输入你的邮箱账号即…

Flutter笔记:关于WebView插件的用法(上)

Flutter笔记 关于WebView插件的用法&#xff08;上&#xff09; - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite&#xff1a;http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:htt…

PaddleDetection快速体验quick_start

1 快速体验 # 设置显卡 export CUDA_VISIBLE_DEVICES0# 用PP-YOLO算法在COCO数据集上预训练模型预测一张图片 python tools/infer.py -c configs/ppyolo/ppyolo_r50vd_dcn_1x_coco.yml -o use_gputrue weightshttps://paddledet.bj.bcebos.com/models/ppyolo_r50vd_dcn_1x_coc…

针对oracle系列数据库慢数据量大的问题

-- 确保索引存在 create index idx_risk_assessment_hazard on risk_assessment_hazard(data_time, perception_id); create index idx_risk_dispose_base_info on risk_dispose_base_info(perception_id); -- 使用并行查询和with子句进行优化 explain plan for with t2 as (se…

官宣!2024影响因子即将公布,或将迎来这些重大变化!

【SciencePub学术】IF是Impact Factor&#xff0c;即我们俗称的“影响因子”&#xff0c;是衡量学术期刊一个重要性的指标。它通过计算期刊上发表的文章在特定时间内被引用的平均次数来评估期刊的影响力。 影响因子计算公式 影响因子&#xff08;IF&#xff09;&#xff08;期…

vue3实战练习之红包雨,抢红包案例

抢红包案例 每当618消费节到来时&#xff0c;某宝、某多&#xff0c;等购物网站都会退出各种活动&#xff0c;其中抢红包&#xff0c;优惠券等红包雨活动很是火热&#xff0c;于是就通过vue的知识来做一个红包雨&#xff0c;抢红包加分活动&#xff01;代码中红包的路径改成自己…

2024年人工智能与云计算国际会议(ICAICC 2024)

2024 International Conference on Artificial Intelligence and Cloud Computing 【1】大会信息 大会时间&#xff1a;2024-07-19 大会地点&#xff1a;中国长沙 截稿时间&#xff1a;2024-07-05(以官网为准&#xff09; 审稿通知&#xff1a;投稿后2-3日内通知 会议官网&am…

看完“土猪拱白菜“的张锡峰,我明白计算机有多难了

计算机有多难&#xff1f; 今天无意中&#xff0c;看到一篇关于「"土猪拱白菜"学霸后悔报考浙大计算机」的文章。 或许会有不少和我刚开始一样懵圈的同学&#xff1a;张锡峰是谁&#xff1f;"土猪拱白菜"又是什么梗&#xff1f; 带着疑惑&#xff0c;我打开…

ViewModel、Lifecycles、LiveData基本使用

以下是使用Java实现ViewModel、Lifecycles和LiveData的基本用法&#xff0c;以及它们的原理简述。 ViewModel的基本使用&#xff08;Java&#xff09; 1. 引入依赖 在你的build.gradle文件中添加以下依赖&#xff1a; implementation androidx.lifecycle:lifecycle-viewmod…

MySQL查询性能优化解决方案

解决方案 主键与默认常用查询字段建立索引&#xff0c;普通字段类型选择 UNIQUE&#xff0c;索引方法 BTREE &#xff1b;长文本使用 FULLTEXT&#xff0c;索引方法为无&#xff1b; 新建表时引擎默认设置为 MyISAM&#xff0c;不使用 InnoDB&#xff0c;因为 MyISAM 支持 MAT…

Tita 360评估:有效 360度反馈流程的 10 大步骤

宣传过程 如果你的公司首次引入多方位反馈或 360 度反馈&#xff0c;那么向所有利益相关者描述这一流程至关重要。由于流程太新&#xff0c;很多人还不了解。确保参与该流程的每个人都了解其目的&#xff0c;以及将如何实施该流程和使用其结果。花时间在一对一会议、小组会议和…

拆分shp数据要素为多个shp

我有一个各县shp文件&#xff0c;我想要拆分成不同的要素&#xff0c;命名根据NAME字段进行命名&#xff0c;字段值是字符串&#xff0c;以下为基于arcpy的python实现代码。&#xff08;python2.7&#xff09; import arcpy import os# 设置工作空间和输入的 shapefile 文件路…

python的a[:2]、a[:] 和a [::] 的区别

一、a[:2] 数据准备 import numpy as np X np.array([[0,1],[2,3],[4,5],[6,7],[8,9],[10,11],[12,13],[14,15],[16,17],[18,19]]) print(X)形成矩阵 print (“X[: 2]:”, X[: 2]) ### :表示索引 0至1行&#xff1b; 二、a[:]和a [::] 在 Python 中&#xff0c;[:] 和 [::…

SQL Server 安装后,服务器再改名,造成名称不一致,查询并修改数据库服务器真实名称

SELECT SERVERNAME -- 1.查询旧服务器名称 SELECT serverproperty(servername) AS new --2.查询新服务器名称 -- 3.更新服务器名称 IF SERVERPROPERTY(servername) <> 新服务器名称替换 BEGIN DECLARE server_name NVARCHAR(128) SET server_name 新服务器…

Webrtc支持FFMPEG硬解码之Intel(一)

前言 此系列文章分分为三篇, Webrtc支持FFMPEG硬解码之Intel(一)-CSDN博客 Webrtc支持FFMPEG硬解码之NVIDA(二)-CSDN博客 Webrtc支持FFMPEG硬解码之解码实现(三)-CSDN博客 AMD硬解目前还没找到可用解码器,欢迎留言交流 环境 Windows平台 VS20

Linux部署项目

手动部署 1.在IDEA写一个有关springboot项目 在windows客户端可以通过localhost:8080/hello 访问 2.用packge 命令将该springboot项目打包 并在target目录下找到打包的jar包 3.上传到linux上 个人习惯在usr/local/app 下上传该项目 创建切换到app目录下 mkdir /usr/local/ap…

无文件落地分离拆分-将shellcode从文本中提取-file

马子分为shellcode和执行代码. --将shellcode单独拿出,放在txt中---等待被读取执行 1-cs生成python的payload. 2-将shellcode进行base64编码 import base64code b en_code base64.b64encode(code) print(en_code) 3-将编码后的shellcode放入文件内 4-读取shellcod…

记录pytest中场景执行的token异常处理问题

前言中写了一个conftest钩子函数用于处理重复调用token的方法&#xff0c;http://t.csdnimg.cn/N4rCK&#xff0c;每个用例单独执行都很正常&#xff0c;但是批量执行时一直报错&#xff0c;token缓存处理也不生效。 所有的用例都报获取不到token&#xff0c;方法改了又改&…

算法第9章 图算法设计

7-1 旅游规划 #include <iostream> #include <algorithm> #include <cstring> using namespace std; typedef pair<int,int> PII; const int N 510; int g[N][N]; int n,m,s,d; int dist[N]; int w[N],t[N][N]; bool st[N]; void dijkstra(){ memset…

C++和C语言到底有什么区别?

引言&#xff1a;C和C语言是两种非常常见的编程语言&#xff0c;由于其广泛的应用和灵活性&#xff0c;它们在计算机科学领域内受到了广泛的关注。虽然C是从C语言发展而来的&#xff0c;但是这两种语言在许多方面都有所不同。本文将对C和C语言进行比较和分析&#xff0c;以便更…