Kafka相关API开发

(一)引入依赖

        用API直接去操作kafka(读写数据)在实际开发中用的并不多,学习它主要还是为了加深对Kafka功能的理解。kafka的读写操作,实际开发中,是通过各类更上层的组件去实现。而这些组件在读写kafka数据时,用的当然是kafka的java api,比如flink、spark streaming和flume等。

<properties> <kafka.version>2.4.1</kafka.version>
</properties>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.version}</version>       
</dependency>

(二)API 开发——producer 生产者

1.构造一个生产者,可以持续发送大量数据

2.构造一个生产者,有必须设置的参数:

bootstrap.server

key.seralizer

value.seralizer

其他的,可选

3.使用特定接口

kafka的生产者发送用户的业务数据时,必须使用org.apache.kafka.common.serialization.Serializer接口的实现类这一序列化框架来序列化用户的数据。

4.发往指定topic

构造一个Kafka生产者后,并没有固定数据要发往的topic,因此,可以将不同的数据发往不同的topic

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** kafka生产者API代码示例*/
public class ProducerDemo {public static void main(String[] args) throws InterruptedException {// 泛型K:要发送的数据中的key// 泛型V:要发送的数据中的value// 隐含之意:kafka中的message,是Key-Value结果的(可以没有Key)Properties props = new Properties();props.setProperty("bootstrap.servers", "node141:9092,node142:9092");// 因为,kafka底层存储没有类型维护机制,用户所发的所有数据类型,都必须变成 序列化后的byte[]// 所以,kafka的producer需要一个针对用户要发送的数据类型的序列化工具类// 且这个序列化工具类,需要实现kafka所提供的序列化工具接口:org.apache.kafka.common.serialization.Serializerprops.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/*** 代码中进行客户端参数配置的另一种写法*/props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.setProperty(ProducerConfig.ACKS_CONFIG, "all");// 消息发送应答级别// 构造一个生产者客户端KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {// 将业务数据封装成客户端所能发送的封装格式// 0->abc0// 1->abc1// TODO:奇数发往abcx,偶数发往abcyProducerRecord<String, String> message = null;if (i % 2 == 0) {message = new ProducerRecord<>("abcy", "user_id" + i, "doit_edu" + i);} else {message = new ProducerRecord<>("abcx", "user_id" + i, "doit_edu" + i);}// 消费时只会打印value的值,key并没有读到// 调用客户端去发送// 数据的发送动作在producer的底层是异步线程去异步发送的,即调用send方法立即执行完毕,直接走之后的代码,不代表数据发送成功producer.send(message);Thread.sleep(100);}// 关闭客户端
//        producer.flush();producer.close();}
}

5.消费消息

(三)API开发——consumer消费者

kafka消费者的起始消费位置有两种决定机制:

1.手动指定了起始位置,它肯定从指定的位置开始

2.如果没有手动指定起始位置,它去找消费者组之前所记录的偏移量开始

3.如果之前的位置也获取不到,就看参数:auto.offset.reset所指定的重置策略

4.如果指定的offset>原有分区内的最大offset,就自动重置到最大的offset

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.*;/*** kafka消费者API代码示例*/
public class ConsumerDemo {public static void main(String[] args) {Properties props = new Properties();props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node141:9092,node142:9092");props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// kafka的消费者,默认是从所属组之前所记录的偏移量开始消费,如果找不到之前记录的偏移量,则从如下参数配置的策略来确定消费起始偏移量:// 1.earliest:自动重置到每个分区的最前一条消息// 2.latest:自动重置到每个分区的最新一条消息// 3.none:如果没有为使用者的组找到以前的偏移,则向使用者抛出异常// 如果输入除了上述三种之外的,会向使用者抛出异常props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 如果latest消息找不到,consumer.seek就起作用了// 设置消费者所属的组idprops.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "d30-1");// 设置消费者自动提交最新的的消费位移——默认是开启的props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");// 设置自动提交位移的时间间隔——默认是5000msprops.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");// 构造一个消费者客户端KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题(可以是多个)
//        consumer.subscribe(Collections.singletonList("abcx"));consumer.subscribe(Arrays.asList("abcx","abcy"));// 正则订阅主题
//        consumer.subscribe(Pattern.compile ("abc.*" ));// 显式指定消费起始偏移量/*TopicPartition abcxP0 = new TopicPartition("abcx", 0);TopicPartition abcxP1 = new TopicPartition("abcx", 1);consumer.seek(abcxP0,10);consumer.seek(abcxP1,15);*/// 循环往复拉取数据boolean condition = true;while (condition) {// 客户端去拉取数据的时候,如果服务端没有数据响应,会保持连接等待服务端响应// poll中传入的超时时长参数,是指等待的最大时长ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// Iterator:迭代器// Iterable:可迭代的,是迭代器的再封装// 实现了Iterable的对象,可以用增强for循环去遍历迭代,也可以从对象上取到iterator,来用iterator.hasNext来迭代// Iterator<ConsumerRecord<String, String>> iterator = records.iterator();// 直接用for循环来迭代本次取到的一批数据for (ConsumerRecord<String, String> record : records) {// ConsumerRecord中,不光有用户的业务数据,还有Kafka注入的元数据String key = record.key();String value = record.value();// 本条消息所属的topic:拉取的时候可能不止一个topic,所以会有这个方法String topic = record.topic();// 本条数据所属的分区int partition = record.partition();// 本条数据的偏移量long offset = record.offset();//key的长度int keySize = record.serializedKeySize();//value的长度int valueSize = record.serializedValueSize();// 当前这条数据所在分区的leader的朝代纪年Optional<Integer> leaderEpoch = record.leaderEpoch();// kafka的数据底层存储中,不光有用户的业务数据,还有大量元数据// timestamp就是其中之一:记录本条数据的时间戳// 时间戳有两种类型:一个是CreateTime(这条数据创建的时间——生产者), LogAppendTime(broker往log里面追加的时间)TimestampType timestampType = record.timestampType();long timestamp = record.timestamp();// 数据头:是生产者在写入数据时附加进去的,相当于用户自定义的元数据// 在生产者写入消息时,可以自定义元数据,所以record.headers()方法就能够消费到// public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)// 如果生产者写入消息时,没有定义元数据,record.headers()方法就不会消费到Headers headers = record.headers();//            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }System.out.println(String.format("key = %s, value = %s,topic = %s , partition = %s, offset = %s," +"leader的纪元 = %s, timestampType = %s ,timestamp = %s," +" key序列化的长度 = %s, value 序列化的长度 = %s",key, value, topic, partition, offset,leaderEpoch.get(), timestampType.name, timestamp,keySize, valueSize));}}// 对数据进行业务逻辑处理// 关闭客户端// consumer.close();}
}

有了上面两个API,先开启消费者,然后开启生产者,消费者控制就会输出消息。

