ActiveMQ 02 常用API

Active MQ 02

常用API

事务

session.commit();
session.rollback();

用来提交/回滚事务

Purge

清理消息

签收模式

签收代表接收端的session已收到消息的一次确认,反馈给broker

ActiveMQ支持自动签收与手动签收

Session.AUTO_ACKNOWLEDGE

当客户端从receiver或onMessage成功返回时,Session自动签收客户端的这条消息的收条。

Session.CLIENT_ACKNOWLEDGE

客户端通过调用消息(Message)的acknowledge方法签收消息。在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。

Session.DUPS_OK_ACKNOWLEDGE

Session不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用。

持久化

默认持久化是开启的

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

优先级

可以打乱消费顺序

producer.setPriority

配置文件需要指定使用优先级的目的地

<policyEntry queue="queue1" prioritizedMessages="true" />

消息超时/过期

producer.setTimeToLive

设置了消息超时的消息,消费端在超时后无法在消费到此消息。

给消息设置一个超时时间 -> 死信队列 -> 拿出来 -> 重发

死信

此类消息会进入到ActiveMQ.DLQ队列且不会自动清除,称为死信

此处有消息堆积的风险

修改死信队列名称
			  <policyEntry queue="f" prioritizedMessages="true" ><deadLetterStrategy> <individualDeadLetterStrategy   queuePrefix="DLxxQ." useQueueForQueueMessages="true" /> </deadLetterStrategy> </policyEntry>

useQueueForQueueMessages: 设置使用队列保存死信,还可以设置useQueueForTopicMessages,使用Topic来保存死信

让非持久化的消息也进入死信队列
			<individualDeadLetterStrategy   queuePrefix="DLxxQ." useQueueForQueueMessages="true"  processNonPersistent="true" /> 

processNonPersistent=“true”

过期消息不进死信队列
<individualDeadLetterStrategy   processExpired="false"  /> 

独占消费者

	Queue queue = session.createQueue("xxoo?consumer.exclusive=true");

还可以设置优先级

Queue queue = session.createQueue("xxoo?consumer.exclusive=true&consumer.priority=10");

消息类型

object

发送端
		Girl girl = new Girl("qiqi",25,398.0);Message message = session.createObjectMessage(girl);
接受端
		if(message instanceof ActiveMQObjectMessage) {Girl girl = (Girl)((ActiveMQObjectMessage)message).getObject();System.out.println(girl);System.out.println(girl.getName());}

如果遇到此类报错

Exception in thread "main" javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:213)at com.mashibing.mq.Receiver.main(Receiver.java:65)
Caused by: java.lang.ClassNotFoundException: Forbidden class com.mashibing.mq.Girl! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.checkSecurity(ClassLoadingAwareObjectInputStream.java:112)at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:57)at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:211)... 1 more

需要添加信任

		connectionFactory.setTrustedPackages(new ArrayList<String>(Arrays.asList(new String[]{Girl.class.getPackage().getName()})));

bytesMessage

发送端
		BytesMessage bytesMessage = session.createBytesMessage();bytesMessage.writeBytes("str".getBytes());bytesMessage.writeUTF("哈哈");
接受端
		if(message instanceof BytesMessage) {BytesMessage bm = (BytesMessage)message;byte[] b = new byte[1024];int len = -1;while ((len = bm.readBytes(b)) != -1) {System.out.println(new String(b, 0, len));}}

还可以使用ActiveMQ给提供的便捷方法,但要注意读取和写入的顺序

bm.readBoolean()
bm.readUTF()
写入文件
                    FileOutputStream out = null;try {out = new FileOutputStream("d:/aa.txt");} catch (FileNotFoundException e2) {e2.printStackTrace();}byte[] by = new byte[1024];int len = 0 ;try {while((len = bm.readBytes(by))!= -1){out.write(by,0,len);}} catch (Exception e1) {e1.printStackTrace();}

MapMessage

发送端
		MapMessage mapMessage = session.createMapMessage();mapMessage.setString("name","lucy");mapMessage.setBoolean("yihun",false);mapMessage.setInt("age", 17);producer.send(mapMessage);
接收端
		Message message = consumer.receive();MapMessage mes = (MapMessage) message;System.out.println(mes);System.out.println(mes.getString("name"));

消息发送原理

同步与异步

开启事务关闭事务
持久化异步同步
非持久化异步异步

我们可以通过以下几种方式来设置异步发送:

		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");// 2.获取一个向ActiveMQ的连接connectionFactory.setUseAsyncSend(true);ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();connection.setUseAsyncSend(true);

消息堆积

producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。

brokerUrl中设置: tcp://localhost:61616?jms.producerWindowSize=1048576

destinationUri中设置: myQueue?producer.windowSize=1048576

