java kafka 分区_Java kafka如何实现自定义分区类和拦截器

生产者发送到对应的分区有以下几种方式:

(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)

(2)未指定patition但指定key,通过对key的value进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:

1、实现一个自定义分区类,custompartitioner实现partitioner

import org.apache.kafka.clients.producer.partitioner;

import org.apache.kafka.common.cluster;

import java.util.map;

public class custompartitioner implements partitioner {

/**

*

* @param topic 当前的发送的topic

* @param key 当前的key值

* @param keybytes 当前的key的字节数组

* @param value 当前的value值

* @param valuebytes 当前的value的字节数组

* @param cluster

* @return

*/

@override

public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {

//这边根据返回值就是分区号, 这边就是固定发送到三号分区

return 3;

}

@override

public void close() {

}

@override

public void configure(map configs) {

}

}

2、producer配置文件指定,具体的分区类

// 具体的分区类

props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");

技巧:可以使用producerconfig中提供的配置producerconfig

kafka producer拦截器

拦截器(interceptor)是在kafka 0.10版本被引入的。

interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。

所使用的类为:

org.apache.kafka.clients.producer.producerinterceptor

我们可以编码测试下:

1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)

import org.apache.kafka.clients.producer.producerinterceptor;

import org.apache.kafka.clients.producer.producerrecord;

import org.apache.kafka.clients.producer.recordmetadata;

import java.util.map;

import java.util.uuid;

public class messageinterceptor implements producerinterceptor {

@override

public void configure(map configs) {

system.out.println("这是messageinterceptor的configure方法");

}

/**

* 这个是消息发送之前进行处理

*

* @param record

* @return

*/

@override

public producerrecord onsend(producerrecord record) {

// 创建一个新的record,把uuid入消息体的最前部

system.out.println("为消息添加uuid");

return new producerrecord(record.topic(), record.partition(), record.timestamp(), record.key(),

uuid.randomuuid().tostring().replace("-", "") + "," + record.value());

}

/**

* 这个是生产者回调函数调用之前处理

* @param metadata

* @param exception

*/

@override

public void onacknowledgement(recordmetadata metadata, exception exception) {

system.out.println("messageinterceptor拦截器的onacknowledgement方法");

}

@override

public void close() {

system.out.println("messageinterceptor close 方法");

}

}

2、定义计数拦截器

import java.util.map;

import org.apache.kafka.clients.producer.producerinterceptor;

import org.apache.kafka.clients.producer.producerrecord;

import org.apache.kafka.clients.producer.recordmetadata;

public class counterinterceptor implements producerinterceptor{

private int errorcounter = 0;

private int successcounter = 0;

@override

public void configure(map configs) {

system.out.println("这是counterinterceptor的configure方法");

}

@override

public producerrecord onsend(producerrecord record) {

system.out.println("counterinterceptor计数过滤器不对消息做任何操作");

return record;

}

@override

public void onacknowledgement(recordmetadata metadata, exception exception) {

// 统计成功和失败的次数

system.out.println("counterinterceptor过滤器执行统计失败和成功数量");

if (exception == null) {

successcounter++;

} else {

errorcounter++;

}

}

@override

public void close() {

// 保存结果

system.out.println("successful sent: " + successcounter);

system.out.println("failed sent: " + errorcounter);

}

}

3、producer客户端:

import org.apache.kafka.clients.producer.*;

import java.util.arraylist;

import java.util.list;

import java.util.properties;

public class producer1 {

public static void main(string[] args) throws exception {

properties props = new properties();

// kafka服务端的主机名和端口号

props.put("bootstrap.servers", "localhost:9092");

// 等待所有副本节点的应答

props.put("acks", "all");

// 消息发送最大尝试次数

props.put("retries", 0);

// 一批消息处理大小

props.put("batch.size", 16384);

// 请求延时,可能生产数据太快了

props.put("linger.ms", 1);

// 发送缓存区内存大小,数据是先放到生产者的缓冲区

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

// 具体的分区类

props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");

//定义拦截器

list interceptors = new arraylist<>();

interceptors.add("kafka.messageinterceptor");

interceptors.add("kafka.counterinterceptor");

props.put(producerconfig.interceptor_classes_config, interceptors);

producer producer = new kafkaproducer<>(props);

for (int i = 0; i < 1; i++) {

producer.send(new producerrecord("test_0515", i + "", "xxx-" + i), new callback() {

public void oncompletion(recordmetadata recordmetadata, exception e) {

system.out.println("这是producer回调函数");

}

});

}

/*system.out.println("现在执行关闭producer");

producer.close();*/

producer.close();

}

}

