SofaMQ一些常用的API

SofaMQ的十五种常用的API

引言

SofaMQ作为阿里巴巴开源的消息中间件,提供了丰富的API以支持各种消息传递场景。在本文中,我们将介绍SofaMQ的十五种常用API,并通过实例演示其用法。

1. Producer相关API

1.1 SofaMQProducer

SofaMQProducer是SofaMQ中用于生产消息的主要类。它提供了消息的创建、发送等功能。

示例:
SofaMQProducer producer = new SofaMQProducer();
producer.setInstanceName("producer");
producer.start();Message message = new Message("TopicTest", "TagA", "Hello, SofaMQ!".getBytes());
SendResult sendResult = producer.send(message);
System.out.println(sendResult);producer.shutdown();

1.2 SendResult

SendResult用于表示消息发送的结果,包含消息的状态、消息ID等信息。

示例:
// 假设前面的代码已执行,获取SendResult
System.out.println("消息ID:" + sendResult.getMsgId());
System.out.println("发送状态:" + sendResult.getSendStatus());

2. Consumer相关API

2.1 DefaultMQPushConsumer

DefaultMQPushConsumer是SofaMQ中用于消费消息的主要类。它支持推模式,即主动拉取消息。

示例:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "TagA");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 处理消息逻辑return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});consumer.start();

2.2 MessageListenerConcurrently

MessageListenerConcurrently是消息的并发消费接口,用于处理消费逻辑。

示例:
// 假设前面的代码已执行,注册MessageListenerConcurrently
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 处理消息逻辑return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

3. 其他常用API

3.1 Message

Message表示要发送或接收的消息。可以设置消息的主题、标签、内容等。

示例:
Message message = new Message("TopicTest", "TagA", "Hello, SofaMQ!".getBytes());

3.2 TransactionListener

TransactionListener用于处理事务消息的逻辑,实现自定义的事务处理器。

示例:
// 假设前面的代码已执行,注册TransactionListener
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 检查本地事务状态return LocalTransactionState.COMMIT_MESSAGE;}
});

SofaMQ更多常用API介绍

4. 定时消息发送

4.1 MessageDelayLevel

MessageDelayLevel用于设置消息的延迟级别,以实现定时发送消息。

示例:
Message message = new Message("TopicTest", "TagA", "Hello, SofaMQ!".getBytes());
message.setDelayTimeLevel(MessageDelayLevel.ONE_HOUR);
SendResult sendResult = producer.send(message);
System.out.println(sendResult);

4.2 DelayMessageListener

DelayMessageListener是处理延迟消息的监听器接口,用于消费延迟消息。