延迟消息投递

首先在配置文件中开启延迟和调度

schedulerSupport=“true”

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

延迟发送

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10*1000);

带间隔的重复发送

		long delay = 10 * 1000;long period = 2 * 1000;int repeat = 9;message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);createProducer.send(message);

Cron表达式定时发送

Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式:

Seconds Minutes Hours DayofMonth Month DayofWeek Year或

Seconds Minutes Hours DayofMonth Month DayofWeek

每一个域可出现的字符如下:

Seconds:可出现", - * /"四个字符,有效范围为0-59的整数

Minutes:可出现", - * /"四个字符,有效范围为0-59的整数

Hours:可出现", - * /"四个字符,有效范围为0-23的整数

DayofMonth:可出现", - * / ? L W C"八个字符,有效范围为0-31的整数

Month:可出现", - * /"四个字符,有效范围为1-12的整数或JAN-DEc

DayofWeek:可出现", - * / ? L C #"四个字符,有效范围为1-7的整数或SUN-SAT两个范围。1表示星期天,2表示星期一, 依次类推

Year:可出现", - * /"四个字符,有效范围为1970-2099年

每一个域都使用数字,但还可以出现如下特殊字符,它们的含义是:

(1):表示匹配该域的任意值,假如在Minutes域使用, 即表示每分钟都会触发事件。

(2)?:只能用在DayofMonth和DayofWeek两个域。它也匹配域的任意值,但实际不会。因为DayofMonth和 DayofWeek会相互影响。例如想在每月的20日触发调度,不管20日到底是星期几,则只能使用如下写法: 13 13 15 20 * ?, 其中最后一位只能用?,而不能使用*,如果使用*表示不管星期几都会触发,实际上并不是这样。

(3)-:表示范围,例如在Minutes域使用5-20,表示从5分到20分钟每分钟触发一次

(4)/:表示起始时间开始触发,然后每隔固定时间触发一次,例如在Minutes域使用5/20,则意味着5分钟触发一次,而25,45等分别触发一次.

(5),:表示列出枚举值值。例如:在Minutes域使用5,20,则意味着在5和20分每分钟触发一次。

(6)L:表示最后,只能出现在DayofWeek和DayofMonth域,如果在DayofWeek域使用5L,意味着在最后的一个星期四触发。

(7)W: 表示有效工作日(周一到周五),只能出现在DayofMonth域,系统将在离指定日期的最近的有效工作日触发事件。例如:在 DayofMonth使用5W,如果5日是星期六,则将在最近的工作日:星期五,即4日触发。如果5日是星期天,则在6日(周一)触发;如果5日在星期一 到星期五中的一天,则就在5日触发。另外一点,W的最近寻找不会跨过月份

(8)LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。

(9)#:用于确定每个月第几个星期几,只能出现在DayofMonth域。例如在4#2,表示某月的第二个星期三。

举几个例子:

0 0 2 1 * ? * 表示在每月的1日的凌晨2点调度任务

0 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业

0 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作

一个cron表达式有至少6个(也可能7个)有空格分隔的时间元素。

按顺序依次为

秒(0~59)

分钟(0~59)

小时(0~23)

天(月)(0~31,但是你需要考虑你月的天数)

月(0~11)

天(星期)(1~7 1=SUN 或 SUN,MON,TUE,WED,THU,FRI,SAT)

年份(1970-2099)

其中每个元素可以是一个值(如6),一个连续区间(9-12),一个间隔时间(8-18/4)(/表示每隔4小时),一个列表(1,3,5),通配符。由于"月份中的日期"和"星期中的日期"这两个元素互斥的,必须要对其中一个设置?

0 0 10,14,16 * * ? 每天上午10点,下午2点,4点

0 0/30 9-17 * * ? 朝九晚五工作时间内每半小时

0 0 12 ? * WED 表示每个星期三中午12点

“0 0 12 * * ?” 每天中午12点触发

“0 15 10 ? * *” 每天上午10:15触发

“0 15 10 * * ?” 每天上午10:15触发

“0 15 10 * * ? *” 每天上午10:15触发

“0 15 10 * * ? 2005” 2005年的每天上午10:15触发

“0 * 14 * * ?” 在每天下午2点到下午2:59期间的每1分钟触发

“0 0/5 14 * * ?” 在每天下午2点到下午2:55期间的每5分钟触发

“0 0/5 14,18 * * ?” 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发

“0 0-5 14 * * ?” 在每天下午2点到下午2:05期间的每1分钟触发

“0 10,44 14 ? 3 WED” 每年三月的星期三的下午2:10和2:44触发

“0 15 10 ? * MON-FRI” 周一至周五的上午10:15触发

“0 15 10 15 * ?” 每月15日上午10:15触发

