1. pom
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. 生产者
import com. alibaba. fastjson. JSON ;
import com. alibaba. fastjson. serializer. SerializerFeature ;
import com. xxx. npi. module. common. msg. dto. MsgBase ;
import org. springframework. beans. factory. annotation. Value ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. stereotype. Service ; import java. util. ArrayList ;
import java. util. List ; @Service
// Publishes MsgBase payloads to Kafka as JSON arrays.
//
// Sending is gated on the injected "npi.default-url" property: unless it equals
// https://npi.xxx.com exactly, both sendMessage overloads return without
// touching the KafkaTemplate.
public class MyMessageProducerService {

    /** The only domain value for which messages are handed to Kafka. */
    private static final String SEND_ENABLED_DOMAIN = "https://npi.xxx.com";

    /** fastjson features applied to every serialized payload. */
    private static final SerializerFeature[] JSON_FEATURES = {
        SerializerFeature.WriteDateUseDateFormat,
        SerializerFeature.WriteMapNullValue,
        SerializerFeature.WriteNullListAsEmpty,
        SerializerFeature.WriteNullStringAsEmpty,
        SerializerFeature.DisableCircularReferenceDetect
    };

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${npi.default-url}")
    private String domain;

    public MyMessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a single message, wrapped in a one-element list so the payload is
     * serialized as a JSON array just like the list overload.
     *
     * @param topicName Kafka topic to send to
     * @param msgObj    message to send
     */
    public <T extends MsgBase> void sendMessage(String topicName, T msgObj) {
        if (!sendingEnabled()) {
            return;
        }
        List<T> single = new ArrayList<>();
        single.add(msgObj);
        publish(topicName, single);
    }

    /**
     * Sends a list of messages as one JSON array.
     *
     * @param topicName Kafka topic to send to
     * @param list      messages to send
     */
    public <T extends MsgBase> void sendMessage(String topicName, List<T> list) {
        if (!sendingEnabled()) {
            return;
        }
        publish(topicName, list);
    }

    /** Returns true when the configured domain matches the send-enabled domain. */
    private boolean sendingEnabled() {
        // Constant on the left keeps the comparison null-safe when the property is unset.
        return SEND_ENABLED_DOMAIN.equals(domain);
    }

    /** Serializes the payload to JSON and hands it to the Kafka template. */
    private void publish(String topicName, Object payload) {
        String json = JSON.toJSONString(payload, JSON_FEATURES);
        kafkaTemplate.send(topicName, json);
    }
}
3. 配置
import org. apache. kafka. clients. producer. ProducerConfig ;
import org. apache. kafka. common. config. SslConfigs ;
import org. springframework. beans. factory. annotation. Value ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. core. io. Resource ;
import org. springframework. kafka. annotation. EnableKafka ;
import org. springframework. kafka. core. DefaultKafkaProducerFactory ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. kafka. core. ProducerFactory ; import java. io. File ;
import java. io. IOException ;
import java. io. InputStream ;
import java. nio. file. Files ;
import java. nio. file. StandardCopyOption ;
import java. util. HashMap ;
import java. util. Map ; @Configuration
@EnableKafka
// Builds the Kafka producer infrastructure (producer factory and KafkaTemplate)
// from the spring.kafka.producer.* properties, including the security protocol,
// SASL and SSL truststore settings.
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.retries}")
    private int retries;

    @Value("${spring.kafka.producer.acks}")
    private String acks;

    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;

    @Value("${spring.kafka.producer.linger-ms}")
    private int lingerMs;

    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.producer.ssl.truststore.location}")
    private Resource sslTruststoreLocationResource;

    @Value("${spring.kafka.producer.ssl.truststore.password}")
    private String sslTruststorePassword;

    @Value("${spring.kafka.producer.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.producer.sasl.jaas.config}")
    private String saslJaasConfig;

    /**
     * Creates the template used to publish String key/value records.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Creates the producer factory from {@link #producerConfigs()}.
     *
     * @return a producer factory for String keys and values
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Assembles the raw Kafka producer properties from the injected values.
     *
     * <p>Each call copies the truststore resource to a new temporary file, because
     * the truststore location is passed to Kafka as a file system path.
     *
     * @return a new mutable map of producer properties
     * @throws RuntimeException if the truststore resource cannot be read or copied
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", servers);
        props.put("acks", acks);
        props.put("retries", retries);
        props.put("batch.size", batchSize);
        props.put("linger.ms", lingerMs);
        props.put("buffer.memory", bufferMemory);
        props.put("key.serializer", keySerializer);
        props.put("value.serializer", valueSerializer);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, copyTruststoreToTempFile());
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
        return props;
    }

    /**
     * Copies the configured truststore resource to a temporary file and returns
     * that file's absolute path.
     */
    private String copyTruststoreToTempFile() {
        // try-with-resources closes the resource stream even when the copy fails.
        try (InputStream truststoreStream = sslTruststoreLocationResource.getInputStream()) {
            File tempFile = File.createTempFile("client_truststore", ".jks");
            // The file must outlive this method because the path is handed to Kafka,
            // so schedule removal at JVM exit instead of deleting it here.
            tempFile.deleteOnExit();
            Files.copy(truststoreStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return tempFile.getAbsolutePath();
        } catch (IOException e) {
            throw new RuntimeException("Failed to locate truststore file", e);
        }
    }
}
4. application
spring:
  kafka:
    producer:
      bootstrap-servers: n2.ikt.xxx.com:9092,n3.ikt.xxx.com:9092,n4.ikt.xxx.com:9092,n5.ikt.xxx.com:9092,n6.ikt.xxx.com:9092
      acks: all
      retries: 3
      batch-size: 16384
      linger-ms: 1
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      security.protocol: SASL_SSL
      ssl.truststore.location: classpath:client_truststore.jks
      ssl.truststore.password: pwd
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='pwd';
    # NOTE(review): the original text was flattened onto one line, so the nesting
    # level of "topic" is an assumption - confirm against the real application.yml.
    topic:
      br: mdscinpi.mdscinpi-data.tst
      mem: mdscinpi.msdcinpi-data.tst
      fbr: mdscinpi.inpi-data.tst
      cr: mdscinpi.npi-data.tst