Apache-Pulsar安装操作说明

说明

Pulsar 是一种用于服务器到服务器消息传递的多租户高性能解决方案。

Pulsar 的主要特性如下:

对 Pulsar 实例中的多个集群的本机支持,并跨集群无缝地复制消息。
极低的发布和端到端延迟。
无缝可扩展至超过一百万个主题。
一个简单的客户端 API,具有Java、Go、Python和C++的绑定。
主题的多种订阅类型(独占、共享和故障转移)。
通过Apache BookKeeper提供的持久消息存储来保证消息传递。无服务器轻量级计算框架Pulsar Functions提供流原生数据处理功能。
基于 Pulsar Functions 构建的无服务器连接器框架Pulsar IO可以更轻松地将数据移入和移出 Apache Pulsar。
当数据老化时,分层存储将数据从热/温存储卸载到冷/长期存储(例如S3和GCS)。

安装包下载

本文使用的是apache-pulsar-3.2.2-bin.tar.gz版本

csdn下载 也可以自行去官网下载

解压目录

tar -zxvf apache-pulsar-3.2.2-bin.tar.gz

目录说明

目录描述
bin入口pulsar点脚本和许多其他命令行工具
conf配置文件,包括broker.conf
libPulsar 使用的 JAR
examplesPulsar 函数示例
instancesPulsar 函数的工件

启动Pulsar

bin/pulsar standalone

注意:需要保证jdk在17+

创建Topic

创建一个名为my-topic的topic

bin/pulsar-admin topics create persistent://public/default/my-topic

生产者发送消息

bin/pulsar-client produce my-topic --messages 'Hello Pulsar!'

消费者消费消息

测试批量发送消息

bin/pulsar-client produce my-topic --messages "$(seq -s, -f 'Message NO.%g' 1 10)"


重新消费

bin/pulsar-client consume my-topic -s 'my-subscription' -p Earliest -n 0

java生产消息

pom.xml

<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>3.2.2</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-annotations</artifactId><version>2.14.2</version>
</dependency>

代码

