安装环境
Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。
需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置。
生产者代码样例
public class KafkaProducerService {private static final String NEO_TOPIC = "elon-topic";private KafkaProducer<String, String> producer = null;public KafkaProducerService() {Properties props = new Properties();props.put("bootstrap.servers", "192.168.5.128:9092");props.put("acks", "0");props.put("group.id", "1111");props.put("retries", "2");//设置key和value序列化方式props.put("key.serializer", StringSerializer.class);props.put("value.serializer", StringSerializer.class);//生产者实例producer = new KafkaProducer<>(props);}/*** 外部调用的发消息接口*/public void sendMessage() {for (int i = 0; i < 10; ++i) {int p = i % 2;ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));producer.send(record);}}
}
发送消息时,将10个数据分别发送到0分区和1分区。
消费者代码样例
public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);private static final String NEO_TOPIC = "elon-topic";Properties properties = new Properties();private KafkaConsumer consumer = null;public KafkaConsumerService() {properties.put("bootstrap.servers","192.168.5.128:9092"); // 指定 Brokerproperties.put("group.id", "neo1"); // 指定消费组群 IDproperties.put("max.poll.records", "5");properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class); // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList(NEO_TOPIC)); // 订阅主题 order-eventsnew Thread(this::receiveMessage).start();}public void receiveMessage() {try {while (true) {synchronized (this) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",record.topic(), record.partition(), record.offset(), record.key(), record.value());LOGGER.info("Received:" + info);Thread.sleep(100);}consumer.commitSync();}}} catch (Exception e){} finally {consumer.close();}}
消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。