“0 15 10 L * ?” 每月最后一日的上午10:15触发

“0 15 10 ? * 6L” 每月的最后一个星期五上午10:15触发

“0 15 10 ? * 6L 2002-2005” 2002年至2005年的每月的最后一个星期五上午10:15触发

“0 15 10 ? * 6#3” 每月的第三个星期五上午10:15触发

监听器

可以使用监听器来处理消息接收

consumer.setMessageListener(new MyListener());

需要实现接口MessageListener

public class MyListener implements MessageListener {public void onMessage(Message message) {// TODO Auto-generated method stubTextMessage textMessage = (TextMessage)message;try {System.out.println("xxoo" + textMessage.getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}

当收到消息后会调起onMessage方法

消息过滤

消息发送

		MapMessage msg1 = session.createMapMessage();msg1.setString("name", "qiqi");msg1.setString("age", "18");msg1.setStringProperty("name", "qiqi");msg1.setIntProperty("age", 18);MapMessage msg2 = session.createMapMessage();msg2.setString("name", "lucy");msg2.setString("age", "18");msg2.setStringProperty("name", "lucy");msg2.setIntProperty("age", 18);MapMessage msg3 = session.createMapMessage();msg3.setString("name", "qianqian");msg3.setString("age", "17");msg3.setStringProperty("name", "qianqian");msg3.setIntProperty("age", 17);

消息接收

	String selector1 = "age > 17";String selector2 = "name = 'lucy'";MessageConsumer consumer = session.createConsumer(queue,selector2);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/810721.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【01背包】滚动数组优化实现一维01背包DP(对比朴素写法)

01背包 代码 背包问题的滚动数组优化版本建议在完全弄懂了普通的二维01背包问题后再进行食用&#xff0c;不然会出现消化不良的症状… 我们可以将背包问题中DP数组的下标看作成两个集合 下面对比两种不同实现方法的区别&#xff1a; 朴素二维DP版本 使用dp[不超过i的物品集合]…

全量知识系统 程序详细设计 之 三种“活物” 之1(QA百度文库 )

Q1. 今天聊聊 全知系统中 三种“活物”。先从他们的一个简单描述开始&#xff1a;自主&#xff1a;计算机“集群”的“沉”与“浮”自然&#xff1a;AI “众生”的“世”和“界”自由&#xff1a;人类 “公民”的“宇”或“宙” 这是一个非常有趣且深刻的主题&#xff0c;将全…

2024年MCN商业模式运营体系行业发展分析

【干货资料持续更新&#xff0c;以防走丢】 2024年MCN商业模式运营体系行业发展分析 部分资料预览 资料部分是网络整理&#xff0c;仅供学习参考。 mcn运营资料包&#xff08;完整资料包含以下内容&#xff09; 目录 MCN机构运营方案的概要&#xff1a; 一、MCN机构定位与目…

关于java分页功能以及传参规范

不用插件 //当前页码private static final Integer currentPage 2;//设置每页个数private static final Integer pageSize 5;Testpublic void test8() {//手写一个分页测试&#xff0c;不用插件List<Integer> list new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7…

Docker 国内镜像

Docker 国内镜像 安装好Docker/Docker Desktop后&#xff0c;其registry server是默认指向https://hub.docker.com的。在国内该hub源访问速度异常慢&#xff0c;可以通过切换至国内镜像仓库来解决这一问题。 sudo vi /etc/docker/daemon.json 添加以下内容&#xff1a; { “re…

GPT建模与预测实战

代码链接见文末 效果图&#xff1a; 1.数据样本生成方法 训练配置参数&#xff1a; --epochs 40 --batch_size 8 --device 0 --train_path data/train.pkl 其中train.pkl是处理后的文件 因此&#xff0c;我们首先需要执行preprocess.py进行预处理操作&#xff0c;配置参数…

Android-NDK的linux交叉编译环境

NDK工具包下载 NDK 下载 | Android NDK | Android Developers https://github.com/android/ndk/wiki/Unsupported-Downloads 以android-ndk-r26c下载为例&#xff0c;下载后将压缩包解压至/usr目录下 CMakeLists编译选项设置 编译平台变量判断条件中增加一下android条件…

【HDFS】DirectoryScanner与append操作对副本状态判定是否存在冲突?

结论: 不存在问题。 DirectoryScanner周期性地运行,扫描整个DN磁盘上的块与内存里的块做比较。 删除那些内存里没有的但是在磁盘上存在的块, 处理重复块等等。 最近在更细粒度化DN锁。在看到ReplicaMap#replicas方法时,阅读相关代码。 发现DirectoryScanner#scan 与appen…

ubuntu下man手册 查不到 pthread_mutex_lock等系列函数用法的问题

问题 在ubuntu系统中无法man到 pthread_mutex_lock pthread_mutex_trylock pthread_mutex_unlock等函数 $ man pthread_mutex_lock 没有 pthread_mutex_lock 的手册页条目解决方式 输入以下命令 sudo apt-get install manpages-posix manpages-posix-dev 然后输入密码 再次m…

Hive和Pig有什么区别吗?

尽管Hive和Pig都是用于处理大数据的Hadoop生态系统的工具&#xff0c;但它们之间的主要目标和使用方法有很大的差异。以下是对两者的一些比较&#xff1a; 数据处理&#xff1a;Hive更像是一个用于进行数据分析的工具。它提供了一种名为HQL的查询语言&#xff0c;语法类似于SQL…

MobaXterm无法登陆oracle cloud的问题

问题 我在oracle cloud上创建实例的时候&#xff0c;只能使用密钥的方式登陆&#xff0c;当时下载了私钥文件。实例创建好以后&#xff0c;在mobaxterm上使用这个私钥文件无法登陆 排查 尝试使用mobaxterm的keygen&#xff0c;把私钥文件转成ppk格式&#xff0c;还是不行。…

高中数学:三角函数-同角与异角的三角函数关系

一、同角三角函数关系 1、基本公式 知一求二 2、快速求值方法 重点掌握辅助三角形方法 3、题型 3.1、一次式整式求值 sinα和cosα指数是一次的求值&#xff0c;建议用辅助三角形方法 例题 3.2、一次式分式求值 分子、分母同除以sinα或者cosα 例题 3.3、二次式…

区块链安全-----接口测试-Postman

Postman是一款支持http协议的接口调试与测试工具&#xff0c;其主要特点就是功能强大&#xff0c;使用简单且易 用性好 。无论是开发人员进行接口调试&#xff0c;还是测试人员做接口测试&#xff0c;Postman都是我们的首选工具 之一 。 更早的接入测试&#xff0c;更早的发现问…

C++ 类型推导Auto及decltype

目录 auto decltype decltype 和 auto 是 C11 及其后续版本中引入的两个关键字&#xff0c;它们都用于自动类型推导&#xff0c;但在使用和行为上有一些重要的区别。 auto auto 关键字在 C 中用于自动类型推导。编译器会根据初始化表达式自动推断变量的类型。auto 关键字使代…

切面条(蓝桥杯)

目录 题目 分析 代码实现 题目 一根高筋拉面&#xff0c;中间切一刀&#xff0c;可以得到2根面条。 如果先对折1次&#xff0c;中间切一刀&#xff0c;可以得到3根面条。 如果连续对折2次&#xff0c;中间切一刀&#xff0c;可以得到5根面条。 那么&#xff0c;连续对折1…

光耦合器的使用:了解输入和输出之间的关系

光耦合器也称为光隔离器&#xff0c;是许多电子电路中的重要组件&#xff0c;可在输入和输出信号之间提供隔离。它们在各种应用中确保安全、降低噪声和防止接地环路方面发挥着至关重要的作用。在本文中&#xff0c;我们将深入研究光耦合器的基础知识&#xff0c;探讨它们的工作…

人形机器人行业报告:AI赋能人形机器人开启产业化元年

今天分享的是人形机器人专题系列深度研究报告&#xff1a;《AI赋能&#xff0c;人形机器人开启产业化元年》。 &#xff08;报告出品方&#xff1a;国泰君安证券&#xff09; 报告共计&#xff1a;56页 要点 通用性是人形机器人商业化的关键&#xff0c;AI大模型赋能加速机…

打破传统,蔚莱普康定义国货美妆新未来

在全球美妆市场经济改革的今天&#xff0c;中国新兴品牌蔚莱普康&#xff0c;正在以前所未有的速度和规模&#xff0c;冲破瓶颈&#xff0c;赢得市场的广泛认可。这一切&#xff0c;得益于国家政策的扶持和国货品牌自身的不懈努力与创新。 各类国潮产品不断‘出圈’的背后&…

java程序 .exe启动nginx防止重复启动,已解决

java代码生成好的.exe启动nginx服务程序 根据nginx占用端口来解决nginx服务重复启动问题&#xff08;下面代码了解代码逻辑后根据自己的业务需求修改即可&#xff09; 代码&#xff1a; package org.example;import javax.swing.*; import java.awt.*; import java.io.*; …

Linux-select剖析

一、select函数 select函数是IO多路复用的函数&#xff0c;它主要的功能是用来等文件描述符中的事件是否就绪&#xff0c;select可以使我们在同时等待多个文件缓冲区 &#xff0c;减少IO等待的时间&#xff0c;能够提高进程的IO效率。 select()函数允许程序监视多个文件描述符…