Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

目录

    • 一、独立消费者消费某一个主题中某个分区数据案例
      • 1.1、案例需求
      • 1.2、案例代码
      • 1.3、测试

一、独立消费者消费某一个主题中某个分区数据案例

1.1、案例需求

  • 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示:
    在这里插入图片描述

1.2、案例代码

  • 生产者往firstTopic主题 0 号分区发送数据代码

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("firstTopic", 0,"","hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
  • 消费者消费firstTopic主题 0 分区数据代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题对应的分区ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("firstTopic",0));kafkaConsumer.assign(topicPartitions);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

1.3、测试

  • 在 IDEA 中执行消费者程序,如下图:
    在这里插入图片描述
  • 在 IDEA 中执行生产者程序 ,在控制台观察生成几个 0号分区的数据,如下图:
    在这里插入图片描述
  • 在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/71622.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Halcon实现3维点云平面拟合

Halcon实现3维点云平面拟合 function main()WindowHandle open_window()ObjectModel3D load_3D_model("1.om3")ObjectModel3DSelected remove_noise(ObjectModel3D)[X, Y, Z] extract_coordinates(ObjectModel3DSelected)[NX, NY, NZ, C] fit_plane(X, Y, Z)vi…

TCP/IP基础

前言&#xff1a; TCP/IP协议是计算机网络领域中最基本的协议之一&#xff0c;它被广泛应用于互联网和局域网中&#xff0c;实现了不同类型、不同厂家、运行不同操作系统的计算机之间的相互通信。本文将介绍TCP/IP协议栈的层次结构、各层功能以及数据封装过程&#xff0c;帮助您…

js去除字符串空格的几种方式

方法1&#xff1a;(最常用)全部去除掉空格 var str abc d e f g ; function trim(str) { var reg /[\t\r\f\n\s]*/g; if (typeof str string) { var trimStr str.replace(reg,); } console.lo…

文心一言初体验,和ChatGPT语言理解能力比较

文章目录 第一个考验&#xff0c;语义理解第二个考验&#xff0c;历史问题的回答推荐阅读 百度旗下AI大模型文心一言宣布向全社会全面开放,所有用户都可以体验这款AI大模型了。要比较这两个语言模型&#xff0c;我们先设计好题目。 第一个考验&#xff0c;语义理解 题目1&…

Linux入门之多线程|线程的同步|生产消费模型

文章目录 一、多线程的同步 1.概念 2.条件变量 2.1条件变量概念 2.2条件变量接口 1.条件变量初始化 2.等待条件满足 3.唤醒等待 3.销毁条件变量 2.3条件变量demo 二、生产消费模型 1.生产消费模型 2.基于BlockQueue的生产者消费者模型 3.基于C用条件变量和互斥锁实…

C++学习笔记

C 1、函数重载2、类2.1、类的方法和属性2.2、类的方法的定义2.3、构造器和析构器2.4、基类与子类2.5、类的public、protected、private继承2.6、类的方法的重载2.7、子类方法的覆盖 1、函数重载 函数重载大概可以理解为&#xff0c;定义两个名字一模一样&#xff0c;但形参不一…

STM32F103ZE单片机呼吸灯源代码

//这个是中断文件/* USER CODE BEGIN Header *//********************************************************************************* file stm32f1xx_it.c* brief Interrupt Service Routines.**********************************************************************…

LabVIEW对EAST长脉冲等离子体运行的陀螺稳态运行控制

LabVIEW对EAST长脉冲等离子体运行的陀螺稳态运行控制 托卡马克是实现磁约束核聚变最有希望的解决方案之一。电子回旋共振加热&#xff08;ECRH是一种对托卡马克有吸引力的等离子体加热方法&#xff0c;具有耦合效率高&#xff0c;功率沉积定位好等优点。陀螺加速器是ECRH系统中…

JAVA设计模式第十讲:SPI - 业务差异解决方案

JAVA设计模式第十讲&#xff1a;SPI - 业务差异解决方案 我们需要在不修改源代码的情况下&#xff0c;动态为程序提供一系列额外的特性。首先想到的是Spring的AOP技术来构建应用插件&#xff0c;但是在Java自带的插件中&#xff0c;就有完整的实现。SPI&#xff08;Service Pro…

