说明
Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。
Pulsar 的主要特性如下:
对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。
极低的发布和端到端延迟。
无缝可扩展至超过一百万个主题。
一个简单的客户端 API,具有Java、Go、Python和C++的绑定。
主题的多种订阅类型(独占、共享和故障转移)。
通过Apache BookKeeper提供的持久消息存储来保证消息传递。无服务器轻量级计算框架Pulsar Functions提供流原生数据处理功能。
基于 Pulsar Functions 构建的无服务器连接器框架Pulsar IO可以更轻松地将数据移入和移出 Apache Pulsar。
当数据老化时,分层存储将数据从热/温存储卸载到冷/长期存储(例如S3和GCS)。
安装包下载
本文使用的是apache-pulsar-3.2.2-bin.tar.gz版本
csdn下载 也可以自行去官网下载
解压目录
tar -zxvf apache-pulsar-3.2.2-bin.tar.gz
目录说明
目录 | 描述 |
---|---|
bin | 入口pulsar点脚本和许多其他命令行工具 |
conf | 配置文件,包括broker.conf |
lib | Pulsar 使用的 JAR |
examples | Pulsar 函数示例 |
instances | Pulsar 函数的工件 |
启动Pulsar
bin/pulsar standalone
注意:需要保证jdk在17+
创建Topic
创建一个名为my-topic的topic
bin/pulsar-admin topics create persistent://public/default/my-topic
生产者发送消息
bin/pulsar-client produce my-topic --messages 'Hello Pulsar!'
消费者消费消息
测试批量发送消息
bin/pulsar-client produce my-topic --messages "$(seq -s, -f 'Message NO.%g' 1 10)"
重新消费
bin/pulsar-client consume my-topic -s 'my-subscription' -p Earliest -n 0
java生产消息
pom.xml
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>3.2.2</version> </dependency> <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>2.14.2</version> </dependency>
代码
package com.pulsar.demo;import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit;public class PulsarProducer {private static final Logger log = LoggerFactory.getLogger(PulsarProducer.class);private static final String SERVER_URL = "pulsar://192.168.xxx:6650";public static void main(String[] args) throws Exception {// 构造Pulsar ClientPulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();// 构造生产者Producer<String> producer = client.newProducer(Schema.STRING).producerName("my-producer").topic("my-topic").batchingMaxMessages(1024).batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).enableBatching(true).blockIfQueueFull(true).maxPendingMessages(512).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(true).create();// 同步发送消息MessageId messageId = producer.send("Hello World");log.info("message id is {}",messageId);System.out.println(messageId.toString());// 异步发送消息CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");// 阻塞线程,直到返回结果log.info("async message id is {}",asyncMessageId.get());producer.close();// 关闭licent的方式有两种,同步和异步// client.close();client.closeAsync();} }
java消费消息
package com.pulsar.demo;import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType;import java.util.concurrent.TimeUnit;public class PulsarConsumer {private static final String SERVER_URL = "pulsar://192.168.xxx:6650";private static final String topic = "persistent://public/default/my-topic"; // 要订阅的topicpublic static void main(String[] args) throws Exception {// 构造Pulsar ClientPulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();Consumer consumer = client.newConsumer().consumerName("my-consumer").topic("my-topic").subscriptionName("my-subscription").ackTimeout(10, TimeUnit.SECONDS).maxTotalReceiverQueueSizeAcrossPartitions(10).subscriptionType(SubscriptionType.Exclusive).subscribe();while (true) {Message msg = consumer.receive();try {System.out.printf("Message received: %s\n", new String(msg.getData()));consumer.acknowledge(msg);} catch (Exception e) {consumer.negativeAcknowledge(msg);}}} }
停止Pulsar
完成后,您可以关闭 Pulsar 集群。在启动集群的终端窗口中按Ctrl-C 。