 // 当前这条数据所在分区的leader的朝代纪年
Optional<Integer> leaderEpoch = record.leaderEpoch();

当leader有变化,leaderEpoch.get()的值就会+1,初始值为0

(四)API开发——指定偏移量订阅消息

1.subscribe与assign订阅

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;/*** 指定偏移量*/
public class ConsumerDemo2 {public static void main(String[] args) throws IOException {Properties props = new Properties();// 从配置文件中加载props.load(ConsumerDemo2.class.getClassLoader().getResourceAsStream("consumer.properties"));props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "doit30-5");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);/*  // subscribe订阅,会参与消费者组的再均衡机制才能真正获得自己要消费的topic及其分区的consumer.subscribe(Collections.singletonList("ddd"));// 这里无意义地去拉一次数据,主要就是为了确保:分区分配动作已完成consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 然后再定义到指定的偏移量,开始正式消费consumer.seek(new TopicPartition("ddd",0),2);*/// 既然要自己指定一个确定的起始消费位置,那通常隐含之意是不需要去参与消费者组自动再均衡机制,该方法比较常用// 那么,就不要使用subscribe来订阅主题consumer.assign(Arrays.asList(new TopicPartition("ddd", 0)));consumer.seek(new TopicPartition("ddd", 0), 4);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));for (ConsumerRecord<String, String> record : records) {int keySize = record.serializedKeySize();int valueSize = record.serializedValueSize();System.out.println(String.format("key = %s, value = %s,topic = %s , partition = %s, offset = %s," +"leader的纪元 = %s, timestampType = %s ,timestamp = %s," +" key序列化的长度 = %s, value 序列化的长度 = %s",record.key(), record.value(), record.topic(), record.partition(), record.offset(),record.leaderEpoch().get(), record.timestampType().name, record.timestamp(),keySize, valueSize));}}}
}

2.subscribe与assign订阅具体区别