package com.pulsar.demo;import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class PulsarProducer {private static final Logger log = LoggerFactory.getLogger(PulsarProducer.class);private static final String SERVER_URL = "pulsar://192.168.xxx:6650";public static void main(String[] args) throws Exception {// 构造Pulsar ClientPulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();// 构造生产者Producer<String> producer = client.newProducer(Schema.STRING).producerName("my-producer").topic("my-topic").batchingMaxMessages(1024).batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS).enableBatching(true).blockIfQueueFull(true).maxPendingMessages(512).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(true).create();// 同步发送消息MessageId messageId = producer.send("Hello World");log.info("message id is {}",messageId);System.out.println(messageId.toString());// 异步发送消息CompletableFuture<MessageId> asyncMessageId = producer.sendAsync("This is a async message");// 阻塞线程,直到返回结果log.info("async message id is {}",asyncMessageId.get());producer.close();// 关闭licent的方式有两种,同步和异步// client.close();client.closeAsync();}
}

java消费消息

package com.pulsar.demo;import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;import java.util.concurrent.TimeUnit;public class PulsarConsumer {private static final String SERVER_URL = "pulsar://192.168.xxx:6650";private static final String topic = "persistent://public/default/my-topic"; // 要订阅的topicpublic static void main(String[] args) throws Exception {// 构造Pulsar ClientPulsarClient client = PulsarClient.builder().serviceUrl(SERVER_URL).enableTcpNoDelay(true).build();Consumer consumer = client.newConsumer().consumerName("my-consumer").topic("my-topic").subscriptionName("my-subscription").ackTimeout(10, TimeUnit.SECONDS).maxTotalReceiverQueueSizeAcrossPartitions(10).subscriptionType(SubscriptionType.Exclusive).subscribe();while (true) {Message msg = consumer.receive();try {System.out.printf("Message received: %s\n", new String(msg.getData()));consumer.acknowledge(msg);} catch (Exception e) {consumer.negativeAcknowledge(msg);}}}
}

停止Pulsar

完成后,您可以关闭 Pulsar 集群。在启动集群的终端窗口中按Ctrl-C 。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/798984.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决前端精度丢失问题:后端Long类型到前端的处理策略

在Web开发中&#xff0c;我们经常遇到前后端数据类型不匹配的问题&#xff0c;特别是当后端使用大数据类型如Long时&#xff0c;前端由于JavaScript的数字精度限制&#xff0c;可能导致精度丢失。本文将深入探讨这个问题&#xff0c;并提供两种有效的解决方法。 一、问题背景 …

Java常用类与基础API--String常见算法题目

文章目录 一、面试题&#xff08;1&#xff09;题1 二、常见算法题目&#xff08;1&#xff09;题1&#xff08;2&#xff09;题2&#xff08;3&#xff09;题3&#xff08;4&#xff09;题4&#xff08;5&#xff09;题5 三、案例 一、面试题 &#xff08;1&#xff09;题1 …

2024/4/1—力扣—删除字符使频率相同

代码实现&#xff1a; 思路&#xff1a; 步骤一&#xff1a;统计各字母出现频率 步骤二&#xff1a;频率从高到低排序&#xff0c;形成频率数组 步骤三&#xff1a;频率数组只有如下组合符合要求&#xff1a; 1, 0...0n 1, n...n (, 0)n...n, 1(, 0) bool equalFrequency(char…

CVE-2024-3148 DedeCms makehtml_archives_action sql注入漏洞分析

DedeCMS&#xff08;也称为织梦CMS&#xff09;是一款基于PHPMySQL的开源内容管理系统。 在 DedeCMS 5.7.112 中发现一个被归类为严重的漏洞。此漏洞会影响某些未知文件dede/makehtml_archives_action.php的处理。操作导致 sql 注入。攻击可能是远程发起的。该漏洞已向公众披露…

【服务器uwsgi + flask + nginx的搭建】

目录 服务器uwsgi flask nginx的搭建1. 安装必要的软件2. 启动nginx服务3. 测试Nginx4. 配置uwsgi和flask5. 配置nginx 服务器uwsgi flask nginx的搭建 1. 安装必要的软件 安装Python、uWSGI、Flask 和 Nginx。 # Ubuntu 安装命令 sudo apt-get update sudo apt-get ins…

03-JAVA设计模式-建造者模式

建造者模式 什么是建造者模式 建造者模式&#xff08;Builder Pattern&#xff09;是一种对象构建的设计模式&#xff0c;它允许你通过一步一步地构建一个复杂对象&#xff0c;来隐藏复杂对象的创建细节。 这种模式将一个复杂对象的构建过程与其表示过程分离&#xff0c;使得…

大语言模型上下文窗口初探(下)

由于篇幅原因&#xff0c;本文分为上下两篇&#xff0c;上篇主要讲解上下文窗口的概念、在LLM中的重要性&#xff0c;下篇主要讲解长文本能否成为LLM的护城河、国外大厂对长文本的态度。 3、长文本是护城河吗&#xff1f; 毫无疑问&#xff0c;Kimi从一开始就用“长文本”占领…

力扣 583. 两个字符串的删除操作

题目来源&#xff1a;https://leetcode.cn/problems/delete-operation-for-two-strings/description/ C题解1&#xff1a;动态规划 寻找word1和word2拥有的公共最长子序列&#xff0c;之后分别对word1和word2进行删除操作&#xff0c;即可使word1和word2相等。 寻找公共最长子…

【引子】C++从介绍到HelloWorld

C从介绍到HelloWorld 一、C的介绍1. 简介2. 应用场景3. C的标准![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/e3efb0f207f647729b92c0b5bcd4b330.png)4. C的运行过程 二、Visual Studio的安装1. 什么是Visual Studio2. Visual Studio的安装 三、完成HelloWorld1.…

Windows系统C盘空间优化进阶:磁盘清理与Docker日志管理

Windows系统C盘空间优化进阶&#xff1a;磁盘清理与Docker日志管理 文章目录 Windows系统C盘空间优化进阶&#xff1a;磁盘清理与Docker日志管理磁盘清理工具 使用“运行”命令访问磁盘清理利用存储感知自动管理空间清理WinSxS文件夹结合手动清理策略 小结删除临时文件总结&…

2024新版PHP在线客服系统多商户AI智能在线客服系统源码机器人自动回复即时通讯聊天系统源码PC+H5

搭建环境&#xff1a; 服务器 CPU 2核心 ↑ 运存 2G ↑ 宽带 5M ↑ 服务器操作系统 Linux Centos7.6-7.9 ↑ 运行环境&#xff1a; 宝塔面板 Nginx1.18- 1.22 PHP 7.1-7.3 MYSQL 5.6 -5.7 朵米客服系统是一款全功能的客户服务解决方案&#xff0c;提供多渠道支持…

html5分步问卷调查表模板源码

文章目录 1.设计来源1.1 问卷调查11.2 问卷调查21.3 问卷调查31.4 问卷调查41.5 问卷调查51.6 问卷调查6 2.效果和源码2.1 完整效果2.2 源代码 源码下载 作者&#xff1a;xcLeigh 文章地址&#xff1a;https://blog.csdn.net/weixin_43151418/article/details/137454703 html5分…

simulink 的stm32 ADC模块输出在抽筋,不知为何

% outtypecast(uint16(1000),uint8) % 10003E8,E8232,out232 3 function [y,len] myfcn(u1) headuint8([255 85]);%帧头 out1typecast(uint16(u1),uint8); % out2typecast(uint16(u2),uint8); y[head,out1]; lenuint16(length(y)); 2023b版本&#xff0c;stm32硬件支持…

Azure runbook 使用用户托管标识查看资源状态

Azure runbook 使用用户托管标识查看资源状态 在托管标识里创建用户托管标识在被查看或变更资源进行授权创建自动化账号和runbook发布脚本添加计划 在托管标识里创建用户托管标识 在被查看或变更资源进行授权 这里是选取的Analysis Services 资源 创建自动化账号和runbook 发布…

探索K-近邻算法(KNN):原理、实践应用与文本分类实战

第一部分&#xff1a;引言与背景 KNN算法在机器学习领域的重要性及其地位 KNN算法作为机器学习中的基石之一&#xff0c;由于其概念直观、易于理解并且不需要复杂的模型训练过程&#xff0c;被广泛应用于多种场景。它在监督学习中占据着特殊的位置&#xff0c;尤其适用于实时…

【C#】读取指定XML节点

&#x1f4f0;XML文件 <?xml version"1.0" encoding"utf-8"?> <configuration><userSettings><Internal.Settings type"Desktop"><setting name"StatsDisplayCount" serializeAs"String">…

海量智库 | ANY权限原理介绍

ANY权限是Vastbase中的一种特殊的管理权限&#xff0c;用户能够通过ANY权限执行更广泛的操作&#xff0c;更加便利的管理数据库。 本文将为您介绍ANY权限管理的相关原理。 ANY权限管理相关解释 ANY权限管理&#xff0c;是对数据库内的某一类对象的所有实体进行特定的权限管理…

SQLite 4.9的 OS 接口或“VFS”(十三)

返回&#xff1a;SQLite—系列文章目录 上一篇:SQLite字节码引擎&#xff08;十二&#xff09; 下一篇:SQLite 4.9的虚拟表机制(十四) 1. 引言 本文介绍了 SQLite OS 可移植性层或“VFS” - 模块位于 SQLite 实现堆栈底部 提供跨操作系统的可移植性。 VFS是Virtual File…

TiDB 慢查询日志分析

导读 TiDB 中的慢查询日志是一项 关键的性能监控工具&#xff0c;其主要作用在于协助数据库管理员追踪执行时间较长的 SQL 查询语句。 通过记录那些超过设定阈值的查询&#xff0c;慢查询日志为性能优化提供了关键的线索&#xff0c;有助于发现潜在的性能瓶颈&#xff0c;优化…

libVLC 音频立体声模式切换

在libVLC中&#xff0c;可以使用libvlc_audio_set_channel函数来设置音频的立体声模式。这个函数允许选择不同的音频通道&#xff0c;例如立体声、左声道、右声道、环绕声等。 /*** Set current audio channel.** \param p_mi media player* \param channel the audio channel…