总结,我们可以知道拦截器链各个方法的执行顺序,假如有a、b拦截器,在一个拦截器链中:

(1)执行a的configure方法,执行b的configure方法

(2)执行a的onsend方法,b的onsend方法

(3)生产者发送完毕后,执行a的onacknowledgement方法,b的onacknowledgement方法。

(4)执行producer自身的callback回调函数。

(5)执行a的close方法,b的close方法。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持萬仟网。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/557184.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Servlet第三篇【request和response简介、response的常见应用】

response、request对象 Tomcat收到客户端的http请求&#xff0c;会针对每一次请求&#xff0c;分别创建一个代表请求的request对象、和代表响应的response对象 既然request对象代表http请求&#xff0c;那么我们获取浏览器提交过来的数据&#xff0c;找request对象即可。respon…

strip string java_Java StringUtils.strip方法代码示例

import org.apache.commons.lang.StringUtils; //导入方法依赖的package包/类public static void copyResourceFolder(String resourceFolder, String destDir)throws IOException {final File jarFile new File(Util.class.getProtectionDomain().getCodeSource().getLocatio…

spring三种注入方式

设置Spring的作用域 或者使用枚举值设置 单例和多里使用场景 自动注入 Primary 一个接口有多个实现被spring管理吗&#xff0c;在依赖注入式&#xff0c;spring会不知道注入哪个实现类就会抛出NoUniqueBeanDefinitionException异常 使用Primary 来告诉Spring 注入哪个实现…

java虚拟机1.7_《Java虚拟机原理》7.1 精选 —— 总览

1.一个程序在 JVM 中运行的完整流程java程序执行流程.JPG说明&#xff1a;① 类加载器把字节码加载到方法区② 方法区的字节码被 JVM new&#xff0c;在堆内存中生成字节码对象③ 字节码对象被 GC 的要求有&#xff1a;该类没有在其他任何地方被引用&#xff1b;该类的所有的实…

Servlet第四篇【request对象常用方法、应用】

什么是HttpServletRequest HttpServletRequest对象代表客户端的请求&#xff0c;当客户端通过HTTP协议访问服务器时&#xff0c;HTTP请求头中的所有信息都封装在这个对象中&#xff0c;开发人员通过这个对象的方法&#xff0c;可以获得客户这些信息。 简单来说&#xff0c;要…

java 设置控制台标题_修改Tomcat控制台标题以及标题乱码处理

双击“startup.bat”启动Tomcat&#xff0c;控制台默认标题是Tomcat&#xff0c;如下图&#xff1a;修改标题很容易&#xff0c;编辑catalina.bat这个文件(跟startup.bat在同级目录下)&#xff0c;编辑的时候建议使用notepad、editplus之类的工具。然后找到如下代码片段(大约在…

Servlet第五篇【介绍会话技术、Cookie的API、详解、应用】

什么是会话技术 基本概念: 指用户开一个浏览器&#xff0c;访问一个网站,只要不关闭该浏览器&#xff0c;不管该用户点击多少个超链接&#xff0c;访问多少资源&#xff0c;直到用户关闭浏览器&#xff0c;整个这个过程我们称为一次会话. 为什么我们要使用会话技术&#xff1…

mysql 视图sql_SQL的视图

1、什么是视图视图是虚拟的表&#xff0c;是一个存储的查询&#xff0c;虽然不需要实际的物理存储&#xff0c;但是也被看作是一个数据库对象&#xff1b;它与包含数据的表不一样&#xff0c;它不包含任何列或数据&#xff0c;它只是包含使用时动态检索数据的查询&#xff0c;并…

Servlet第六篇【Session介绍、API、生命周期、应用、与Cookie区别】

什么是Session Session 是另一种记录浏览器状态的机制。不同的是Cookie保存在浏览器中&#xff0c;Session保存在服务器中。用户使用浏览器访问服务器的时候&#xff0c;服务器把用户的信息以某种的形式记录在服务器&#xff0c;这就是Session 如果说Cookie是检查用户身上的”…

