Kafka生产消费实战-JAVA

Kafka生产消费实战-JAVA

文章目录

  • Kafka生产消费实战-JAVA
    • 生产者代码
    • 消费者代码
    • 消费者代码扩展
    • Consumer消费offset查询
    • Consumer消费顺序
    • Kafka的三种语义

生产者代码

public static void main(String[] args) {Properties prop = new Properties();// 指定broker地址prop.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");// 消息序列化prop.put("key.serializer", StringSerializer.class.getName());prop.put("value.serializer", StringSerializer.class.getName());// 创建生产者KafkaProducer producer = new KafkaProducer<String, String>(prop);// f发送数据String topic = "hello";producer.send(new ProducerRecord<String, String>(topic, "hello kafka producer"));// closeproducer.close();}

消费者代码

 public static void main(String[] args) {Properties prop = new Properties();prop.put("bootstrap.servers", "192.168.52.100:9092,192.168.52.101:9092,192.168.52.102:9092");// 反序列化prop.put("key.deserializer", StringDeserializer.class.getName());prop.put("value.deserializer", StringDeserializer.class.getName());// 指定消费者组prop.put("group.id", "con-1");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);Collection<String> topics = new ArrayList<>();topics.add("hello");// 订阅指定的topicconsumer.subscribe(topics);while(true) {// 消费数据ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord message: consumerRecords) {System.out.println(message);}}}

image-20240312144608270

消费者代码扩展

// 开启自动提交功能,默认是开启prop.put("enable.auto.commit", "true");// 自动提交时间间隔prop.put("auto.commit.interval.ms", "5000");// 先根据group.id指定的消费者组查询保存的offset信息// 如果找到了,说明之前消费过该消费组的消息,则根据之前保存的offset继续消费// 如果没有找到,说明是第一次消费,或者说是之前的offset对应的数据已经不存在了,此时就会根据auto.offset.reset 的值执行不同的消费逻辑// earliest:从最早的数据开始消费,从头开始// latest : 最新的数据开始消费-默认的策略// none : 抛出异常// 在实时计算的场景下,建议设置为latest// 这个参数只会在消费者第一次消费或者对应的offset没有数据的时候才会生效prop.put("auto.offset.reset", "latest");

Consumer消费offset查询

  • kafka0.9之前,消费的offset信息是保存在zookeeper中,0.9之后使用了新的消费API,消费者的信息会保存在kafka里面的_consumer_offsets这个topic中

image-20240313091756451

  • 如何查询保存在kafka中的consumer的offset信息?
# 查询消费者信息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --list --bootstrap-server hadoop01:9092 
con-1# 消费组描述
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server hadoop01:9092 --group con-1GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
con-1           hello           2          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           3          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           1          0               0               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           0          1               1               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1
con-1           hello           4          2               2               0               consumer-con-1-1-572e3210-a06e-499c-ab2e-3d3340dd0129 /192.168.52.1   consumer-con-1-1

Consumer消费顺序

  • 当一个消费者消费一个partition的时候,消费的数据顺序和此partition数据的生产顺序是一致的

  • 当一个消费者消费多个partition的时候,消费者按照partition的顺序,首先消费一个partition,当消费完一个partition最新的数据后再消费其它partition的数据

总之,如果一个消费者消费多个partition,只能保证消费者的数据顺序在一个partition内有序

Kafka的三种语义

  • 至少一次:at-least-once,有可能对数据重复处理
// 将自动提交设置为false
prop.put("enable.auto.commit", "false");
// 手动提交
consumer.commitAsync();
  • 至多一次:at-most-once,默认实现

  • 仅此一次:exactly-once

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/740989.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot总结-基本概念和快速入门

原创作者&#xff1a;田超凡&#xff08;程序员田宝宝&#xff09; 版权所有&#xff0c;转载请注明原作者&#xff0c;严禁复制转载 一、SpringBoot介绍 1.1、SpringBoot简介 SpringBoot 是一个快速开发的框架, 封装了Maven常用依赖、能够快速的整合第三方框架&#xff1b…

Qt教程 — 1.1 Linux下安装Qt

目录 1 下载Qt 1.1 官方下载 1.2 百度网盘下载 1.3 Linux虚拟机终端下载 2 Qt安装 3 安装相关依赖 4 测试安装 1 下载Qt 1.1 官方下载 通过官网下载对应版本&#xff0c;本文选择的版本为qt-opensource-linux-x64-5.12.12&#xff0c;Qt官方下载链接&#xff1a;htt…

2024济南酵素展,山东酵素食品展,中国养生健康展,发酵展

发展酵素产业&#xff0c;助力全民健康&#xff0c;第六届山东国际酵素产业展览会5月27日开幕&#xff1b; 2024第6届中国&#xff08;济南&#xff09;国际酵素产业展览会 The 2024 sixth China (Jinan) International enzyme industry Expo 举办时间&#xff1a;2024年05月…

uni-app开发介绍

uni-app是一款基于Vue.js的跨平台开发框架&#xff0c;可以一次编写&#xff0c;多端运行&#xff0c;包括iOS、Android、H5、小程序等多个平台。它将前端开发与跨平台开发结合起来&#xff0c;使开发者可以快速构建多端应用。 uni-app具有以下几个特点&#xff1a; 开发便捷&…

微信小程序(一)

WebView app.是全局配置&#xff0c;app.json是全局配置文件&#xff0c;在页面的.json配置文件中的配置会覆盖我们全局的配置 快捷键&#xff1a; .box 敲回车 ----- <view class"box"></view> .row*8 敲回车&#xff1a; .row{$}*8 敲回车 案例1&…

自然语言处理(NLP)—— 语义关系提取

语义关系是指名词或名词短语之间的联系。这些关系可以是表面形式&#xff08;名词性实体&#xff09;之间的联系&#xff0c;也可以是知识工程中概念之间的联系。在自然语言处理&#xff08;NLP&#xff09;和文本挖掘领域&#xff0c;识别和理解这些语义关系对于信息提取、知识…

linux系统docker容器编写dockerfile文件

Docker file介绍 Docker file官网构建三步骤docker file构建过程docker file内容基础知识docker执行dockerfile的大致流程 dockerfile常用保留字指令RUNEXPOSEWORKDIRUSERMAINTAINERENVADDCOPYVOLUMEFROMCMDENTRYPOINT总结 Docker file docker file是用来构建docker镜像的文本文…

openstack rocky版手动搭建

实验环境 系统&#xff1a;CentOS-7-x86_64-DVD-1804 实验环境&#xff1a;vmware 网络&#xff1a;桥接模式----1块网卡---静态IP hostnameip功能controller192.168.20.205管理节点compute192.168.20.215计算节点 &#xff08;一&#xff09;环境设置&#xff0c;所有节点…

力扣每日一题 在受污染的二叉树中查找元素 哈希 DFS 二进制

Problem: 1261. 在受污染的二叉树中查找元素 思路 &#x1f468;‍&#x1f3eb; 灵神题解 &#x1f496; 二进制 时间复杂度&#xff1a;初始化为 O ( 1 ) O(1) O(1)&#xff1b;find 为 O ( m i n ( h , l o g 2 t a r g e t ) O(min(h,log_2target) O(min(h,log2​targ…

数字孪生与智慧城市:实现城市治理现代化的新路径

随着信息技术的迅猛发展&#xff0c;智慧城市已成为城市发展的必然趋势。数字孪生技术作为智慧城市建设的重要支撑&#xff0c;以其独特的优势为城市治理现代化提供了新的路径。本文将探讨数字孪生技术在智慧城市中的应用&#xff0c;以及如何实现城市治理的现代化。 一、数字…

Python 导入Excel三维坐标数据 生成三维曲面地形图(体) 5-3、线条平滑曲面且可通过面观察柱体变化(三)

环境和包: 环境 python:python-3.12.0-amd64包: matplotlib 3.8.2 pandas 2.1.4 openpyxl 3.1.2 scipy 1.12.0 代码: import pandas as pd import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D from scipy.interpolate import griddata fro…

leetcode2864--最大二进制奇数

1. 题意 给二进制串&#xff0c;求重新排列后的最大串。 2. 题解 统计1个数&#xff0c;将 C n t − 1 Cnt-1 Cnt−1个1放开头&#xff0c;其他除第0位都是0。 class Solution { public:string maximumOddBinaryNumber(string s) {int cnt count(s.begin(), s.end(), 1);r…

C#,红黑树(Red-Black Tree)的构造,插入、删除及修复、查找的算法与源代码

1 红黑树(Red-Black Tree) 如果二叉搜索树满足以下红黑属性,则它是红黑树: 每个节点不是红色就是黑色。根是黑色的。每片叶子(无)都是黑色的。如果一个节点是红色的,那么它的两个子节点都是黑色的。对于每个节点,从节点到后代叶的所有路径都包含相同数量的黑色节点。红…

HAProxy适配openGauss使用指导书

一、HAProxy 简介 HAProxy 是一个开源的项目&#xff0c;其代码托管在 Github 上&#xff0c;代码链接如下&#xff1a;HAProxy 代码链接。HAProxy 提供高可用性、负载均衡以及基于 TCP 和 HTTP 应用的代理&#xff0c;支持虚拟主机&#xff0c;它是免费、快速并且可靠的一种解…

机密计算:为云数据提供强大的安全性

在人工智能应用中&#xff0c;数据隐私是一个重要关注问题。在AI模型训练过程中&#xff0c;特别是在联邦学习等分布式学习场景中&#xff0c;云数据可能分布在不同的地方&#xff0c;包括用户设备、边缘服务器和云服务。机密计算是为人工智能开发中的安全和隐私保护提供基础的…

[Vue] 自定义命令

Vue 自定义命令 官方的vue命令包括&#xff1a;v-html v-if v-else v-show v-for等等为了提高效率&#xff0c;减少重复项编写&#xff0c;vue支持自定义命令&#xff0c;可以封装一些DOM操作&#xff0c;扩展额外的功能 语法 注册 全局注册 //在main.js中 Vue.directive(…

使用endnote插入引用文献导致word英文和数字变成符号的解决方案

使用endnote插入引用文献导致word英文和数字变成符号的解决方案 如图使用endnote插入引用文献导致word英文和数字变成符号字体Wingdings Wingdings 是一个符号字体系列&#xff0c;它将许多字母渲染成各式各样的符号&#xff0c;用途十分广泛。 **解决方法&#xff1a;**直接通…

Linux基础学习:常用命令

目录结构及其常用命令 处理目录的常用命令&#xff1a; ls &#xff1a;列出目录及文件名cd&#xff1a;切换目录pwd&#xff1a;显示目前的目录mkdir&#xff1a;创建一个新的目录rmdir&#xff1a;删除一个空的目录cp&#xff1a;复制文件或目录rm&#xff1a;删除文件或目录…

uni-app网络请求封装及发送

在看本篇文章之前&#xff0c;请先至少学会独立完成vue2项目 首先配置request.js const url_all {DEV: http://localhost:8888, // 开发// 生产 PRO: http://111.111.111.111:8080, }let BASEURL url_all[DEV] // 调整当前环境/** 全局请求封装* param path 请求路径* pa…

ROS2 Python导入三方库及自定义python程序

ROS2 Python导入三方库及自定义python程序 文章目录 前言正文导入三方库文件导入自定义python程序 前言 本文主要讲解ROS2建立pyhon包时如何导入三方库文件&#xff08;.so&#xff09;&#xff0c;及自定义的python程序。 正文 导入三方库文件 此处以Jaka驱动为例&#xf…