ActiveMQ 04 Linux下安装

Active MQ 04

Linux下安装

下载

解压

init.d下建立软连接

ln -s /usr/local/activemq/bin/activemq ./

设置开启启动

chkconfig activemq on

服务管理

service activemq start
service activemq status
service activemq stop

NIO配置

默认配置为tcp,使用的是bio

 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

http://activemq.apache.org/configuring-version-5-transports

Nio是基于TCP的

客户端使用连接时也应使用nio

		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","nio://localhost:61617");

Auto + Nio

<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>

自动适配协议

OpenWire 可用配置选项

OptionDefaultDescription
cacheEnabledtrueShould commonly repeated values be cached so that less marshaling occurs?
cacheSize1024When cacheEnabled=true then this parameter is used to specify the number of values to be cached.
maxInactivityDuration30000The maximum inactivity duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to die. Therefore allow the broker to kill connections when they have been inactive for the configured period of time. Used by some transports to enable a keep alive heart beat feature. Inactivity monitoring is disabled when set to a value <= 0.
maxInactivityDurationInitalDelay10000The initial delay before starting inactivity checks. Yes, the word 'Inital' is supposed to be misspelled like that.
maxFrameSizeMAX_LONGMaximum allowed frame size. Can help help prevent OOM DOS attacks.
sizePrefixDisabledfalseShould the size of the packet be prefixed before each packet is marshaled?
stackTraceEnabledtrueShould the stack trace of exception that occur on the broker be sent to the client?
tcpNoDelayEnabledtrueDoes not affect the wire format, but provides a hint to the peer that TCP_NODELAY should be enabled on the communications Socket.
tightEncodingEnabledtrueShould wire size be optimized over CPU usage?

Transport 可用配置选项

Option NameDefault ValueDescription
backlog5000Specifies the maximum number of connections waiting to be accepted by the transport server socket.
closeAsynctrueIf true the socket close call happens asynchronously. This parameter should be set to false for protocols like STOMP, that are commonly used in situations where a new connection is created for each read or write. Doing so ensures the socket close call happens synchronously. A synchronous close prevents the broker from running out of available sockets owing to the rapid cycling of connections.
connectionTimeout30000If >=1 the value sets the connection timeout in milliseconds. A value of 0 denotes no timeout. Negative values are ignored.
daemonfalseIf true the transport thread will run in daemon mode. Set this parameter to true when embedding the broker in a Spring container or a web container to allow the container to shut down correctly.
dynamicManagementfalseIf true the TransportLogger can be managed by JMX.
ioBufferSize8 * 1024Specifies the size of the buffer to be used between the TCP layer and the OpenWire layer where wireFormat based marshaling occurs.
jmxPort1099(Client Only) Specifies the port that will be used by the JMX server to manage the TransportLoggers. This should only be set, via URI, by either a client producer or consumer as the broker creates its own JMX server. Specifying an alternate JMX port is useful for developers that test a broker and client on the same machine and need to control both via JMX.
keepAlivefalseIf true, enables TCP KeepAlive on the broker connection to prevent connections from timing out at the TCP level. This should not be confused with KeepAliveInfo messages as used by the InactivityMonitor.
logWriterNamedefaultSets the name of the org.apache.activemq.transport.LogWriter implementation to use. Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
maximumConnectionsInteger.MAX_VALUEThe maximum number of sockets allowed for this broker.
minmumWireFormatVersion0The minimum remote wireFormat version that will be accepted (note the misspelling). Note: when the remote wireFormat version is lower than the configured minimum acceptable version an exception will be thrown and the connection attempt will be refused. A value of 0 denotes no checking of the remote wireFormat version.
socketBufferSize64 * 1024Sets the size, in bytes, for the accepted socket’s read and write buffers.
soLingerInteger.MIN_VALUESets the socket’s option soLinger when the value is > -1. When set to -1 the soLinger socket option is disabled.
soTimeout0Sets the socket’s read timeout in milliseconds. A value of 0 denotes no timeout.
soWriteTimeout0Sets the socket’s write timeout in milliseconds. If the socket write operation does not complete before the specified timeout, the socket will be closed. A value of 0 denotes no timeout.
stackSize0Set the stack size of the transport’s background reading thread. Must be specified in multiples of 128K. A value of 0 indicates that this parameter is ignored.
startLoggingtrueIf true the TransportLogger object of the Transport stack will initially write messages to the log. This parameter is ignored unless trace=true.
tcpNoDelayfalseIf true the socket’s option TCP_NODELAY is set. This disables Nagle’s algorithm for small packet transmission.
threadNameN/AWhen this parameter is specified the name of the thread is modified during the invocation of a transport. The remote address is appended so that a call stuck in a transport method will have the destination information in the thread name. This is extremely useful when using thread dumps for degugging.
tracefalseCauses all commands that are sent over the transport to be logged. To view the logged output define the Log4j logger: log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUG.
trafficClass0The Traffic Class to be set on the socket.
diffServ0(Client only) The preferred Differentiated Services traffic class to be set on outgoing packets, as described in RFC 2475. Valid integer values: [0,64]. Valid string values: EF, AF[1-3][1-4] or CS[0-7]. With JDK 6, only works when the JVM uses the IPv4 stack. To use the IPv4 stack set the system property java.net.preferIPv4Stack=true. Note: it’s invalid to specify both ‘diffServ and typeOfService’ at the same time as they share the same position in the TCP/IP packet headers
typeOfService0(Client only) The preferred Type of Service value to be set on outgoing packets. Valid integer values: [0,256]. With JDK 6, only works when the JVM is configured to use the IPv4 stack. To use the IPv4 stack set the system property java.net.preferIPv4Stack=true. Note: it’s invalid to specify both ‘diffServ and typeOfService’ at the same time as they share the same position in the TCP/IP packet headers.
useInactivityMonitortrueWhen false the InactivityMonitor is disabled and connections will never time out.
useKeepAlivetrueWhen true KeepAliveInfo messages are sent on an idle connection to prevent it from timing out. If this parameter is false connections will still timeout if no data was received on the connection for the specified amount of time.
useLocalHostfalseWhen true local connections will be made using the value localhost instead of the actual local host name. On some operating systems, such as OS X, it’s not possible to connect as the local host name so localhost is better.
useQueueForAccepttrueWhen true accepted sockets are placed onto a queue for asynchronous processing using a separate thread.
wireFormatdefaultThe name of the wireFormat factory to use.
wireFormat.*N/AProperties with this prefix are used to configure the wireFormat.