java 异步阻塞_大白话搞懂什么是同步/异步/阻塞/非阻塞

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼前言在最近的一些面试中&#xff0c;跟应聘者聊了比较多关于“同步/异步&#xff0c;阻塞/非阻塞”相关的话题&#xff0c;发现大家对于这些概念的理解都比较模糊&#xff0c;甚至有的同学会反问“他们不就是同一个东西吗&#xff…

python 3.9.0a0_Python 3.9.0 稳定版发布

IT之家10月6日消息据网友投递&#xff0c;Python 3.9.0 稳定版(Python 3.9.0 final)昨日正式发布&#xff0c;它包含许多新功能和优化&#xff0c;禁止在Windows 7上安装&#xff0c;且默认提供64位安装程序。IT之家了解到&#xff0c;Python 3.9.0 alpha 1首个迭代版本于2019 …

Tomcat+Servlet面试题都在这里

下面是我整理下来的Servlet知识点: 图上的知识点都可以在我其他的文章内找到相应内容。 Tomcat常见面试题 Tomcat的缺省端口是多少&#xff0c;怎么修改 找到Tomcat目录下的conf文件夹进入conf文件夹里面找到server.xml文件打开server.xml文件在server.xml文件里面找到下列…

java sdk下载_Java Sdk下载 | 保利威帮助中心

播放界面开发前准备1.小程序微信开发者后台设置-开发设置-服务器域名中配置 [request合法域名]开始开发1.获取频道直播播放地址index.wxmlindex.js选项说明uid类型&#xff1a;String说明&#xff1a;直播账户idvid类型&#xff1a;String说明&#xff1a;直播频道idvideoConte…

mysql union 与 union all 语法及用法

1.mysql union 语法 mysql union 用于把来自多个select 语句的结果组合到一个结果集合中。语法为&#xff1a; select column,......from table1union [all]select column,...... from table2...在多个select 语句中&#xff0c;对应的列应该具有相同的字段属性&#xff0c…

java反射 获取方法参数名_java 反射借助 asm 获取参数名称最优雅简单的方式

背景说明最近写反射相关的代码&#xff0c;想获取对应的参数名称&#xff0c;却发现没有特别好的方式。jdk7 及其以前&#xff0c;是无法通过反射获取参数名称的。jdk8 可以获取&#xff0c;但是要求指定 -parameter 启动参数&#xff0c;限制较多。期间尝试过类似于 Mybatis 使…

Golang基础知识入门详解

Go语言入门 Go语言入门教程 很多人将 Go 语言 称为 21 世纪的 C 语言&#xff0c;因为 Go 不仅拥有 C 语言的简洁和性能&#xff0c;而且还很好的提供了 21 世纪互联网环境下服务端开发的各种实用特性&#xff0c;让开发者在语言级别就可以方便的得到自己想要的东西。 在 Go…

controller层和service层的作用

1.在controller和service里都写那些代码&#xff1f; Controller&#xff0c;从字面上理解是控制器&#xff0c;所以它是负责业务调度的&#xff0c;所以在这一层应写一些业务的调度代码&#xff0c;而具体的业务处理应放在service中去写&#xff0c;而且service不单纯是对于d…

udp模拟tcp java_Java简单实现UDP和TCP

TCP实现TCP协议需要在双方之间建立连接&#xff0c;通过输入输出流来进行数据的交换&#xff0c;建立需要通过三次握手&#xff0c;断开需要四次挥手&#xff0c;保证了数据的完整性&#xff0c;但传输效率也会相应的降低。简单的TCP实现//服务端public class TcpServer {publi…

为什么Controller层注入的是Service接口,而不是ServiceImpl实现类

错误代码&#xff1a; Service层接口interface PCI{} 接口实现类Serviceclass PCIImpt imeplements PCI{}Controller层Autowiredprivate PCIImpt pciImpt; //注入了实现类在没有使用maven管理的时候注入实现类编译一直未报错&#xff0c;也就是说编译可以通过&#xff0c;但…

ant java 返回_使用Ant自动化我们的java项目生成

现在我们已经了解如何定义属性、依赖关系以及如何运行ant&#xff0c;接下来我们将学习怎样使用ant编译java源代码并生成jar文件。编译源代码由于Ant的主要目标就是生成java应用程序&#xff0c;它内置了javac任务来调用java的编译器。此任务一般定义如下Ant会寻找src目录下所有…