1. This article uses the PulsarTemplate provided by Spring, which adds a wrapper layer around the Pulsar client.
2. Producer:
String fingerprint = UUID.randomUUID().toString();
pulsarTemplate.newMessage(fingerprint)
        .withTopic("dddd")
        // Deliver after 10 seconds; change the TimeUnit to HOURS or DAYS for longer delays.
        .withMessageCustomizer(item -> item.deliverAfter(10L, TimeUnit.SECONDS))
        .send();
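The comment about switching the TimeUnit can be made concrete with a little arithmetic. The sketch below uses only the JDK and shows how a relative delay converts to milliseconds, and how it maps to an absolute delivery timestamp of the kind the Pulsar client's `deliverAt(long timestamp)` accepts. `DelayMath` and its methods are hypothetical helpers introduced here for illustration; they are not part of Spring Pulsar or the Pulsar client.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Spring Pulsar or the Pulsar client.
final class DelayMath {
    private DelayMath() {}

    // Converts a relative delay (e.g. 10 SECONDS, 2 HOURS, 1 DAYS) into milliseconds.
    static long delayMillis(long delay, TimeUnit unit) {
        return unit.toMillis(delay);
    }

    // Absolute delivery time in epoch milliseconds for a message sent at nowMillis.
    static long deliverAtMillis(long nowMillis, long delay, TimeUnit unit) {
        return nowMillis + unit.toMillis(delay);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(delayMillis(10L, TimeUnit.SECONDS)); // 10000
        System.out.println(delayMillis(2L, TimeUnit.HOURS));    // 7200000
        System.out.println(delayMillis(1L, TimeUnit.DAYS));     // 86400000
        System.out.println(deliverAtMillis(now, 10L, TimeUnit.SECONDS) - now); // 10000
    }
}
```

In the producer, the same change is simply `item.deliverAfter(2L, TimeUnit.HOURS)` or `item.deliverAfter(1L, TimeUnit.DAYS)`.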
3. Consumer:
Key point:
Set SubscriptionType to Shared. If it is not set, the delay does not take effect and messages are consumed immediately.
package com.nami.pulsar.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Consumer {

    // The Shared subscription type is what makes the delivery delay take effect.
    @PulsarListener(topics = "dddd", subscriptionType = SubscriptionType.Shared)
    public void receiveMessage(String message) {
        log.info("Received message: {}", message);
    }
}
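One way to check that the delay was honored is to compare the publish time with the receive time. The sketch below is plain JDK arithmetic; `ObservedDelay` is a hypothetical helper, and it assumes the publish timestamp is available in epoch milliseconds (for example from `Message#getPublishTime()` if the listener takes a `Message<String>` parameter) and that the producer and consumer clocks are reasonably in sync.

```java
// Hypothetical helper for illustration only; not part of Spring Pulsar or the Pulsar client.
final class ObservedDelay {
    private ObservedDelay() {}

    // Milliseconds between publishing and receiving a message.
    static long elapsedMillis(long publishTimeMillis, long receiveTimeMillis) {
        return receiveTimeMillis - publishTimeMillis;
    }

    // True when the message arrived no earlier than the requested delay.
    static boolean delayHonored(long publishTimeMillis, long receiveTimeMillis, long expectedDelayMillis) {
        return elapsedMillis(publishTimeMillis, receiveTimeMillis) >= expectedDelayMillis;
    }

    public static void main(String[] args) {
        // Published at t=1000 ms, received at t=11250 ms, requested delay 10000 ms.
        System.out.println(elapsedMillis(1_000L, 11_250L));         // 10250
        System.out.println(delayHonored(1_000L, 11_250L, 10_000L)); // true
        // Received almost immediately: the delay was not applied.
        System.out.println(delayHonored(1_000L, 1_050L, 10_000L));  // false
    }
}
```

Logging the elapsed value next to the message in `receiveMessage` makes it easy to tell a delayed delivery from an immediate one.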
4. Log: