Kafka生产消费实战-JAVA

Kafka生产消费实战-JAVA

文章目录

  • Kafka生产消费实战-JAVA
    • 生产者代码
    • 消费者代码
    • 消费者代码扩展
    • Consumer消费offset查询
    • Consumer消费顺序
    • Kafka的三种语义

生产者代码

public static void main(String[] args) {Properties prop = new Properties();// 指定broker地址prop.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");// 消息序列化prop.put("key.serializer", StringSerializer.class.getName());prop.put("value.serializer", StringSerializer.class.getName());// 创建生产者KafkaProducer producer = new KafkaProducer<String, String>(prop);// f发送数据String topic = "hello";producer.send(new ProducerRecord<String, String>(topic, "hello kafka producer"));// closeproducer.close();}

消费者代码

 public static void main(String[] args) {Properties prop = new Properties();prop.put("bootstrap.servers", "192.168.52.100:9092,192.168.52.101:9092,192.168.52.102:9092");// 反序列化prop.put("key.deserializer", StringDeserializer.class.getName());prop.put("value.deserializer", StringDeserializer.class.getName());// 指定消费者组prop.put("group.id", "con-1");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);Collection<String> topics = new ArrayList<>();topics.add("hello");// 订阅指定的topicconsumer.subscribe(topics);while(true) {// 消费数据ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord message: consumerRecords) {System.out.println(message);}}}

image-20240312144608270

消费者代码扩展

// 开启自动提交功能,默认是开启prop.put("enable.auto.commit", "true");// 自动提交时间间隔prop.put("auto.commit.interval.ms", "5000");// 先根据group.id指定的消费者组查询保存的offset信息// 如果找到了,说明之前消费过该消费组的消息,则根据之前保存的offset继续消费// 如果没有找到,说明是第一次消费,或者说是之前的offset对应的数据已经不存在了,此时就会根据auto.offset.reset 的值执行不同的消费逻辑// earliest:从最早的数据开始消费,从头开始// latest : 最新的数据开始消费-默认的策略// none : 抛出异常// 在实时计算的场景下,建议设置为latest// 这个参数只会在消费者第一次消费或者对应的offset没有数据的时候才会生效prop.put("auto.offset.reset", "latest");

Consumer消费offset查询

  • kafka0.9之前,消费的offset信息是保存在zookeeper中,0.9之后使用了新的消费API,消费者的信息会保存在kafka里面的_consumer_offsets这个topic中

image-20240313091756451

  • 如何查询保存在kafka中的consumer的offset信息?
# 查询消费者信息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --list --bootstrap-server hadoop01:9092 
con-1# 消费组描述
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop01:9092 --group con-1GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
con-1           hello           2          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           3          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           1          0               0               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           0          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           4          2               2               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1

Consumer消费顺序

  • 当一个消费者消费一个partition的时候,消费的数据顺序和此partition数据的生产顺序是一致的

  • 当一个消费者消费多个partition的时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition的数据

总之,如果一个消费者消费多个partition,只能保证消费者的数据顺序在一个partition内有序

Kafka的三种语义

  • 至少一次:at-least-once,有可能对数据重复处理
// 将自动提交设置为false
prop.put("enable.auto.commit", "false");
// 手动提交
consumer.commitAsync();
  • 至多一次:at-most-once,默认实现

  • 仅此一次:exactly-once

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/740989.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt教程 — 1.1 Linux下安装Qt

目录 1 下载Qt 1.1 官方下载 1.2 百度网盘下载 1.3 Linux虚拟机终端下载 2 Qt安装 3 安装相关依赖 4 测试安装 1 下载Qt 1.1 官方下载 通过官网下载对应版本&#xff0c;本文选择的版本为qt-opensource-linux-x64-5.12.12&#xff0c;Qt官方下载链接&#xff1a;htt…

微信小程序(一)

WebView app.是全局配置&#xff0c;app.json是全局配置文件&#xff0c;在页面的.json配置文件中的配置会覆盖我们全局的配置 快捷键&#xff1a; .box 敲回车 ----- <view class"box"></view> .row*8 敲回车&#xff1a; .row{$}*8 敲回车 案例1&…

自然语言处理(NLP)—— 语义关系提取

语义关系是指名词或名词短语之间的联系。这些关系可以是表面形式&#xff08;名词性实体&#xff09;之间的联系&#xff0c;也可以是知识工程中概念之间的联系。在自然语言处理&#xff08;NLP&#xff09;和文本挖掘领域&#xff0c;识别和理解这些语义关系对于信息提取、知识…

力扣每日一题 在受污染的二叉树中查找元素 哈希 DFS 二进制

Problem: 1261. 在受污染的二叉树中查找元素 思路 &#x1f468;‍&#x1f3eb; 灵神题解 &#x1f496; 二进制 时间复杂度&#xff1a;初始化为 O ( 1 ) O(1) O(1)&#xff1b;find 为 O ( m i n ( h , l o g 2 t a r g e t ) O(min(h,log_2target) O(min(h,log2​targ…

数字孪生与智慧城市:实现城市治理现代化的新路径

随着信息技术的迅猛发展&#xff0c;智慧城市已成为城市发展的必然趋势。数字孪生技术作为智慧城市建设的重要支撑&#xff0c;以其独特的优势为城市治理现代化提供了新的路径。本文将探讨数字孪生技术在智慧城市中的应用&#xff0c;以及如何实现城市治理的现代化。 一、数字…

Python 导入Excel三维坐标数据 生成三维曲面地形图(体) 5-3、线条平滑曲面且可通过面观察柱体变化(三)

环境和包: 环境 python:python-3.12.0-amd64包: matplotlib 3.8.2 pandas 2.1.4 openpyxl 3.1.2 scipy 1.12.0 代码: import pandas as pd import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D from scipy.interpolate import griddata fro…

C#,红黑树(Red-Black Tree)的构造,插入、删除及修复、查找的算法与源代码

1 红黑树(Red-Black Tree) 如果二叉搜索树满足以下红黑属性,则它是红黑树: 每个节点不是红色就是黑色。根是黑色的。每片叶子(无)都是黑色的。如果一个节点是红色的,那么它的两个子节点都是黑色的。对于每个节点,从节点到后代叶的所有路径都包含相同数量的黑色节点。红…

机密计算:为云数据提供强大的安全性

在人工智能应用中&#xff0c;数据隐私是一个重要关注问题。在AI模型训练过程中&#xff0c;特别是在联邦学习等分布式学习场景中&#xff0c;云数据可能分布在不同的地方&#xff0c;包括用户设备、边缘服务器和云服务。机密计算是为人工智能开发中的安全和隐私保护提供基础的…

使用endnote插入引用文献导致word英文和数字变成符号的解决方案

使用endnote插入引用文献导致word英文和数字变成符号的解决方案 如图使用endnote插入引用文献导致word英文和数字变成符号字体Wingdings Wingdings 是一个符号字体系列&#xff0c;它将许多字母渲染成各式各样的符号&#xff0c;用途十分广泛。 **解决方法&#xff1a;**直接通…

Linux基础学习:常用命令

目录结构及其常用命令 处理目录的常用命令&#xff1a; ls &#xff1a;列出目录及文件名cd&#xff1a;切换目录pwd&#xff1a;显示目前的目录mkdir&#xff1a;创建一个新的目录rmdir&#xff1a;删除一个空的目录cp&#xff1a;复制文件或目录rm&#xff1a;删除文件或目录…

【3GPP】【核心网】【5G】NG接口介绍(超详细)

目录 1. NG接口定义 2. 接口原则和功能 3. NG 接口控制面 5. NG接口主要信令流程 6. NG SETUP过程 1. NG接口定义 NG接口指无线接入网与5G核心网之间的接口。在5G SA网络中&#xff0c;gNB之间通过Xn接口进行连接,gNB与5GC之间通过NG接口进行连接。NG接口分为NG-C接口和NG…

CVE-2023-38836 BoidCMSv.2.0.0 后台文件上传漏洞

漏洞简介 BoidCMS是一个免费的开源平面文件 CMS&#xff0c;用于构建简单的网站和博客&#xff0c;使用 PHP 开发并使用 JSON 作为数据库。它的安装无需配置或安装任何关系数据库&#xff08;如 MySQL&#xff09;。您只需要一个支持PHP 的Web服务器。在 BoidCMS v.2.0.0 中存…

【LLM知识】笔记

为什么现在的LLM以decoder-only为主 为什么现在的LLM都是Decoder only的架构&#xff1f; 回答一 回答二 encoder-decoder 常用于处理需要对输入和输出建立精确的映射关系的任务&#xff08;机器翻译、文本摘要等&#xff09;&#xff0c;更具有专业性优势 输入的语义理解会…

c#简易学生管理系统

https://pan.baidu.com/s/1kCPvWg8P5hvlf26nGf2vxg?pwdya45 ya45

Linux运维:磁盘分区与挂载详解

Linux运维&#xff1a;磁盘分区与挂载详解 1、磁盘分区的原理2、查看系统中所有的磁盘设备及其分区信息3、进行磁盘分区&#xff08;对于sdb新磁盘&#xff09;4、格式化分区5、挂载分区&#xff08;临时挂载、永久挂载&#xff09;6、取消挂载分区7、删除分区 &#x1f496;Th…

立式学习灯有什么讲究?大路灯原来要这样选,五大台灯分享!

立式学习灯作为近年来最适合照明的护眼家电&#xff0c;为用户提供了良好的光线环境&#xff0c;并且还能够减少光线带来的视觉疲劳感。然而&#xff0c;随着其销量的节节攀升商家为了谋取利润&#xff0c;市面上也涌现了很多劣质产品&#xff0c;这些产品普遍没有经过技术调教…

【AnaConda/MiniConda/Linux】使用sudo python或切换root管理员conda环境被绕过解决方案

写在前面 部分机型修改环境变量存在风险&#xff0c;可能用于被覆盖而出现大量命令无法找到的情况 可以输入这个解决 export PATH/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin往期相关内容 探索Miniconda3&#xff1a;简单、灵活的Python环境和…

刷题日记——16进制不进位加法(厦门大学机试)

例题 分析 输入 本题解题关键在于输入的两个数位数不同时候需要尾数对齐&#xff0c;由于是16进制输入&#xff0c;含有字母&#xff0c;需要当作字符串输入&#xff0c;当然输出也要字母&#xff0c;那么就需要我们的两个老伙计了&#xff0c;一个是map&#xff0c;另一个是…

一文带你详解天池医疗数据集

医疗作为和民生健康息息相关的产业&#xff0c;通过天池大赛开放出一批有临床科研价值的数据集&#xff0c;涵盖了预防、辅诊、医学科研等主题。 与此同时&#xff0c;阿里云天池平台也积极推动产学研的共同进步&#xff0c;开源了多个本地生活领域的数据集&#xff0c;如aBea…

数据库类型转换

数据库版本&#xff1a;KingbaseES V008R006C008B0014 简介 数据类型转换是指将一个数据类型的值转换为另一个数据类型的值的过程。数据类型转换通常发生在不同数据类型的比较、计算或赋值操作中。kingbase主要分为隐式转换和显示转换&#xff0c;本篇文章主要介绍这两种转换方…