ActiveMQ服务监控 Hawtio

官方网站

https://hawt.io/

部署

独立jar包的形式运行

java -jar

hawtio单程序运行,可以对多个远程ActiveMQ服务器进行监控

嵌入ActiveMQ
  • 下载war包
  • 复制到webapps下

jetty.xml bean标签下添加

				<bean class="org.eclipse.jetty.webapp.WebAppContext">        <property name="contextPath" value="/hawtio" />        <property name="war" value="${activemq.home}/webapps/hawtio.war" />        <property name="logUrlOnStart" value="true" />  </bean>

ActiveMQ.bat下添加

if "%ACTIVEMQ_OPTS%" == "" set ACTIVEMQ_OPTS=-Xms1G -Xmx1G -Dhawtio.realm=activemq -Dhawtio.role=admins -Dhawtio.rolePrincipalClasses=org.apache.activemq.jaas.GroupPrincipal -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config="%ACTIVEMQ_CONF%\login.config" 

JMS消息结构(Message)

Message主要由三部分组成,分别是Header,Properties,Body, 详细如下:

Header消息头,所有类型的这部分格式都是一样的
Properties属性,按类型可以分为应用设置的属性,标准属性和消息中间件定义的属性
Body消息正文,指我们具体需要消息传输的内容。

Header

JMS消息头使用的所有方法:

public interface Message {public Destination getJMSDestination() throws JMSException;public void setJMSDestination(Destination destination) throws JMSException;public int getJMSDeliveryMode() throws JMSExceptionpublic void setJMSDeliveryMode(int deliveryMode) throws JMSException;public String getJMSMessageID() throws JMSException;public void setJMSMessageID(String id) throws JMSException;public long getJMSTimestamp() throws JMSException'public void setJMSTimestamp(long timestamp) throws JMSException;public long getJMSExpiration() throws JMSException;public void setJMSExpiration(long expiration) throws JMSException;public boolean getJMSRedelivered() throws JMSException;public void setJMSRedelivered(boolean redelivered) throws JMSException;public int getJMSPriority() throws JMSException;public void setJMSPriority(int priority) throws JMSException;public Destination getJMSReplyTo() throws JMSException;public void setJMSReplyTo(Destination replyTo) throws JMSException;public String getJMScorrelationID() throws JMSException;public void setJMSCorrelationID(String correlationID) throws JMSException;public byte[] getJMSCorrelationIDAsBytes() throws JMSException;public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;public String getJMSType() throws JMSException;public void setJMSType(String type) throws JMSException;
}

消息头分为自动设置和手动设置的内容

自动头信息

有一部分可以在创建Session和MessageProducer时设置

属性名称说明设置者
JMSDeliveryMode消息的发送模式,分为NON_PERSISTENTPERSISTENT,即非持久性模式的和持久性模式。默认设置为PERSISTENT(持久性)。一条持久性消息应该被传送一次(就一次),这就意味着如果JMS提供者出现故障,该消息并不会丢失; 它会在服务器恢复正常之后再次传送。一条非持久性消息最多只会传送一次,这意味着如果JMS提供者出现故障,该消息可能会永久丢失。在持久性和非持久性这两种传送模式中,消息服务器都不会将一条消息向同一消息者发送一次以上(成功算一次)。send
JMSMessageID消息ID,需要以ID:开头,用于唯一地标识了一条消息send
JMSTimestamp消息发送时的时间。这条消息头用于确定发送消息和它被消费者实际接收的时间间隔。时间戳是一个以毫秒来计算的Long类型时间值(自1970年1月1日算起)。send
JMSExpiration消息的过期时间,以毫秒为单位,用来防止把过期的消息传送给消费者。任何直接通过编程方式来调用setJMSExpiration()方法都会被忽略。send
JMSRedelivered消息是否重复发送过,如果该消息之前发送过,那么这个属性的值需要被设置为true, 客户端可以根据这个属性的值来确认这个消息是否重复发送过,以避免重复处理。Provider
JMSPriority消息的优先级,0-4为普通的优化级,而5-9为高优先级,通常情况下,高优化级的消息需要优先发送。任何直接通过编程方式调用setJMSPriority()方法都将被忽略。send
JMSDestination消息发送的目的地,是一个Topic或Queuesend

JMSDeliveryMode

MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

JMSExpiration

		//将过期时间设置为1小时(1000毫秒 *60 *60)producer.setTimeToLive(1000 * 60 * 60);

JMSPriority

producer.setPriority(9);
手动头信息
属性名称说明设置者
JMSCorrelationID关联的消息ID,这个通常用在需要回传消息的时候client
JMSReplyTo消息回复的目的地,其值为一个Topic或Queue, 这个由发送者设置,但是接收者可以决定是否响应client
JMSType由消息发送者设置的消息类型,代表消息的结构,有的消息中间件可能会用到这个,但这个并不是是批消息的种类,比如TextMessage之类的client

从上表中我们可以看到,系统提供的标准头信息一共有10个属性,其中有6个是由send方法在调用时设置的,有三个是由客户端(client)设置的,还有一个是由消息中间件(Provider)设置的。

需要注意的是,这里

下一代 ActiveMQ 6?Artemis

为下一代事件驱动的消息传递应用程序提供高性能、无阻塞的体系结构。

  • 包含JNDI,具有完整的JMS 1.1 & 2.0客户端实现
  • 高可用性共享存储、网络复制能力
  • 简单而强大的寻址模型协议
  • 灵活的负载均衡分配能力
  • 针对低延迟持久性和JDBC的高级日志实现
  • 与ActiveMQ 5的高功能奇偶校验,以简化迁移

官方文档:http://activemq.apache.org/components/artemis/migration

  • netty
  • 自己的存储
  • 优化传输流程
  • 更高的性能
  • 不再把所有的协议转换成openwire

高级使用

JMSCorrelationID

用于消息之间的关联,给人一种会话的感觉


JMSReplyTo

