一、依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.1.RELEASE</version>
</dependency>
二、配置文件
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: testGroup
      auto-offset-reset: latest
三、API
1、生产者
@Component
public class ProducerMsg {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message to the given topic without inspecting the outcome.
     *
     * @param topic destination topic
     * @param msg   message payload
     */
    public void send(String topic, String msg) {
        kafkaTemplate.send(topic, msg);
    }

    /**
     * Sends a message to the given topic and reports the outcome on {@code System.err}
     * through a callback registered on the future returned by the template.
     *
     * @param topic destination topic
     * @param msg   message payload
     */
    public void sendCallback(String topic, String msg) {
        kafkaTemplate.send(topic, msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                final RecordMetadata metadata = sendResult.getRecordMetadata();
                // Named differently from the enclosing method's "topic" parameter to avoid shadowing it.
                final String sentTopic = metadata.topic();
                final int partition = metadata.partition();
                final long offset = metadata.offset();
                System.err.println(String.format("生产消息成功:topic: %s,partition: %s,offset: %s", sentTopic, partition, offset));
            }

            @Override
            public void onFailure(Throwable throwable) {
                // Previously empty: a failed send went completely unnoticed. Report it instead.
                System.err.println(String.format("生产消息失败:topic: %s,error: %s", topic, throwable));
            }
        });
    }
}
2、消费者
@Component
public class ConsumeMsg {

    /**
     * Handles a single record from the {@code USER} and {@code LOG} topics and prints it
     * to {@code System.err}.
     *
     * @param consumer the record passed to this listener method
     */
    @KafkaListener(topics = {"USER", "LOG"})
    public void consumeSingle(ConsumerRecord<String, String> consumer) {
        System.err.println("监听到kafka消息: " + consumer);
        // Topic and payload are read here but not used further in this method.
        final String sourceTopic = consumer.topic();
        final String payload = consumer.value();
    }

    /**
     * Prints the topic and value of every record in the given list to {@code System.err}.
     * This method carries no listener annotation in this class.
     *
     * @param consumers the records to print
     */
    public void consumeBatch(List<ConsumerRecord<String, String>> consumers) {
        for (ConsumerRecord<String, String> entry : consumers) {
            final String sourceTopic = entry.topic();
            final String payload = entry.value();
            final String line = String.format("topic: %s,value: %s", sourceTopic, payload);
            System.err.println(line);
        }
    }
}