Kafka3.0.0版本——消费者(独立消费者消费某一个主题中某个分区数据案例__订阅分区)

目录

    • 一、独立消费者消费某一个主题中某个分区数据案例
      • 1.1、案例需求
      • 1.2、案例代码
      • 1.3、测试

一、独立消费者消费某一个主题中某个分区数据案例

1.1、案例需求

  • 创建一个独立消费者,消费firstTopic主题 0 号分区的数据,所下图所示:
    在这里插入图片描述

1.2、案例代码

  • 生产者往firstTopic主题 0 号分区发送数据代码

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("firstTopic", 0,"","hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
  • 消费者消费firstTopic主题 0 分区数据代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题对应的分区ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("firstTopic",0));kafkaConsumer.assign(topicPartitions);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

1.3、测试

  • 在 IDEA 中执行消费者程序,如下图:
    在这里插入图片描述
  • 在 IDEA 中执行生产者程序 ,在控制台观察生成几个 0号分区的数据,如下图:
    在这里插入图片描述
  • 在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/71622.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP/IP基础

前言&#xff1a; TCP/IP协议是计算机网络领域中最基本的协议之一&#xff0c;它被广泛应用于互联网和局域网中&#xff0c;实现了不同类型、不同厂家、运行不同操作系统的计算机之间的相互通信。本文将介绍TCP/IP协议栈的层次结构、各层功能以及数据封装过程&#xff0c;帮助您…

文心一言初体验,和ChatGPT语言理解能力比较

文章目录 第一个考验&#xff0c;语义理解第二个考验&#xff0c;历史问题的回答推荐阅读 百度旗下AI大模型文心一言宣布向全社会全面开放,所有用户都可以体验这款AI大模型了。要比较这两个语言模型&#xff0c;我们先设计好题目。 第一个考验&#xff0c;语义理解 题目1&…

Linux入门之多线程|线程的同步|生产消费模型

文章目录 一、多线程的同步 1.概念 2.条件变量 2.1条件变量概念 2.2条件变量接口 1.条件变量初始化 2.等待条件满足 3.唤醒等待 3.销毁条件变量 2.3条件变量demo 二、生产消费模型 1.生产消费模型 2.基于BlockQueue的生产者消费者模型 3.基于C用条件变量和互斥锁实…

LabVIEW对EAST长脉冲等离子体运行的陀螺稳态运行控制

LabVIEW对EAST长脉冲等离子体运行的陀螺稳态运行控制 托卡马克是实现磁约束核聚变最有希望的解决方案之一。电子回旋共振加热&#xff08;ECRH是一种对托卡马克有吸引力的等离子体加热方法&#xff0c;具有耦合效率高&#xff0c;功率沉积定位好等优点。陀螺加速器是ECRH系统中…

JAVA设计模式第十讲:SPI - 业务差异解决方案

JAVA设计模式第十讲&#xff1a;SPI - 业务差异解决方案 我们需要在不修改源代码的情况下&#xff0c;动态为程序提供一系列额外的特性。首先想到的是Spring的AOP技术来构建应用插件&#xff0c;但是在Java自带的插件中&#xff0c;就有完整的实现。SPI&#xff08;Service Pro…

Win7旗舰版64位桌面创建32位IE方法

很多Win7 64位旗舰版用户系统桌面上的IE8浏览器&#xff0c;打开后都是64位的&#xff0c;而很多网站并不兼容64位的IE浏览器&#xff0c;其实在Win764位系统中IE是分为64位和32位的&#xff0c;出现这样的情况可能是桌面上的IE图标指响的是64位的IE&#xff0c;我们只要重新添…

grid弹性布局 设置宽高一致

效果图如下&#xff1a; 例子&#xff1a;设置每行四列的弹性布局&#xff0c;每个盒子宽高相同&#xff0c;间距为10px .left_list{display: grid;grid-gap: 10px 10px;grid-template-columns: repeat(4,1fr);.list_item{height: 0;padding-bottom:100%;/*高度设置为0&#…

io和进程day03(文件IO、文件属性函数、目录相关函数)

今日任务 代码 #include <stdio.h> #include <string.h> #include <stdlib.h> #include <sys/types.h> #include <sys/stat.h> #include <unistd.h> #include <sys/types.h> #include <pwd.h> #include <dirent.h> #in…

【图文并茂】C++介绍之串

