1. Define the producer, adding a ttl entry to each message's RecordHeaders
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaTest26 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // ttl = 1 second: this message will most likely be judged expired
        // and dropped by the consumer interceptor
        RecordHeaders recordHeaders1 = new RecordHeaders();
        recordHeaders1.add("ttl", BytesUtils.longToBytes(1));
        RecordHeaders recordHeaders2 = new RecordHeaders();
        recordHeaders2.add("ttl", BytesUtils.longToBytes(30));
        RecordHeaders recordHeaders3 = new RecordHeaders();
        recordHeaders3.add("ttl", BytesUtils.longToBytes(60));

        ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>(
                "ttl", 0, new Date().getTime(), "fff", "hello sister,now is: " + new Date(), recordHeaders1);
        ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>(
                "ttl", 0, new Date().getTime(), "fff", "hello sister,now is: " + new Date(), recordHeaders2);
        ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>(
                "ttl", 0, new Date().getTime(), "fff", "hello sister,now is: " + new Date(), recordHeaders3);

        Future<RecordMetadata> future1 = kafkaProducer.send(producerRecord1);
        Future<RecordMetadata> future2 = kafkaProducer.send(producerRecord2);
        Future<RecordMetadata> future3 = kafkaProducer.send(producerRecord3);
        try {
            future1.get();
            future2.get();
            future3.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("ok");
        kafkaProducer.close();
    }
}
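Both the producer above and the interceptor below call a BytesUtils helper that is not included in the original. A minimal sketch of what it might look like, assuming the ttl is carried as an 8-byte big-endian long (the class name and method signatures are inferred from the call sites, not taken from any library):

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import java.nio.ByteBuffer;

// Hypothetical helper, reconstructed from its call sites: packs a long
// into an 8-byte big-endian array (for a header value) and unpacks it back.
public class BytesUtils {
    public static byte[] longToBytes(long value) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(value);
        return buffer.array();
    }

    public static long bytesToLong(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.put(bytes);
        buffer.flip();
        return buffer.getLong();
    }
}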
2. Define the consumer interceptor:
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TtlConsumerInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            for (ConsumerRecord<String, String> record : tpRecords) {
                long ttl = -1;
                for (Header header : record.headers()) {
                    if (header.key().equals("ttl")) {
                        ttl = BytesUtils.bytesToLong(header.value());
                    }
                }
                // keep the record only if it carries no ttl header, or its age
                // is still within the ttl (given in seconds); expired records
                // are simply not added, which is what drops them
                if (ttl <= 0 || now - record.timestamp() < ttl * 1000) {
                    newTpRecords.add(record);
                }
            }
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
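One caveat with this approach: onConsume() runs inside poll(), after the consumer's position has already advanced past the fetched records, so the offsets of messages dropped by the interceptor are still committed as usual and those messages will not be redelivered.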
3. Define the consumer, configuring the interceptor above
package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaTest27 {
    private static Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        // register the TTL interceptor defined above
        properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TtlConsumerInterceptor.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(getProperties());
        String topic = "ttl";
        myConsumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = myConsumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println(record.value());
                System.out.println("record offset is: " + record.offset());
            }
        }
    }
}
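Running this consumer shortly after the producer should print only the second and third messages: the first one carries a ttl of 1 second and will almost certainly have expired by the time the interceptor inspects it, while the 30-second and 60-second messages pass through.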