Win7旗舰版64位桌面创建32位IE方法

很多Win7 64位旗舰版用户系统桌面上的IE8浏览器&#xff0c;打开后都是64位的&#xff0c;而很多网站并不兼容64位的IE浏览器&#xff0c;其实在Win764位系统中IE是分为64位和32位的&#xff0c;出现这样的情况可能是桌面上的IE图标指响的是64位的IE&#xff0c;我们只要重新添…

grid弹性布局 设置宽高一致

效果图如下&#xff1a; 例子&#xff1a;设置每行四列的弹性布局&#xff0c;每个盒子宽高相同&#xff0c;间距为10px .left_list{display: grid;grid-gap: 10px 10px;grid-template-columns: repeat(4,1fr);.list_item{height: 0;padding-bottom:100%;/*高度设置为0&#…

io和进程day03(文件IO、文件属性函数、目录相关函数)

今日任务 代码 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> #include <sys/types.h> #include <pwd.h> #include <dirent.h> #in…

【图文并茂】C++介绍之串

1.1串 引子—— ​ 字符串简称为串&#xff0c;串是由字符元素构成的&#xff0c;其中元素的逻辑关系也是一种线性关系。串的处理在计算机非数值处理中占用重要的地位&#xff0c;如信息检索系统&#xff0c;文字编辑等都是以串数据作为处理对象 串是由零个或多个字符组成的…

得心应手应对 OOM 的疑难杂症

Java全能学习面试指南&#xff1a;https://www.javaxiaobear.cn/ 前面我们提到&#xff0c;类的初始化发生在类加载阶段&#xff0c;那对象都有哪些创建方式呢&#xff1f;除了我们常用的 new&#xff0c;还有下面这些方式&#xff1a; 使用 Class 的 newInstance 方法。使用…

[Vue3 博物馆管理系统] 使用Vue3、Element-plus的Layout 布局构建组图文章

系列文章目录 第一章 定制上中下&#xff08;顶部菜单、底部区域、中间主区域显示&#xff09;三层结构首页 第二章 使用Vue3、Element-plus菜单组件构建菜单 第三章 使用Vue3、Element-plus走马灯组件构建轮播图 第四章 使用Vue3、Element-plus tabs组件构建选项卡功能 第五章…

PHP反序列化漏洞

一、序列化&#xff0c;反序列化 序列化&#xff1a;将php对象压缩并按照一定格式转换成字符串过程反序列化&#xff1a;从字符串转换回php对象的过程目的&#xff1a;为了方便php对象的传输和存储 seriallize() 传入参数为php对象&#xff0c;序列化成字符串 unseriali…

go gin gorm连接postgres postgis输出geojson

go gin gorm连接postgres postgis输出geojson 1. 技术环境 go-gin-gorm postgres-postgis go vscode环境安装-智能提示配置 2. 简单实现代码 思路就是&#xff1a;采用原生sql实现查询、更新等&#xff0c;采用gorm的raw来执行sql语句 package mainimport ("fmt"&q…

nginx解决静态页面中ajax跨域方案配置

nginx 配置内容 &#xff1a; server { listen 9999;#监听端口 server_name localhost;#域名 add_header Access-Control-Allow-Origin *; # 允许跨域 #charset koi8-r; #access_log logs/host.access.log main; location / { …

python面向对象的一个简单实例

#发文福利# #!/usr/bin/env python # -*- coding:utf-8 -*-students {id001: {name: serena, age: 18, address: beijing},id002: {name: fanbingbing, age: 42, address: anhui},id003: {name: kahn, age: 20, address: shanghai}}class Student:def __init__(self, xid, na…

采访 Footprint Analytics CEO Navy:AI 与 Web3 的融合之道

Web3 正在引领互联网的下一个时代。然而&#xff0c;链上数据碎片化、不标准化的问题依然存在。Footprint Analytics 推出一站式数据解决方案&#xff0c;通过 AI 技术实现区块链数据的自动收集、清洗、关联&#xff0c;构建跨链数据标准&#xff0c;让开发者更便捷地访问和分析…