1.1串 引子—— ​ 字符串简称为串&#xff0c;串是由字符元素构成的&#xff0c;其中元素的逻辑关系也是一种线性关系。串的处理在计算机非数值处理中占用重要的地位&#xff0c;如信息检索系统&#xff0c;文字编辑等都是以串数据作为处理对象 串是由零个或多个字符组成的…

得心应手应对 OOM 的疑难杂症

Java全能学习面试指南&#xff1a;https://www.javaxiaobear.cn/ 前面我们提到&#xff0c;类的初始化发生在类加载阶段&#xff0c;那对象都有哪些创建方式呢&#xff1f;除了我们常用的 new&#xff0c;还有下面这些方式&#xff1a; 使用 Class 的 newInstance 方法。使用…

[Vue3 博物馆管理系统] 使用Vue3、Element-plus的Layout 布局构建组图文章

系列文章目录 第一章 定制上中下&#xff08;顶部菜单、底部区域、中间主区域显示&#xff09;三层结构首页 第二章 使用Vue3、Element-plus菜单组件构建菜单 第三章 使用Vue3、Element-plus走马灯组件构建轮播图 第四章 使用Vue3、Element-plus tabs组件构建选项卡功能 第五章…

PHP反序列化漏洞

一、序列化&#xff0c;反序列化 序列化&#xff1a;将php对象压缩并按照一定格式转换成字符串过程反序列化&#xff1a;从字符串转换回php对象的过程目的&#xff1a;为了方便php对象的传输和存储 seriallize() 传入参数为php对象&#xff0c;序列化成字符串 unseriali…

go gin gorm连接postgres postgis输出geojson

go gin gorm连接postgres postgis输出geojson 1. 技术环境 go-gin-gorm postgres-postgis go vscode环境安装-智能提示配置 2. 简单实现代码 思路就是&#xff1a;采用原生sql实现查询、更新等&#xff0c;采用gorm的raw来执行sql语句 package mainimport ("fmt"&q…

JavaScript 之 Symbol 数据类型

一、简介 ​ symbol类型是ES6新引入的一种基本数据类型&#xff0c;该类型具有静态属性和静态方法。其中静态属性暴露了几个内建的成员对象&#xff0c;静态方法暴露了全局的symbol注册。 ​ symbol类型具有以下特点&#xff1a;① 唯一性&#xff1a;每个symbol值都是唯一的…

JavaScript设计模式(四)——策略模式、代理模式、观察者模式

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

【MySQL基础】事务隔离03

目录 隔离性与隔离级别事务隔离的实现事务的启动方式MySQL事务代码示例 在MySQL中&#xff0c;事务支持是在引擎层实现的。MySQL是一个支持多引擎的系统&#xff0c;但并不是所有的引擎都支持事务。比如 MySQL 原生的 MyISAM 引擎就不支持事务&#xff0c;这也是 MyISAM 被 Inn…

29 | 聊聊性能测试的基本方法与应用领域

并发用户数、响应时间、系统吞吐量之间的关系 当系统并发用户数较少时&#xff0c;系统的吞吐量也低&#xff0c;系统处于空闲状态&#xff0c;这个阶段被称为 “空闲区间”。 并发用户数进一步增长&#xff0c;系统的处理能力逐渐趋于饱和&#xff0c;因此每个用户的响应时间会…

Java 日志技术

所以说&#xff0c;要学Logback&#xff01; 配置文件 Logback提供了一个核心配置文件logback.xml&#xff0c;日志框架在记录日志时会读取配置文件中的配置信息&#xff0c;从而记录日志的形式。 可以配置日志输出的位置是文件还是控制台可以配置日志输出的格式还可以配置日…

55、基于 WebFlux 开发 WebSocKet

★ 基于Web Flux开发WebSocket 两步&#xff1a; &#xff08;1&#xff09;实现WebSocketHandler开发WebSocket处理类。 实现该接口时只需要实现Mono handle(WebSocketSession webSocketSession)方法即可。 &#xff08;2&#xff09;使用HandlerMapping和WebSocketHandler…

【Arduino27】DHT11温湿度传感器模拟值实验

硬件准备 DHT11温湿度&#xff1a;1个 面包板&#xff1a;1个 杜邦线&#xff1a;3根 硬件连线 VDD引脚接 5V 电源 DATE引脚接 4号 接口 GND引脚接 GND 接口 软件程序 #include<DHT.h>#define DHT11_pin 4 //温湿度传感器引脚DHT dht(DHT11_pin,DHT11);float tem…