生产者发送到对应的分区有以下几种方式:
(1)指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)
(2)未指定patition但指定key,通过对key的value进行hash出一个patition;
(3)patition和key都未指定,使用轮询选出一个patition。
但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:
1、实现一个自定义分区类,custompartitioner实现partitioner
import org.apache.kafka.clients.producer.partitioner;
import org.apache.kafka.common.cluster;
import java.util.map;
public class custompartitioner implements partitioner {
/**
*
* @param topic 当前的发送的topic
* @param key 当前的key值
* @param keybytes 当前的key的字节数组
* @param value 当前的value值
* @param valuebytes 当前的value的字节数组
* @param cluster
* @return
*/
@override
public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {
//这边根据返回值就是分区号, 这边就是固定发送到三号分区
return 3;
}
@override
public void close() {
}
@override
public void configure(map configs) {
}
}
2、producer配置文件指定,具体的分区类
// 具体的分区类
props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");
技巧:可以使用producerconfig中提供的配置producerconfig
kafka producer拦截器
拦截器(interceptor)是在kafka 0.10版本被引入的。
interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。
所使用的类为:
org.apache.kafka.clients.producer.producerinterceptor
我们可以编码测试下:
1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)
import org.apache.kafka.clients.producer.producerinterceptor;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
import java.util.map;
import java.util.uuid;
public class messageinterceptor implements producerinterceptor {
@override
public void configure(map configs) {
system.out.println("这是messageinterceptor的configure方法");
}
/**
* 这个是消息发送之前进行处理
*
* @param record
* @return
*/
@override
public producerrecord onsend(producerrecord record) {
// 创建一个新的record,把uuid入消息体的最前部
system.out.println("为消息添加uuid");
return new producerrecord(record.topic(), record.partition(), record.timestamp(), record.key(),
uuid.randomuuid().tostring().replace("-", "") + "," + record.value());
}
/**
* 这个是生产者回调函数调用之前处理
* @param metadata
* @param exception
*/
@override
public void onacknowledgement(recordmetadata metadata, exception exception) {
system.out.println("messageinterceptor拦截器的onacknowledgement方法");
}
@override
public void close() {
system.out.println("messageinterceptor close 方法");
}
}
2、定义计数拦截器
import java.util.map;
import org.apache.kafka.clients.producer.producerinterceptor;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
public class counterinterceptor implements producerinterceptor{
private int errorcounter = 0;
private int successcounter = 0;
@override
public void configure(map configs) {
system.out.println("这是counterinterceptor的configure方法");
}
@override
public producerrecord onsend(producerrecord record) {
system.out.println("counterinterceptor计数过滤器不对消息做任何操作");
return record;
}
@override
public void onacknowledgement(recordmetadata metadata, exception exception) {
// 统计成功和失败的次数
system.out.println("counterinterceptor过滤器执行统计失败和成功数量");
if (exception == null) {
successcounter++;
} else {
errorcounter++;
}
}
@override
public void close() {
// 保存结果
system.out.println("successful sent: " + successcounter);
system.out.println("failed sent: " + errorcounter);
}
}
3、producer客户端:
import org.apache.kafka.clients.producer.*;
import java.util.arraylist;
import java.util.list;
import java.util.properties;
public class producer1 {
public static void main(string[] args) throws exception {
properties props = new properties();
// kafka服务端的主机名和端口号
props.put("bootstrap.servers", "localhost:9092");
// 等待所有副本节点的应答
props.put("acks", "all");
// 消息发送最大尝试次数
props.put("retries", 0);
// 一批消息处理大小
props.put("batch.size", 16384);
// 请求延时,可能生产数据太快了
props.put("linger.ms", 1);
// 发送缓存区内存大小,数据是先放到生产者的缓冲区
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
// 具体的分区类
props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");
//定义拦截器
list interceptors = new arraylist<>();
interceptors.add("kafka.messageinterceptor");
interceptors.add("kafka.counterinterceptor");
props.put(producerconfig.interceptor_classes_config, interceptors);
producer producer = new kafkaproducer<>(props);
for (int i = 0; i < 1; i++) {
producer.send(new producerrecord("test_0515", i + "", "xxx-" + i), new callback() {
public void oncompletion(recordmetadata recordmetadata, exception e) {
system.out.println("这是producer回调函数");
}
});
}
/*system.out.println("现在执行关闭producer");
producer.close();*/
producer.close();
}
}
总结,我们可以知道拦截器链各个方法的执行顺序,假如有a、b拦截器,在一个拦截器链中:
(1)执行a的configure方法,执行b的configure方法
(2)执行a的onsend方法,b的onsend方法
(3)生产者发送完毕后,执行a的onacknowledgement方法,b的onacknowledgement方法。
(4)执行producer自身的callback回调函数。
(5)执行a的close方法,b的close方法。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持萬仟网。
如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!