示例:
// 假设前面的代码已执行,注册DelayMessageListener
consumer.registerMessageListener((DelayMessageListener) (msgs, context) -> {// 处理延迟消息逻辑return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

5. 批量发送与批量消费

5.1 批量发送消息

List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 10; i++) {Message message = new Message("TopicTest", "TagA", ("Hello, SofaMQ! " + i).getBytes());messageList.add(message);
}
SendResult sendResult = producer.send(messageList);
System.out.println(sendResult);

5.2 批量消费消息

// 假设前面的代码已执行,注册BatchMessageListener
consumer.registerMessageListener((BatchMessageListener) (msgs, context) -> {// 处理批量消息逻辑return ConsumeBatchStatus.SUCCESS;
});

6. 顺序消息发送与消费

6.1 顺序消息发送

List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 10; i++) {Message message = new Message("TopicOrderTest", "TagA", ("Hello, SofaMQ! " + i).getBytes());SendResult sendResult = producer.send(message, (list, message1, o) -> {// 根据业务逻辑确定消息发送顺序return list.get(0);}, null);System.out.println(sendResult);
}

6.2 顺序消息消费

// 假设前面的代码已执行,注册OrderMessageListener
consumer.registerMessageListener((OrderMessageListener) (msgs, context) -> {// 处理顺序消息逻辑return ConsumeOrderlyStatus.SUCCESS;
});

这些API涵盖了SofaMQ更多的特性,包括定时消息、延迟消息、批量发送与消费、顺序消息等。通过合理使用这些API,可以满足不同场景下的消息传递需求。

结语

通过上述实例,我们介绍了SofaMQ的十五种常用API,涵盖了消息的生产、消费、事务等方面。在实际应用中,根据业务需求选择合适的API,能够更加灵活高效地使用SofaMQ。

感谢阅读,希望这些实例对您在SofaMQ的使用过程中有所帮助。如有任何问题或建议,请留言讨论。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/631336.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年网络安全比赛--内存取证(超详细)

一、竞赛时间 180分钟 共计3小时 二、竞赛阶段 竞赛阶段 任务阶段 竞赛任务 竞赛时间 分值 1.从内存文件中找到异常程序的进程&#xff0c;将进程的名称作为Flag值提交&#xff1b; 2.从内存文件中找到黑客将异常程序迁移后的进程编号&#xff0c;将迁移后的进程编号作为Flag值…

Python学习之路——异常捕获

一、什么是异常 当检测到一个错误时&#xff0c;Python解释器就无法继续执行了&#xff0c;反而出现了一些错误的提示&#xff0c;这就是所谓的“异常”&#xff0c;也就是bug 二、异常的捕获方法 当我们的程序遇到bug&#xff0c;那么就下来有两种情况 ①整个程序因为一个…

Statistics with Python Specialisation: 数据库、可视化方法

目录 前言Numpy Arrays (the ndarray)1. 导入 NumPy&#xff1a;2. 创建 NumPy 数组&#xff1a;3. 数组的基本属性&#xff1a;4. 访问数组元素&#xff1a;5. 数组切片&#xff1a;6. 数学运算&#xff1a; pandas 示例&#xff1a;用平均值填充缺失值ScipyMatPlotLib&#x…

Elasticsearch:和 LIamaIndex 的集成

LlamaIndex 是一个数据框架&#xff0c;供 LLM 应用程序摄取、构建和访问私有或特定领域的数据。 LlamaIndex 是开源的&#xff0c;可用于构建各种应用程序。 在 GitHub 上查看该项目。 安装 在 Docker 上设置 Elasticsearch 使用以下 docker 命令启动单节点 Elasticsearch 实…

注解(Annotations)是什么?你用过哪些注解?解释依赖注入(DI)。

在编程语言中&#xff0c;注解&#xff08;Annotation&#xff09;是一种元数据机制&#xff0c;允许程序员向源代码中添加信息&#xff0c;这些信息可以被编译器、IDE或其他工具读取和处理&#xff0c;但不会影响程序的运行时行为。注解主要用于代码的自文档化、编译时检查、运…

chromedriver+Selenium+springboot+Java实现后端截图

chromedriver这种方法实现截图&#xff0c;依赖服务器端的谷歌浏览器软件&#xff0c;需要在服务器端下载谷歌浏览器。 Windows服务器说明 1.下载谷歌浏览器 2.根据第一步下载的谷歌浏览器版本&#xff0c;下载chromedriver&#xff0c;可以在这个页面找到和版本相近的版本去下…

docker部署Jira+配置MySQL8数据库

写在前面&#xff1a;如果你通过docker安装Jira且启动过&#xff0c;然后你现在又想使用mysql数据库&#xff0c;需要注意 你除了停掉原有容器&#xff0c;还需要删除&#xff1a;/var/lib/docker/volumes/jiraVolume/_data下的文件&#xff0c;否则启动后会无法正常使用。注意…

Redis 笔记一

概览 1.Redis核心数据存储结构 2.Redis底层String编码int&embstr&raw 3.Redis底层压缩列表&跳表&哈希表 4.Redis底层Zset实现压缩列表和跳表如何选择 5.基于Redis实现微博&抢红包&12306核心业务 辅助学习&#xff1a;Redis 教程 | 菜鸟教程 1.Redis为什…

ArcGIS初始化软件界面Normal.mxt

ArcGIS有时候永久了&#xff0c;或者呢突然不自觉软件界面乱了&#xff0c;或者一些窗口打开却找不到&#xff01; 这时候可以去删除arcgis的界面配置文件&#xff0c;Normal.mxt 删除后再打开软件&#xff0c;软件界面就会回到初始化设置了&#xff01; 文件所在的路径&…

从零开始学习Zeppelin:大数据可视化分析的交互式开发系统!

介绍&#xff1a;Apache Zeppelin是一个基于Web的交互式开发系统&#xff0c;主要用于进行大数据可视化分析。其核心概念是notebook&#xff0c;所有的操作都可以在notebook中完成。Zeppelin提供了一套非常全面的数据分析解决方案&#xff0c;支持数据采集、数据发现、数据分析…

网络安全应急响应灾备KB

目录 应急响应 定义 特点 国家相关标准和文件 事件分类与分级 应急响应组织 取证与保全 信息安全应急响应管理过程 灾备 关键词 灾备政策 灾备等级 灾备策略 受害恢复能力级别 灾难恢复管理过程 应急响应 定义&#xff1a; 指组织为了应对突发/重大信息安全管理…

canal server初始化源码分析

CanalLauncher类是canal server端启动的入口类&#xff0c;跟随代码进行深入。 在开始之前&#xff0c;我们可以先了解下&#xff0c; canal 配置方式 ManagerCanalInstanceGenerator&#xff1a; 基于manager管理的配置方式&#xff0c;实时感知配置并进行server重启Spring…

k8s---ingress对外服务(七层)

ingress 概念 k8s的对外服务&#xff0c;ingress service作用现在两个方面&#xff1a; 1、集群内部&#xff1a;不断跟踪的变化&#xff0c;更新endpoint中的pod对象&#xff0c;基于pod的ip地址不断变化的一种服务发现机制。 2、集群外部&#xff1a;类似于负载均衡器&a…

elasticsearch[二]-DSL查询语法:全文检索、精准查询(term/range)、地理坐标查询(矩阵、范围)、复合查询(相关性算法)、布尔查询

ES-DSL查询语法&#xff08;全文检索、精准查询、地理坐标查询&#xff09; 1.DSL查询文档 elasticsearch 的查询依然是基于 JSON 风格的 DSL 来实现的。 1.1.DSL 查询分类 Elasticsearch 提供了基于 JSON 的 DSL&#xff08;Domain Specific Language&#xff09;来定义查…

Python ddddocr 构建 exe 程序后运行报错:Failed Load model ... common_old.onnx

文章目录 ddddocr版本简单的 demo解决方案个人简介 ddddocr ddddocr是由sml2h3开发的专为验证码厂商进行对自家新版本验证码难易强度进行验证的一个python库&#xff0c;其由作者与kerlomz共同合作完成&#xff0c;通过大批量生成随机数据后进行深度网络训练&#xff0c;本身并…

qwen在vLLM下的长度外推简易方法

目的 在当前的版本vLLM中实现qwen的长度外推。 解决方法 在qwen的config.json中&#xff0c;增加如下内容&#xff1a; {"rope_scaling": { "type": "dynamic", "factor": 4.0} }dynamic:动态NTK factor:缩放因子&#xff0c;外推长…

虚拟环境中pip install不生效的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

R语言【base】:interactive():R语言是否在交互状态下运行?

Package base version 4.2.0 Usage interactive() Details 交互式的 R 会话是指有一个虚拟的操作手与 R 交互&#xff0c;比如 R 可以针对错误的输入提示更正&#xff0c;或者也可以询问接下来如何处理&#xff0c;或者认为这是可以的并且进行下一步。 GUI 控制台将安排在交…

宏集干货丨探索物联网HMI的端口转发和NAT功能

来源&#xff1a;宏集科技 工业物联网 宏集干货丨探索物联网HMI的端口转发和NAT功能 原文链接&#xff1a;https://mp.weixin.qq.com/s/zF2OqkiGnIME6sov55cGTQ 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; #工业自动化 #工业物联网 #HMI 前 言 端口转发和NAT功…

vue 里 props 类型为 Object 时设置 default: () => {} 返回的是 undefined 而不是 {}?

问题 今天遇到个小坑&#xff0c;就是 vue 里使用 props 传参类型为 Object 的时候设置 default: () > {} 报错&#xff0c;具体代码如下 <template><div class"pre-archive-info"><template v-if"infoData.kaimo ! null">{{ infoD…