  • 通过subscribe()方法订阅主题具有消费者自动再均衡功能:

        在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

  • assign()方法订阅分区时,是不具备消费者自动均衡的功能的:

        其实这一点从assign()方法参数可以看出端倪,两种类型 subscribe()都有 ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

3.取消订阅

        如果将subscribe(Collection)或 assign(Collection)集合参数设置为空集合,作用与unsubscribe()方法相同,如下示例中三行代码的效果相同:

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

组协调器就是x组写消费位移的leader副本所在的broker。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/58528.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux离线安装Ollama并完成大模型配置(无网络)

这篇文章主要分享两方面内容&#xff1a; 1&#xff09;在纯内网环境下如何部署ollama 2&#xff09;在纯内网环境下如何配置大模型 话不多说直接开始。 ①离线部署ollama 一、通过浏览器访问ollama官方安装脚本&#xff0c;获取脚本内容。复制里面的内容。 在Linux中执行…

Centos安装配置Jenkins

下载安装 注意&#xff1a;推荐的LTS版本对部分插件不适配&#xff0c;直接用最新的版本&#xff0c;jenkins还需要用到git和maven&#xff0c;服务器上已经安装&#xff0c;可查看参考文档[1]、[2]&#xff0c;本次不再演示 访问开始使用 Jenkins 下载jenkins 上传至服务器…

【java数据结构】二叉树

【java数据结构】二叉树 一、 认识二叉树1.1 二叉树的概念1.2 二叉树的特性&#xff1a;1.3 两种特殊的二叉树&#xff1a;1.4 二叉树的性质&#xff1a;1.5 二叉树的存储&#xff1a; 二、 实现二叉树2.1 二叉树节点的定义&#xff1a;2.2 二叉树的基本操作&#xff1a;获取树…

智源推出小时级超长视频理解大模型Video-XL

北京智源人工智能研究院联合上海交通大学、中国人民大学、北京大学和北京邮电大学等高校推出了一款名为Video-XL的超长视频理解大模型。这款模型是多模态大模型核心能力的重要展示&#xff0c;也是向通用人工智能&#xff08;AGI&#xff09;迈进的关键步骤。与现有多模态大模型…

《链表篇》---两两交换链表中的节点(中等)

题目传送门 1.定义一个虚拟节点链接链表 2.定义一个当前节点指向虚拟节点 3.在当前节点的下一个节点和下下一个节点都不为null的情况下。 定义 node1和node2。保存当前节点后面两个节点的地址。cur.next node2;node1.next node2.next;node2.next node1;cur node1; 4.返回re…

Oracle视频基础1.3.4练习

1.3.4 检查数据库实例启动情况&#xff0c;进程以及进程间通信 ps -ef | grep oracle ipcs启动数据库实例&#xff0c; 用缺省spfilewilson ls -l env | grep ORACLE sqlplus /nolog conn / as sysdba startup shutdown immediate exit clear新创建pfile和spfile指定pfile数…

图书管理系统汇报

【1A536】图书管理系统汇报 项目介绍1.用户登录注册功能1. 1用户角色管理2.图书管理功能2.1 添加图书2.2 编辑图书2.3 删除图书 3.图书搜索和筛选3.1 图书搜索3.2 图书筛选 4.图书借阅、图书归还4.1 图书借阅4.2 图书归还 5.用户信息管理5.1上传头像5.2修改头像5.3 修改密码 项…

JetCache启动循环依赖分析

问题呈现 项目性能优化&#xff0c;需要将本地内存&#xff08;JVM内存&#xff09;替换为本地Redis&#xff08;同一个Pod中的Container&#xff09;&#xff0c;降低JVM内存和GC的压力&#xff0c;同时引入了JetCache简化和统一使用&#xff08;对JetCache也做了扩展&#x…

使用二进制安装K8S 多master节点 高可用集群

目录 1.初始化 1.1 配置静态IP 1.2 配置主机名 1.3 配置hosts文件 1.4 配置主机之间无密码登录&#xff0c;每台机器都按照如下操作 1.5 关闭firewalld防火墙&#xff0c;在xianchaomaster1、xianchaomaster2、xianchaomaster3、xianchaonode1上操作&#xff1a; 1.…

vue的基本使用

简介 vue组件 三个部分组成&#xff1a;结构、样式、逻辑 文本插值 类似于java的spel表达式 属性绑定 綁定是单向绑定的&#xff0c;修改输入框无法改变原本的&#xff0c;只能读&#xff0c;不能写 <input :value"name" placeholder"Type your name"&g…

从APP小游戏到Web漏洞的发现

一、前因&#xff1a; 在对一次公司的一个麻将游戏APP进行渗透测试的时候发现&#xff0c;抓到HTTP请求的接口&#xff0c;但是反编译APK后发现没有在本身发现任何一个关于接口或者域名相关的关键字&#xff0c;对此感到了好奇。 于是直接解压后everything搜索了一下&#xff…

【有啥问啥】视频插帧算法技术原理详解

视频插帧算法技术原理详解 引言 视频插帧&#xff08;Video Interpolation&#xff09;技术&#xff0c;作为计算机视觉领域的一项重要应用&#xff0c;旨在通过算法手段在已有的视频帧之间插入额外的帧&#xff0c;从而提升视频的帧率&#xff0c;使其看起来更加流畅。这一技…

【温酒笔记】DMA

参考文档&#xff1a;野火STM32F103 网友资料整理 1. Direct Memory Access-直接内存访问 DMA控制器独立于内核 是一个单独的外设 DMA1有7个通道DMA2有5个通道DMA有四个等级&#xff0c;非常高&#xff0c;高&#xff0c;中&#xff0c;低四个优先级如果优先等级相同&#xf…

精选云手机平台推荐:五大知名云手机品牌汇总

云手机目前已经在很多行业开始应用&#xff0c;特别是对于需要多设备操作、稳定性能和账号安全保障的用户。下面就为大家推荐几款优质云手机平台&#xff0c;一起来看看各大品牌有什么优势。 1. Ogphone云手机 Ogphone云手机凭借强大的海外网络连接和群控性能受到各行业用户的欢…

C++设计模式结构型模式———适配器模式

文章目录 一、引言二、适配器模式三、类适配器四、总结 一、引言 适配器模式是一种结构型设计模式&#xff0c;它在日常生活中有着广泛的应用&#xff0c;比如各种转换接头和电源适配器&#xff0c;它们的主要作用是解决接口不兼容的问题。就像使用电源适配器将220V的市电转换…

生产车间怎么管?设备、生产、物料管理方法更好

我们都知道&#xff0c;面对竞争激烈的大环境&#xff0c;生产车间对于企业的重要性不言而喻&#xff0c;它是企业发展的关键所在。 生产车间管理是一项复杂且系统性很强的工作&#xff0c;涉及多个重要方面。其中&#xff0c;人员管理是核心之一&#xff0c;员工作为生产活动…

Prometheus套装部署到K8S+Dashboard部署详解

1、添加helm源并更新 helm repo add prometheus-community https://prometheus-community.github.io/helm-charts helm repo update2、创建namespace kubectl create namespace monitoring 3、安装Prometheus监控套装 helm install prometheus prometheus-community/prome…

Redis 主从同步 总结

前言 相关系列 《Redis & 目录》《Redis & 主从同步 & 源码》《Redis & 主从同步 & 总结》《Redis & 主从同步 & 问题》 参考文献 《Redis的主从复制和哨兵机制详解》 概述 简介 主从同步的本质是数据复制机制。主从同步机制用于将master…

认证鉴权框架之—sa-token

一、概述 Satoken 是一个 Java 实现的权限认证框架&#xff0c;它主要用于 Web 应用程序的权限控制。Satoken 提供了丰富的功能来简化权限管理的过程&#xff0c;使得开发者可以更加专注于业务逻辑的开发。 二、逻辑流程 1、登录认证 &#xff08;1&#xff09;、创建token …

PCM5102A具有PLL和32位、384kHz PCM/I2S接口的2.1VRMS、112dB音频立体声DAC

PCM5102A外观和丝印 1 特性 1•超低带外噪声 •具有BCK基准的高性能集成音频锁相环(PLL)&#xff0c;可在内部生成SCK •直接线路电平2.1VRMS输出 •无需隔直电容 •线路电平输出支持低至1kΩ的负载 •智能静音系统&#xff1b;软斜升或斜降搭配模拟静音&#xff0c;实现120dB…