Kafka消费者api编写教程

1.基本属性配置

输入new Properties().var 回车

//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);/*//订阅主题//创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);*///订阅分区//创建一个数组列表变量接收主题分区值ArrayList<TopicPartition> topicPartitions = new ArrayList<>();//指定要订阅的分区topicPartitions.add(new TopicPartition("customers",2));//订阅分区kafkaConsumer.assign(topicPartitions);//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/22858.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

postgresql根据某个字段去重获取整行数据

背景&#xff1a;在一些情况下我们需要对数据进行去重统计&#xff0c;group by就行&#xff0c;但是一些特殊情况下我们要根据某个字段去重之后获取非聚合字段的值&#xff0c;这个时候在mysql非严格模式下可以直接执行&#xff0c;但是在严格模式和postgresql里面是直接报错的…

k8s 对外服务之 Ingress(七层代理)

一 Ingress 简介 理论方面 1&#xff0c; k8s service 作用 对集群内部&#xff1a; 它不断跟踪pod的变化&#xff0c;更新endpoint中对应pod的对象&#xff0c;提供了ip不断变化的pod的服务发现机制 对集群外部&#xff1a; 对集群外部&#xff0c;他类似负载均衡器&am…

JAVA家政系统小程序源码,家政系统源码,支持店铺入驻接单,师傅入驻接单:专业团队自主研发的一套上门家政APP系统成品源码,支持商用

JAVA家政系统小程序源码&#xff0c;家政系统源码&#xff0c;支持店铺入驻接单&#xff0c;师傅入驻接单&#xff1a;专业团队自主研发的一套上门家政APP系统成品源码&#xff0c;支持商用 家政系统是一套可提供上门家政的系统&#xff0c;可在线预约开荒保洁、上门维修、美容…

Android 如何保证开启debug模式之后再启动

很多时候会需要debug看Android启动时候的一些数据&#xff0c;但很多时候会存在自己开启debug后app已经过了自己要debug的那段代码的时机了。 那么怎么样可以保证一定能让启动后不会错过自己要debug的那段代码执行的时机呢&#xff1f; 可以用下面这行命令&#xff0c;其中co…

废品回收小程序开发,助力商家拓展回收市场

随着互联网的快速发展&#xff0c;废品回收行业也走向了数字化发展&#xff0c;废品回收小程序成为了拓展市场的重要方式。在当下万亿元下的回收市场中&#xff0c;废品回收小程序的发展也能够发挥重要作用&#xff0c;提高市场回收效率&#xff0c;提高大众的回收意识&#xf…

前端开发入门指南:掌握网页设计的第一课

UI设计与前端开发是相辅相成&#xff0c;UI设计可以视觉美化产品界面&#xff0c;而前端开发可以通过代码实现设计稿。作为UI设计师&#xff0c;如果画出来的图片美观方便对前端开发者非常有益。如果设计复比较难以实现&#xff0c;沟通就会变得更加困难。因此&#xff0c;UI设…

Django路由与会话深度探索:静态、动态路由分发,以及Cookie与Session的奥秘

系列文章目录 Django入门全攻略&#xff1a;从零搭建你的第一个Web项目Django ORM入门指南&#xff1a;从概念到实践&#xff0c;掌握模型创建、迁移与视图操作Django ORM实战&#xff1a;模型字段与元选项配置&#xff0c;以及链式过滤与QF查询详解Django ORM深度游&#xff…

高通开发系列 - 借助libhybris库实现Linux系统中使用Andorid库

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 返回:专栏总目录 目录 概述Android代码下载和编译aarch64开发环境libhybris下载和编译libhybris测试验证调用库中的函数概述 我主要是基于…

攻防世界---web---Web_php_unserialize

1、题目描述 2、 3、分析代码 class Demo { private $file fl4g.php; }&#xff1a;定义了一个名为Demo的类&#xff0c;该类有一个私有属性$file&#xff0c;默认值为fl4g.php。 $a serialize(new Demo);&#xff1a;创建了一个Demo类的实例&#xff0c;并对其进行序列化&a…

智能监测,无忧续航!Battery Indicator for Mac,让电池状态尽在掌握

