9.Kafka消费者API实践

目录

  • 概述
  • 实践
    • topic
    • 消费者
    • 效果
  • 消费指定topic的某个分区
    • 代码
    • 效果
    • kafka分区策略-Range

概述

  Kafka消费者API实践

实践

topic

# ./kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic test03
[root@hadoop02 bin]# ./kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic test03
Created topic test03.

消费者

public class KafkaConsumerApp {public static final String BROKERS = "hadoop02:9092";public static final String TOPIC = "test03";public static final String GROUP = "test-group";private static KafkaConsumer<String,String> kafkaConsumer;/*** 资源初始化*/@Beforepublic void setUp() {Properties props = new Properties();// 连接至kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);// 反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);// 创建一个消费者对象kafkaConsumer = new KafkaConsumer<>(props);}@Testpublic void test() {List<String> topics = new ArrayList<>();topics.add(TOPIC);kafkaConsumer.subscribe(topics);// 消费 kafka 中的 topic 的数据while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {String key = record.key();String value = record.value();String topic = record.topic();int partition = record.partition();long offset = record.offset();String result= "key:"+key+" ,value:"+value+" ,topic:"+topic+" ,partition:"+partition+" ,offset:"+offset;System.out.println(result);}}}/*** 资源释放*/@Afterpublic void close() {if (null != kafkaConsumer) {kafkaConsumer.close();}}}

效果

在这里插入图片描述

消费指定topic的某个分区

代码

public class KafkaConsumerApp {public static final String BROKERS = "hadoop02:9092";public static final String TOPIC = "test03";public static final String GROUP = "test-group";private static KafkaConsumer<String,String> kafkaConsumer;/*** 资源初始化*/@Beforepublic void setUp() {Properties props = new Properties();// 连接至kafka集群props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);// 反序列化props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);// 创建一个消费者对象kafkaConsumer = new KafkaConsumer<>(props);}/*** 消费指定分区的数据 (此案例消费 TOPIC 分区0 中的数据)*/@Testpublic void test2() {List<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition(TOPIC, 0));kafkaConsumer.assign(topicPartitions);// 消费 kafka 中的 topic 的数据while (true) {ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : consumerRecords) {String key = record.key();String value = record.value();String topic = record.topic();int partition = record.partition();long offset = record.offset();String result= "key:"+key+" ,value:"+value+" ,topic:"+topic+" ,partition:"+partition+" ,offset:"+offset;System.out.println(result);}}}/*** 资源释放*/@Afterpublic void close() {if (null != kafkaConsumer) {kafkaConsumer.close();}}}

效果

在这里插入图片描述
在这里插入图片描述

kafka分区策略-Range

// 设置分区器
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());

在这里插入图片描述

可以自定义:org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/47420.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【问题解决】Jetson nano 安装pytorch使用GPU推理

一. 问题描述 安装 yolov8 后只调用cpu推理图片 二. 解决步骤 2.1 在推理环境下&#xff0c;执行下面命令卸载pytorch pip uninstall torch torchtext torchaudio2.2 下载PyTorch的依赖: sudo apt-get -y update; sudo apt-get -y install libopenblas-dev;###2.3 下载py…

深入全面概括C语言的运算符

目录 二.算术运算符 三.自增自减运算符 四.赋值运算符 五.关系运算符 六.逻辑运算符 七.三元运算符 九.运算符的优先级 一.前言 c语言的运算符可以分为六种&#xff0c;分别是&#xff1a;1.算术运算符&#xff1b;2.自增自减运算符&#xff1b;3.赋值运算符&#xff1b…

uniapp转小程序,小程序转uniapp方法

&#x1f935; 作者&#xff1a;coderYYY &#x1f9d1; 个人简介&#xff1a;前端程序媛&#xff0c;目前主攻web前端&#xff0c;后端辅助&#xff0c;其他技术知识也会偶尔分享&#x1f340;欢迎和我一起交流&#xff01;&#x1f680;&#xff08;评论和私信一般会回&#…

python-字符金字塔(赛氪OJ)

[题目描述] 请打印输出一个字符金字塔&#xff0c;字符金字塔的特征请参考样例。输入格式&#xff1a; 输入一个字母&#xff0c;保证是大写。输出格式&#xff1a; 输出一个字母金字塔&#xff0c;输出样式见样例。样例输入 C样例输出 A ABA …

【ffmpeg命令基础】过滤处理

文章目录 前言过滤处理的介绍两种过滤类型简单滤波图简单滤波图是什么简单滤波示例 复杂滤波图复杂滤波是什么区别示例 总结 前言 FFmpeg是一款功能强大的开源音视频处理工具&#xff0c;广泛应用于音视频的采集、编解码、转码、流化、过滤和播放等领域。1本文将重点介绍FFmpe…

Docker入门:从安装到实际应用

Docker入门指南&#xff1a;从安装到实际应用 Docker 是一个开源的平台&#xff0c;允许开发者通过容器技术来部署、管理和运行应用程序。容器是一种轻量级、独立的运行环境&#xff0c;可以包含应用程序及其所有依赖项&#xff0c;从而确保在不同环境下运行一致。本文将介绍 …

Python、Rust与AI的未来展望

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 非常期待和您一起在这个小…

【关于PHP性能优化,内存优化,日志工具等问题处理】

目录 PHP 性能优化&#xff1a; 如何优化 PHP 代码以提高性能&#xff1f; 通用优化策略&#xff1a; 框架特定优化&#xff1a; 性能优化最佳实践&#xff1a; 描述一下你使用过的 PHP 性能分析工具。 检测内存泄漏的方法 使用工具检测内存泄漏 常见内存泄漏场景及解决…

FastAdmin: 一款基于ThinkPHP+Bootstrap的极速后台开发框架(Gitee最有价值开源项目)

欢迎加入我们前端技术学习交流群&#xff0c;关注“前端组件开发”公众号&#xff0c;私信可申请入群 摘要&#xff1a; 随着Web技术的快速发展&#xff0c;后台管理系统的开发效率与灵活性成为了项目成功的关键。FastAdmin作为一款基于ThinkPHP和Bootstrap的开源后台框架&…

基于 Vue 3 和 Element Plus 构建图书管理系统

基于 Vue 3 和 Element Plus 构建图书管理系统 本文将介绍如何使用 Vue 3 和 Element Plus 构建一个简单的图书管理系统。这个系统将包括以下功能&#xff1a; 添加新书显示图书列表分页显示图书删除图书 相关链接 接口地址 elementplus中文地址 项目结构 我们的项目结构…

Langchain[4]:Langchain 0.2革命性突破:结合工具调用与结构化数据处理、@Chain修饰符使用,解决LLM输出难题,提升AI效能

Langchain[4]:Langchain 0.2革命性突破:结合工具调用与结构化数据处理,解决LLM输出难题,提升AI效能 1.工具调用 大型语言模型 (LLM) 可以通过工具调用功能与外部数据源交互。工具调用是一种强大的技术,允许开发人员构建复杂的应用程序,这些应用程序可以利用 LLM 访问、交…

websocket-react使用

问题 在一个应用中&#xff0c;如果需要在不同的组件之间共享同一个WebSocket连接&#xff0c;可以采用多种方法来实现。 比如&#xff1a;单例模式、全局变量、react context React上下文&#xff08;React Context&#xff09; 如果你使用的是React&#xff0c;可以使用Re…

C++ | Leetcode C++题解之第239题滑动窗口最大值

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<int> maxSlidingWindow(vector<int>& nums, int k) {int n nums.size();vector<int> prefixMax(n), suffixMax(n);for (int i 0; i < n; i) {if (i % k 0) {prefixMax[i] num…

Writing Bazel rules: data and runfiles

Bazel has a neat feature that can simplify a lot of work with tests and executables: the ability to make data files available at run-time using data attributes. You may have seen these in rules like this:Bazel 有一个巧妙的功能&#xff0c;可以简化测试和可执…

简单实用的企业舆情安全解决方案

前言&#xff1a;企业舆情安全重要吗&#xff1f;其实很重要&#xff0c;尤其面对负面新闻&#xff0c;主动处理和应对&#xff0c;可以掌握主动权&#xff0c;避免股价下跌等&#xff0c;那么如何做使用简单实用的企业舆情解决方案呢&#xff1f; 背景 好了&#xff0c;提取词…

CSS技巧专栏:一日一例 7 - 纯CSS实现炫光边框按钮特效

CSS技巧专栏&#xff1a;一日一例 7 - 纯CSS实现炫光边框按钮特效 本例效果图 案例分析 相信你可能已经在网络见过类似这样的流光的按钮&#xff0c;在羡慕别人做的按钮这么酷的时候&#xff0c;你有没有扒一下它的源代码的冲动&#xff1f;或者你当时有点冲动&#xff0c;却…

【PostgreSQL】PostgreSQL简史

博主介绍&#xff1a;✌全网粉丝20W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…

基于python的百度资讯爬虫的设计与实现

研究背景 随着互联网和信息技术的飞速发展&#xff0c;网络已经成为人们获取信息的主要来源之一。特别是搜索引擎&#xff0c;作为信息检索的核心工具&#xff0c;极大地改变了人们获取信息的方式。其中&#xff0c;百度作为中国最受欢迎的搜索引擎之一&#xff0c;其新闻搜索…

开发扫地机器人系统时无法兼容手机解决方案

在开发扫地机器人系统时&#xff0c;遇到无法兼容手机的问题&#xff0c;可以从以下几个方面寻求解决方案&#xff1a; 一、了解兼容性问题根源 ① 操作系统差异&#xff1a;不同手机可能运行不同的操作系统&#xff08;如iOS、Android&#xff09;&#xff0c;且即使是同一操…

leetcode简单题27 N.119 杨辉三角II rust描述

// 直接生成杨辉三角当前行 pub fn get_row(row_index: i32) -> Vec<i32> {let mut row vec![1; (row_index 1) as usize];for i in 1..row_index as usize {for j in (1..i).rev() {row[j] row[j] row[j - 1];}}row } // 空间优化的方法 pub fn get_row2(row_ind…