发送方可以接受到消息消费确认的地址

ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。

ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。

面试题

ActiveMQ如何防止消息丢失?会不会丢消息?

做高可用

死信队列

持久化

ack

消息重投

记录日志

接收(消费)确认

broker负载/限流

如何防止重复消费?

消息幂等处理

map ConcurrentHashMap -> putIfAbsent guava cache

如何保证消费顺序?

queue 优先级别设置

多消费端 ->

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/818931.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言 | Leetcode C语言题解之第30题串联所有单词的子串

题目&#xff1a; 题解&#xff1a; typedef struct {char key[32];int val;UT_hash_handle hh; } HashItem;int* findSubstring(char * s, char ** words, int wordsSize, int* returnSize){ int m wordsSize, n strlen(words[0]), ls strlen(s);int *res (int *)mall…

深入理解ClickHouse 的高性能与高可用原理

架构 ClickHouse 的架构设计旨在提供高性能、高吞吐量的数据存储和查询能力&#xff0c;特别适合处理大规模数据集和实时分析场景。ClickHouse 的架构可以分为几个关键组成部分&#xff0c;它们共同工作以提供高效的数据处理能力和高可用性。 主要组件 1. 存储引擎 ClickHo…

【opencv】示例-videocapture_starter.cpp 从视频文件、图像序列或连接到计算机的摄像头中捕获帧...

/** * file videocapture_starter.cpp * brief 一个使用OpenCV的VideoCapture与捕获设备&#xff0c;视频文件或图像序列的入门示例 * 就像CV_PI一样简单&#xff0c;对吧&#xff1f; * * 创建于: 2010年11月23日 * 作者: Ethan Rublee * * 修改于: 2013年4月17日 * …

【ES6】使用记录

Symbol const sym Symbol(Mo_qyue); console.log(sym.description) //Mo_qyue作为属性名Symbol let mySymbol Symbol() let a {} a[mySymbol] hello; console.log(a[mySymbol]) //hellolet b{[mySymbol]:hello} console.log(b[mySymbol]) //hellolet cObject.defineProp…

金蝶云星空与领星ERP对接集成分布式调入单查询打通添加/编辑本地产品

金蝶云星空与领星ERP对接集成分布式调入单查询打通添加/编辑本地产品 接通系统&#xff1a;金蝶云星空 金蝶K/3Cloud结合当今先进管理理论和数十万家国内客户最佳应用实践&#xff0c;面向事业部制、多地点、多工厂等运营协同与管控型企业及集团公司&#xff0c;提供一个通用的…

Unity 扩展自定义编辑器窗口

在Assets文件夹路径下任意位置创建Editor文件夹&#xff0c;将扩展编辑器的代码放在Editor文件夹下 生成编辑器窗口 代码中首先引用命名空间 using UnityEditor; 然后将创建的类继承自EditorWindow public class MenuEditor : EditorWindow 然后通过扩展编辑器菜单功能调用…

51-40 Align your Latents,基于LDM的高分辨率视频生成

由于数据工程、仿真测试工程&#xff0c;咱们不得不进入AIGC图片视频生成领域。兜兜转转&#xff0c;这一篇与智驾场景特别密切。23年4月&#xff0c;英伟达Nvidia联合几所大学发布了带文本条件融合、时空注意力的Video Latent Diffusion Models。提出一种基于LDM的高分辨率视…

ActiveMQ 06 Request/Response模型实现

Active MQ 06 Request/Response模型实现 QueueRequestor 同步阻塞 TemporaryQueue 异步监听&#xff0c;当消息过多时会创建响应的临时queue JMSCorrelationID 消息属性 异步监听&#xff0c;公用queue 调优总结 Topic加强 可追溯消息 http://activemq.apache.org/re…

自然语言处理(Natural Language Processing, NLP)简介

自然语言处理 (NLP) 是计算机科学的一个分支&#xff0c;更具体地说&#xff0c;是人工智能 (AI) 的分支&#xff0c;旨在让计算机能够以与人类大致相同的方式理解文本和语音。 自然语言处理 (NLP) 将计算语言学&#xff08;基于规则的人类语言建模&#xff09;与统计、机器学…

华为云Stack学习笔记

云服务层-基础设施层-----------------为云服务和华为云stack环境的部署提供基本的硬件支持 1.计算资源&#xff1a;服务器 &#xff08;1&#xff09;国产&#xff1a;兆芯服务器、海光服务器、飞腾服务器、鲲鹏服务器(泰山服务器、黄河服务器、宝德服务器) &#xff08;2&…

JavaWeb--正则表达式

目录 1. 简介 1.1. 语法 1.1.1. 使用RegExp构造函数创建正则表达式 1.1.2. 使用正则表达式字面量语法创建正则表达式 1.1.3. 正则表达式的应用 2. 修饰符 3. 方括号 4. 元字符 5. 量词 6. RegExp对象方法 7. 支持正则的String的方法 8. 正则表达式体验 8.1. 验证 …

【uniapp / vue】中动态添加绑定style 或 class

一、style样式动态设置 1.普通对象动态添加&#xff08;比较常见&#xff09; <template><view><view :style"{color:fontColor}"> </view><view :style"{ paddingTop: num px }"></view><view :style"{bac…

MySQL8.0.36-社区版:通用语法(2)

语法格式规范 sql语句可以以单号或者多行为书写&#xff0c;以分号结尾 可以使用空格或者缩进来增加可读性 mysql的sql语句不区分大小写&#xff0c;但是推荐大写关键字 注释分为单号注释和多行 单号注释&#xff1a;--内容 或者 # 内容 多行注释/* 注释内容 */ sql语句的…

面试算法-175-将有序数组转换为二叉搜索树

题目 给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵 平衡 二叉搜索树。 示例 1&#xff1a; 输入&#xff1a;nums [-10,-3,0,5,9] 输出&#xff1a;[0,-3,9,-10,null,5] 解释&#xff1a;[0,-10,5,null,-3,null,9] 也将被视…

C++11的新特性

C11是由C标准委员会指定的语言规范。相比于C98/03&#xff0c;C11则带来了数量可观的变化&#xff0c;其中包含了约140 个新特性&#xff0c;以及对C03标准中约600个缺陷的修正&#xff0c;C11能更好地用于系统开发和库开发、语法更加泛华和简单化、更加稳定和安全&#xff0c;…

数据结构 -- 数组

本篇文章主要是对数组的实操&#xff0c;所以对数组的概念不在赘述&#xff0c;了解更多数组相关可参照链接 Java数组的概念及使用-CSDN博客 1、DynamicArray类 package com.hh.algorithm.array;import java.util.Arrays; import java.util.Iterator; import java.util.functi…

记录--病理切片图像处理

简介 数字病理切片&#xff0c;也称为全幻灯片成像&#xff08;Whole Slide Imaging&#xff0c;WSI&#xff09;或数字切片扫描&#xff0c;是将传统的玻片病理切片通过高分辨率扫描仪转换为数字图像的技术。这种技术对病理学领域具有革命性的意义&#xff0c;因为它允许病理…

【linux】如何写一个launch文件

编写一个ROS&#xff08;Robot Operating System&#xff09;的launch文件是为了方便地启动一组相关的节点&#xff08;nodes&#xff09;、参数服务器&#xff08;parameter server&#xff09;参数、消息发布者/订阅者&#xff08;publishers/subscribers&#xff09;、服务&…

深入浅出 -- 系统架构之Spring、SpringBoot、SpringCloud的区别

首先我们做技术&#xff0c;尤其是java开发人员&#xff0c;应该对Spring、SpringBoot、SpringCloud 三个家伙一点不陌生。 结合发展史Spring出现的最早&#xff0c;后面为了可以让开发人员偷懒&#xff0c;简化配置&#xff0c;就是约定犹于配置或者说大于&#xff0c;进而出…

每日OJ题_BFS解决最短路③_力扣127. 单词接龙

目录 ③力扣127. 单词接龙 解析代码 ③力扣127. 单词接龙 127. 单词接龙 难度 困难 字典 wordList 中从单词 beginWord 和 endWord 的 转换序列 是一个按下述规格形成的序列 beginWord -> s1 -> s2 -> ... -> sk&#xff1a; 每一对相邻的单词只差一个字母。…