Battery Indicator for Mac 是一款设计精良的电池状态监测软件&#xff0c;它极大地增强了Mac用户对电池使用情况的感知和管理能力。 首先&#xff0c;Battery Indicator for Mac 能够实时显示电池电量百分比。这意味着&#xff0c;无论你是在处理文件、浏览网页还是观看视频&…

【栈】736. Lisp 语法解析

本文涉及知识点 栈 LeetCode736. Lisp 语法解析 给你一个类似 Lisp 语句的字符串表达式 expression&#xff0c;求出其计算结果。 表达式语法如下所示: 表达式可以为整数&#xff0c;let 表达式&#xff0c;add 表达式&#xff0c;mult 表达式&#xff0c;或赋值的变量。表达…

电影推荐系统配置运行

电影推荐系统配置运行 代码地址项目介绍&#xff08;引自原文&#xff09; 环境创建新环境激活环境安装包创建管理员用户(可选)启动 代码地址 movie 项目介绍&#xff08;引自原文&#xff09; 本推荐系统采用的是分层模型设计思想&#xff0c;第一层为前端页面模型设计&…

C++ primer例子1实现

问题 代码 Sales_item.h #include<iostream> #include<string> class Sales_item {public:Sales_item(){};Sales_item(std::string insid, int num, double price);friend std::istream& operator>>(std::istream& is, Sales_item& item);frie…

springboot 图形验证码 前后端分离解决方案 easy-captcha

easy-captcha介绍 easy-captcha&#xff0c;提供了Java图形验证码&#xff0c;支持gif、中文、算术等类型&#xff0c;可用于Java Web、JavaSE等项目&#xff0c;是个很好用的工具库&#xff0c;文档比较完备。 用法 添加maven依赖 <!--图形验证码--><dependency&g…

支付系统-业务账单

target&#xff1a;离开柬埔寨倒计时-210day 前言 最近不知道该写什么了&#xff0c;很多东西要写起来非常耗时间&#xff0c;写作是真的不容易呀 我们的支付系统账单有两大类&#xff0c;一个是业务账单还有一个就是资金记录&#xff0c;都是引发资金流后的资金变动表现&…

高效数据处理的前沿:【C++】、【Redis】、【人工智能】与【大数据】的深度整合

目录 1.为什么选择 C 和 Redis&#xff1f; 2.人工智能与大数据的背景 1.大数据的挑战 2.人工智能的需求 3.C 与 Redis 的完美结合 1.安装 Redis 和 Redis C 客户端 2.连接 Redis 并进行数据操作 高级数据操作 列表操作 哈希操作 4.与大数据和人工智能结合 5.实际应…

机器学习18个核心算法模型

1. 线性回归&#xff08;Linear Regression&#xff09; 用于建立自变量&#xff08;特征&#xff09;和因变量&#xff08;目标&#xff09;之间的线性关系。 核心公式&#xff1a; 简单线性回归的公式为&#xff1a; , 其中 是预测值&#xff0c; 是截距&#xff0c; 是斜…

【C语言】结构体(及位段)

你好&#xff01;感谢支持孔乙己的新作&#xff0c;本文就结构体与大家分析我的思路。 希望能大佬们多多纠正及支持 &#xff01;&#xff01;&#xff01; 个人主页&#xff1a;爱摸鱼的孔乙己-CSDN博客 欢迎 互粉哦&#x1f648;&#x1f648;&#xff01; 目录 1. 声明结构…

解锁财富新篇章:消费增值模式引领未来消费趋势

你是否曾对日常消费感到一丝单调&#xff0c;认为它仅仅是一种物质上的交换&#xff0c;而非财富增长的途径&#xff1f;那么&#xff0c;让我们为你打开一扇全新的消费之门——消费增值模式。这不仅是一种全新的消费体验&#xff0c;更是一种让你的资金在消费过程中不断积累与…

百度地图API 教程使用 嵌套到vue3项目中使用,能够定位并且搜索地点名称位置,反向解析获取经度和维度

文章目录 目录 文章目录 流程 小结 概要安装流程技术细节小结 概要 注册百度地图成为开发者&#xff1a; 登录百度账号 注册成功开始下一步 百度地图API是百度提供的一组开发接口&#xff0c;用于在自己的应用程序中集成地图功能。通过百度地图API&#xff0c;您可以实现地图…