文章目录  1、生产者拦截器 1.1、创建生产者拦截器 1.2、KafkaTemplate配置生产者拦截器 1.3、使用Java代码创建主题分区副本 1.4、application.yml配置----v1版 1.5、屏蔽 kafka debug 日志 logback.xml 1.6、引入spring-kafka依赖 1.7、控制台日志     
  
 
 1、生产者拦截器  
 1.1、创建生产者拦截器  
package  com. atguigu. kafka. interceptor ; 
import  org. apache. kafka. clients. producer.  ProducerInterceptor ; 
import  org. apache. kafka. clients. producer.  ProducerRecord ; 
import  org. apache. kafka. clients. producer.  RecordMetadata ; 
import  org. springframework. stereotype.  Component ; 
import  java. util.  Map ; 
/**
 * Producer interceptor that prints every record before it is sent and prints
 * the resulting metadata (or the failure) once the send has been acknowledged.
 */
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String, String> {

    /**
     * Prints the outgoing record and passes it through unchanged.
     *
     * @param producerRecord the record handed to the producer
     * @return the same record; the Kafka interceptor contract uses the returned
     *         value as the record to send, so {@code null} must not be returned
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        System.out.println("生产者即将发送消息:topic = " + producerRecord.topic()
                + ",partition:" + producerRecord.partition()
                + ",key = " + producerRecord.key()
                + ",value = " + producerRecord.value());
        return producerRecord;
    }

    /**
     * Prints the topic, partition, offset and timestamp of an acknowledged record,
     * or the error when the send failed.
     *
     * @param recordMetadata metadata of the record that was sent
     * @param e              the send failure, or {@code null} when the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            System.out.println("消息发送成功:topic = " + recordMetadata.topic()
                    + ",partition:" + recordMetadata.partition()
                    + ",offset=" + recordMetadata.offset()
                    + ",timestamp=" + recordMetadata.timestamp());
        } else {
            // Surface failures instead of dropping them silently.
            System.err.println("消息发送失败:" + e);
        }
    }

    /** No resources are held, so there is nothing to release. */
    @Override
    public void close() {
    }

    /** This interceptor takes no configuration. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
 1.2、KafkaTemplate配置生产者拦截器  
package  com. atguigu. kafka. producer ; import  com. atguigu. kafka. interceptor.  MyKafkaInterceptor ; 
import  jakarta. annotation.  PostConstruct ; 
import  jakarta. annotation.  Resource ; 
import  org. junit. jupiter. api.  Test ; 
import  org. springframework. boot. test. context.  SpringBootTest ; 
import  org. springframework. kafka. core.  KafkaTemplate ; 
import  java. io.  IOException ; @SpringBootTest 
class KafkaProducerApplicationTests {

    // Parameterized instead of raw so setProducerInterceptor/send are type-checked.
    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    @Resource
    MyKafkaInterceptor myKafkaInterceptor;

    /** Registers the interceptor on the template once both fields have been injected. */
    @PostConstruct
    public void init() {
        kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);
    }

    /**
     * Sends one message to my_topic1 and waits for the send to finish.
     *
     * Joining the returned future replaces the former System.in.read() wait:
     * the test no longer hangs on stdin, and it fails if the send fails.
     */
    @Test
    void contextLoads() {
        kafkaTemplate.send("my_topic1", "spring-kafka-生产者拦截器").join();
    }
}
 1.3、使用Java代码创建主题分区副本  
package  com. atguigu. kafka. config ; 
import  org. apache. kafka. clients. admin.  NewTopic ; 
import  org. springframework. context. annotation.  Bean ; 
import  org. springframework. kafka. config.  TopicBuilder ; 
import  org. springframework. stereotype.  Component ; 
/**
 * Declares the topic used by the producer examples as a bean.
 */
@Component
public class KafkaTopicConfig {

    /**
     * Describes the topic "my_topic1" with 3 partitions and 3 replicas.
     * Presumably Spring's Kafka admin support creates it on the cluster at startup;
     * verify against the running application.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic myTopic1() {
        TopicBuilder topic = TopicBuilder.name("my_topic1")
                .partitions(3)
                .replicas(3);
        return topic.build();
    }
}
 1.4、application.yml配置----v1版  
server:
  port: 8110 # v1
spring:
  kafka:
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    producer: # producer settings
      retries: 0 # number of retries; 0 means no retry
      acks: -1 # ack level: how many partition replicas must be written before the producer is acked (0, 1, -1/all)
      batch-size: 16384 # batch size in bytes
      buffer-memory: 33554432 # producer buffer size in bytes
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer
 1.5、屏蔽 kafka debug 日志 logback.xml  
<configuration>
    <!-- Keep the Kafka client logger at INFO so its DEBUG output is dropped. -->
    <logger name="org.apache.kafka.clients" level="info" />
</configuration>
 1.6、引入spring-kafka依赖  
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent supplies the managed dependency versions. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/>
    </parent>

    <groupId>com.atguigu.kafka</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>kafka-producer</description>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Spring for Apache Kafka; no version element, as in the original. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
 1.7、控制台日志  
生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者拦截器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717490776329
  
[ [ { "partition" :  0 , "offset" :  0 , "msg" :  "spring-kafka-生产者拦截器" , "timespan" :  1717490776329 , "date" :  "2024-06-04 08:46:16" } ] 
]