生产者代码:
package com.kafka.test;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.HashMap;
import java.util.Map;public class KafkaProducerTest {public static void main(String[] args) {//创建配置对象Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//对生产者的数据k v 进行序列化的操作config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//创建生产者对象 生产者需要设定泛型:数据的类型约束KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);//创建数据ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("test", "hello", "Hello Kafka!");//通过生产者对象发送数据到kafkaproducer.send(producerRecord);//关闭生产者对象producer.close();}
}
消费者代码:
package com.kafka.test;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;public class KafkaConsumerTest {public static void main(String[] args) {//创建配置对象Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//对消费者的数据k v 进行反序列化的操作config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());config.put(ConsumerConfig.GROUP_ID_CONFIG,"Test");//取哪个group里的消息//创建消费者对象 消费者需要设定泛型:数据的类型约束KafkaConsumer<String, String> consumer =new KafkaConsumer<String,String>(config);//订阅主题,可以多个 所以这里是listconsumer.subscribe(Collections.singletonList("Test"));//从kafka拉取数据ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(2));for (ConsumerRecord<String, String> record : poll) {System.out.println(record);}}
}