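Preface
Hello everyone. In this article we look at how to move ActiveMQ to the cloud so that it can benefit from Kubernetes' container scheduling and resource management, with the goal of keeping ActiveMQ highly available and scalable.
ActiveMQ is an open-source message-oriented middleware (MOM) from the Apache Software Foundation. It is a JMS provider that fully supports the JMS 1.1 and J2EE 1.4 specifications. It is one of the most widely used open-source message brokers and is mainly found in distributed architectures, where it is used to build enterprise messaging systems that are highly available, performant, scalable, easy to use, and secure.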
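The core concepts of ActiveMQ are the following:
- Message: the basic unit in ActiveMQ. It carries the actual data to be transferred.
- Topic: a broadcast (publish/subscribe) messaging model. A producer sends a message to a topic, and every consumer subscribed to it receives that message. This suits cases where one message must be distributed to many consumers.
- Queue: a point-to-point messaging model. A producer sends a message to a queue, and exactly one consumer receives it. This suits cases where a message is meant for a single consumer.
- Consumer: an application that reads messages from a queue or topic and processes them.
- Producer: an application that creates messages and sends them to a queue or topic.
- Broker: the core component of ActiveMQ. It receives, stores, and forwards messages. Every running ActiveMQ instance is a broker.
- JMS (Java Message Service): an API for message-oriented middleware, used to send messages between two applications or within a distributed system for asynchronous communication. JMS is platform independent, and most MOM (Message Oriented Middleware) vendors support it. ActiveMQ is one such implementation.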
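The difference between the queue and topic models can be illustrated with a small in-memory sketch. It is not ActiveMQ code: the `Queue` and `Topic` classes are hypothetical, consumers are plain Python lists, and round-robin dispatch is only an assumed policy chosen to keep the example short.

```python
class Queue:
    """Point-to-point: each message is delivered to exactly one consumer."""

    def __init__(self):
        self.consumers = []
        self._next = 0

    def subscribe(self, consumer):
        self.consumers.append(consumer)

    def send(self, message):
        # Round-robin is an illustrative policy, not the broker's real algorithm.
        consumer = self.consumers[self._next % len(self.consumers)]
        self._next += 1
        consumer.append(message)


class Topic:
    """Publish/subscribe: every subscriber receives a copy of each message."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, consumer):
        self.subscribers.append(consumer)

    def publish(self, message):
        for subscriber in self.subscribers:
            subscriber.append(message)


# Two consumers on a queue share the messages between them.
queue, a, b = Queue(), [], []
queue.subscribe(a)
queue.subscribe(b)
queue.send("m1")
queue.send("m2")

# Two subscribers on a topic each see every message.
topic, c, d = Topic(), [], []
topic.subscribe(c)
topic.subscribe(d)
topic.publish("m1")
```

After running it, `a` and `b` each hold one of the two queue messages, while `c` and `d` both hold the topic message.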
1. Deploy a standalone ActiveMQ
Step 1: Create the ConfigMaps
First, we create the ConfigMaps that store and manage the ActiveMQ configuration.
apiVersion: v1
kind: ConfigMap
metadata:
  name: activemq-config-single
  namespace: your-namespace  # replace with your actual namespace
data:
  activemq.xml: |
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

      <!-- Allows us to use system properties as variables in this configuration file -->
      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
          <value>file:${activemq.conf}/credentials.properties</value>
        </property>
      </bean>

      <!-- Allows accessing the server log -->
      <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
            lazy-init="false" scope="singleton"
            init-method="start" destroy-method="stop">
      </bean>

      <!-- The <broker> element is used to configure the ActiveMQ broker. -->
      <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-single" dataDirectory="${activemq.data}">

        <plugins>
          <simpleAuthenticationPlugin>
            <users>
              <authenticationUser username="my_mq_test" password="my_mq_test" groups="users,admins"/>
            </users>
          </simpleAuthenticationPlugin>
        </plugins>

        <destinationPolicy>
          <policyMap>
            <policyEntries>
              <policyEntry topic=">">
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained.
                     For more information, see:
                     http://activemq.apache.org/slow-consumer-handling.html -->
                <pendingMessageLimitStrategy>
                  <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
              </policyEntry>
            </policyEntries>
          </policyMap>
        </destinationPolicy>

        <!-- The managementContext is used to configure how ActiveMQ is exposed in
             JMX. By default, ActiveMQ uses the MBean server that is started by
             the JVM. For more information, see:
             http://activemq.apache.org/jmx.html -->
        <managementContext>
          <managementContext createConnector="false"/>
        </managementContext>

        <!-- Configure message persistence for the broker. The default persistence
             mechanism is the KahaDB store (identified by the kahaDB tag).
             For more information, see:
             http://activemq.apache.org/persistence.html -->
        <persistenceAdapter>
          <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

        <!-- The systemUsage controls the maximum amount of space the broker will
             use before disabling caching and/or slowing down producers.
             For more information, see:
             http://activemq.apache.org/producer-flow-control.html -->
        <systemUsage>
          <systemUsage>
            <memoryUsage>
              <memoryUsage percentOfJvmHeap="70"/>
            </memoryUsage>
            <storeUsage>
              <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
              <tempUsage limit="50 gb"/>
            </tempUsage>
          </systemUsage>
        </systemUsage>

        <!-- The transport connectors expose ActiveMQ over a given protocol to
             clients and other brokers. For more information, see:
             http://activemq.apache.org/configuring-transports.html -->
        <transportConnectors>
          <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
          <transportConnector name="openwire" uri="tcp://0.0.0.0:30226?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="amqp" uri="amqp://0.0.0.0:30227?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="stomp" uri="stomp://0.0.0.0:30228?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="mqtt" uri="mqtt://0.0.0.0:30229?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="ws" uri="ws://0.0.0.0:30230?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
          <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
        </shutdownHooks>

      </broker>

      <!-- Enable web consoles, REST and Ajax APIs and demos.
           The web consoles requires by default login, you can disable this in the jetty.xml file.
           Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details -->
      <import resource="jetty.xml"/>

    </beans>
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: activemq-config-jetty-realm
  namespace: your-namespace  # replace with your actual namespace
data:
  jetty-realm.properties: |
    admin: my_mq_test, admin
    user: user, user
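In the configuration above, activemq.xml enables the simple authentication plugin and replaces the default port numbers, which is intended to make the service more secure. jetty-realm.properties sets the login user names and passwords for the web console, one entry per line, in the format:
username: password, role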
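To make the entry format concrete, here is a minimal sketch that splits such lines into user name, password, and roles. The helper `parse_realm_line` is hypothetical and only illustrates the layout; it is not how Jetty itself reads the file, and it assumes passwords contain no commas.

```python
def parse_realm_line(line):
    """Split 'username: password, role1, role2' into its three parts."""
    username, _, rest = line.partition(":")
    fields = [field.strip() for field in rest.split(",")]
    password, roles = fields[0], fields[1:]
    return username.strip(), password, roles


# The two entries used in the ConfigMap above.
REALM = """\
admin: my_mq_test, admin
user: user, user
"""

users = [parse_realm_line(line) for line in REALM.splitlines() if line.strip()]
for username, password, roles in users:
    print(username, roles)
```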
Step 2: Create the Deployment
Next, we create a Deployment that defines the number of ActiveMQ replicas, the image version, and related settings.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: activemq-single
  namespace: your-namespace  # replace with your actual namespace
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  selector:
    matchLabels:
      app: activemq-single
  strategy:
    rollingUpdate:
      maxSurge: 50%
      maxUnavailable: 50%
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: activemq-single
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: project.node
                operator: In
                values:
                - your-node-name  # replace with your actual node name
      volumes:
      - name: timezone
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
      - name: config-activemq
        configMap:
          name: activemq-config-single
      - name: jetty-realm
        configMap:
          name: activemq-config-jetty-realm
      containers:
      - name: activemq
        image: webcenter/activemq:5.14.3
        imagePullPolicy: IfNotPresent
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - name: config-activemq
          mountPath: /opt/activemq/conf/activemq.xml
          subPath: activemq.xml
        - name: jetty-realm
          mountPath: /opt/activemq/conf/jetty-realm.properties
          subPath: jetty-realm.properties
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: TZ
          value: "Asia/Shanghai"
The configuration above defines a Deployment named activemq-single. It uses the image webcenter/activemq:5.14.3 and mounts the configuration files from the ConfigMaps created in step 1.
Step 3: Create the Service
Then we create a Service that exposes the ActiveMQ instance running in the K8S cluster as a reachable service.
apiVersion: v1
kind: Service
metadata:
  name: service-activemq-single
  namespace: your-namespace  # replace with your actual namespace
spec:
  selector:
    app: activemq-single
  type: NodePort
  sessionAffinity: None
  ports:
  - name: activemq-admin
    port: 8161
    targetPort: 8161
    nodePort: 30225
  - name: activemq-tcp
    port: 30226
    targetPort: 30226
    nodePort: 30226
  - name: activemq-amqp
    port: 30227
    targetPort: 30227
    nodePort: 30227
  - name: activemq-stomp
    port: 30228
    targetPort: 30228
    nodePort: 30228
  - name: activemq-mqtt
    port: 30229
    targetPort: 30229
    nodePort: 30229
  - name: activemq-ws
    port: 30230
    targetPort: 30230
    nodePort: 30230
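Because the same port numbers appear in both activemq.xml and the Service, a typo in either place leaves a connector unreachable. The sketch below is one possible consistency check, not part of the original deployment: it assumes the port values shown above, shortens the connector query strings, and uses hypothetical helper names (`listen_port`, `check_ports`). The range 30000-32767 is the Kubernetes default for NodePorts and may differ if the API server sets `--service-node-port-range`.

```python
import re

# Default NodePort range of a Kubernetes cluster (an assumption about your cluster).
NODE_PORT_RANGE = range(30000, 32768)

# The web console is served by Jetty, not by a transport connector.
JETTY_PORT = 8161

# Transport connector URIs from activemq.xml (query strings shortened).
CONNECTOR_URIS = {
    "openwire": "tcp://0.0.0.0:30226?maximumConnections=1000",
    "amqp": "amqp://0.0.0.0:30227?maximumConnections=1000",
    "stomp": "stomp://0.0.0.0:30228?maximumConnections=1000",
    "mqtt": "mqtt://0.0.0.0:30229?maximumConnections=1000",
    "ws": "ws://0.0.0.0:30230?maximumConnections=1000",
}

# (name, targetPort, nodePort) triples from the Service.
SERVICE_PORTS = [
    ("activemq-admin", 8161, 30225),
    ("activemq-tcp", 30226, 30226),
    ("activemq-amqp", 30227, 30227),
    ("activemq-stomp", 30228, 30228),
    ("activemq-mqtt", 30229, 30229),
    ("activemq-ws", 30230, 30230),
]


def listen_port(uri):
    """Extract the TCP port from a transport connector URI."""
    match = re.search(r":(\d+)(?:\?|$)", uri)
    if match is None:
        raise ValueError(f"no port found in {uri!r}")
    return int(match.group(1))


def check_ports(connector_uris, service_ports):
    """Return a list of problems; an empty list means the ports line up."""
    listening = {listen_port(uri) for uri in connector_uris.values()} | {JETTY_PORT}
    problems = []
    for name, target_port, node_port in service_ports:
        if target_port not in listening:
            problems.append(f"{name}: nothing listens on targetPort {target_port}")
        if node_port not in NODE_PORT_RANGE:
            problems.append(f"{name}: nodePort {node_port} is outside the default range")
    return problems


print(check_ports(CONNECTOR_URIS, SERVICE_PORTS))
```

Pointing a Service port at ActiveMQ's usual OpenWire port 61616, which no connector in this configuration listens on, would be reported twice: once for the missing listener and once for the NodePort range.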
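Step 4: Verify the standalone ActiveMQ
- First, we start a producer that connects to the newly deployed standalone ActiveMQ and sends one message to the queue named mdm_distribute_CostCenter. The message body is mdm_distribute_CostCenter.
- Next, we start a consumer that connects to the same standalone ActiveMQ and listens on the queue named mdm_distribute_CostCenter.
- Finally, we check the records of that queue on the admin page of the web console.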
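The article does not show the producer it used. As a rough stand-in, the sketch below sends one message over the STOMP connector (port 30228 in this setup) using only the Python standard library. The host name `your-node-ip` and the helper names are assumptions for illustration, and error handling is deliberately minimal.

```python
import socket


def stomp_frame(command, headers, body=""):
    """Encode one STOMP frame: command, header lines, blank line, body, NUL byte."""
    lines = [command] + [f"{key}:{value}" for key, value in headers.items()]
    return ("\n".join(lines) + "\n\n" + body).encode("utf-8") + b"\x00"


def send_to_queue(host, port, user, password, queue, text):
    """Connect, send one text message to /queue/<queue>, then disconnect."""
    connect = stomp_frame("CONNECT", {
        "accept-version": "1.1",
        "host": host,
        "login": user,
        "passcode": password,
    })
    send = stomp_frame("SEND", {"destination": f"/queue/{queue}"}, text)
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(connect)
        reply = sock.recv(4096)
        if not reply.startswith(b"CONNECTED"):
            raise RuntimeError(f"broker refused the connection: {reply[:80]!r}")
        sock.sendall(send)
        sock.sendall(stomp_frame("DISCONNECT", {}))


if __name__ == "__main__":
    # 'your-node-ip' is a placeholder for the address of a cluster node.
    send_to_queue("your-node-ip", 30228, "my_mq_test", "my_mq_test",
                  "mdm_distribute_CostCenter", "mdm_distribute_CostCenter")
```

If the message arrives, the queue should show one enqueued message on the admin page.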
Summary
These are the steps for deploying a standalone ActiveMQ in K8S. By following them, we deployed a working standalone ActiveMQ using a stateless Deployment.
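2. Deploy an ActiveMQ cluster (network of brokers)
Step 1: Create the ConfigMaps
As with the standalone setup, we create ConfigMaps to store and manage the ActiveMQ configuration, this time one per broker.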
apiVersion: v1
kind: ConfigMap
metadata:
  name: activemq-config-node-0
  namespace: your-namespace  # replace with your actual namespace
data:
  activemq.xml: |
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

      <!-- Allows us to use system properties as variables in this configuration file -->
      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
          <value>file:${activemq.conf}/credentials.properties</value>
        </property>
      </bean>

      <!-- Allows accessing the server log -->
      <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
            lazy-init="false" scope="singleton"
            init-method="start" destroy-method="stop">
      </bean>

      <!-- The <broker> element is used to configure the ActiveMQ broker. -->
      <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-node-0" dataDirectory="${activemq.data}">

        <!-- Replace your-node-ip with the actual IP; 30220 is the OpenWire port of node-1 -->
        <networkConnectors>
          <networkConnector userName="my_mq_test" password="my_mq_test" uri="static:(tcp://your-node-ip:30220)" duplex="true"/>
        </networkConnectors>

        <plugins>
          <simpleAuthenticationPlugin>
            <users>
              <authenticationUser username="my_mq_test" password="my_mq_test" groups="users,admins"/>
            </users>
          </simpleAuthenticationPlugin>
        </plugins>

        <destinationPolicy>
          <policyMap>
            <policyEntries>
              <policyEntry topic=">">
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained.
                     For more information, see:
                     http://activemq.apache.org/slow-consumer-handling.html -->
                <pendingMessageLimitStrategy>
                  <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
              </policyEntry>
            </policyEntries>
          </policyMap>
        </destinationPolicy>

        <!-- The managementContext is used to configure how ActiveMQ is exposed in
             JMX. By default, ActiveMQ uses the MBean server that is started by
             the JVM. For more information, see:
             http://activemq.apache.org/jmx.html -->
        <managementContext>
          <managementContext createConnector="false"/>
        </managementContext>

        <!-- Configure message persistence for the broker. The default persistence
             mechanism is the KahaDB store (identified by the kahaDB tag).
             For more information, see:
             http://activemq.apache.org/persistence.html -->
        <persistenceAdapter>
          <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

        <!-- The systemUsage controls the maximum amount of space the broker will
             use before disabling caching and/or slowing down producers.
             For more information, see:
             http://activemq.apache.org/producer-flow-control.html -->
        <systemUsage>
          <systemUsage>
            <memoryUsage>
              <memoryUsage percentOfJvmHeap="70"/>
            </memoryUsage>
            <storeUsage>
              <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
              <tempUsage limit="50 gb"/>
            </tempUsage>
          </systemUsage>
        </systemUsage>

        <!-- The transport connectors expose ActiveMQ over a given protocol to
             clients and other brokers. For more information, see:
             http://activemq.apache.org/configuring-transports.html -->
        <transportConnectors>
          <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
          <transportConnector name="openwire" uri="tcp://0.0.0.0:30218?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="amqp" uri="amqp://0.0.0.0:30221?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="stomp" uri="stomp://0.0.0.0:30222?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="mqtt" uri="mqtt://0.0.0.0:30223?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="ws" uri="ws://0.0.0.0:30224?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
          <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
        </shutdownHooks>

      </broker>

      <!-- Enable web consoles, REST and Ajax APIs and demos.
           The web consoles requires by default login, you can disable this in the jetty.xml file.
           Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details -->
      <import resource="jetty.xml"/>

    </beans>
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: activemq-config-node-1
  namespace: your-namespace  # replace with your actual namespace
data:
  activemq.xml: |
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

      <!-- Allows us to use system properties as variables in this configuration file -->
      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
          <value>file:${activemq.conf}/credentials.properties</value>
        </property>
      </bean>

      <!-- Allows accessing the server log -->
      <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
            lazy-init="false" scope="singleton"
            init-method="start" destroy-method="stop">
      </bean>

      <!-- The <broker> element is used to configure the ActiveMQ broker.
           Each broker in a network needs its own unique brokerName. -->
      <broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-node-1" dataDirectory="${activemq.data}">

        <!-- Replace your-node-ip with the actual IP; 30218 is the OpenWire port of node-0 -->
        <networkConnectors>
          <networkConnector userName="my_mq_test" password="my_mq_test" uri="static:(tcp://your-node-ip:30218)" duplex="true"/>
        </networkConnectors>

        <plugins>
          <simpleAuthenticationPlugin>
            <users>
              <authenticationUser username="my_mq_test" password="my_mq_test" groups="users,admins"/>
            </users>
          </simpleAuthenticationPlugin>
        </plugins>

        <destinationPolicy>
          <policyMap>
            <policyEntries>
              <policyEntry topic=">">
                <!-- The constantPendingMessageLimitStrategy is used to prevent
                     slow topic consumers to block producers and affect other consumers
                     by limiting the number of messages that are retained.
                     For more information, see:
                     http://activemq.apache.org/slow-consumer-handling.html -->
                <pendingMessageLimitStrategy>
                  <constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
              </policyEntry>
            </policyEntries>
          </policyMap>
        </destinationPolicy>

        <!-- The managementContext is used to configure how ActiveMQ is exposed in
             JMX. By default, ActiveMQ uses the MBean server that is started by
             the JVM. For more information, see:
             http://activemq.apache.org/jmx.html -->
        <managementContext>
          <managementContext createConnector="false"/>
        </managementContext>

        <!-- Configure message persistence for the broker. The default persistence
             mechanism is the KahaDB store (identified by the kahaDB tag).
             For more information, see:
             http://activemq.apache.org/persistence.html -->
        <persistenceAdapter>
          <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

        <!-- The systemUsage controls the maximum amount of space the broker will
             use before disabling caching and/or slowing down producers.
             For more information, see:
             http://activemq.apache.org/producer-flow-control.html -->
        <systemUsage>
          <systemUsage>
            <memoryUsage>
              <memoryUsage percentOfJvmHeap="70"/>
            </memoryUsage>
            <storeUsage>
              <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
              <tempUsage limit="50 gb"/>
            </tempUsage>
          </systemUsage>
        </systemUsage>

        <!-- The transport connectors expose ActiveMQ over a given protocol to
             clients and other brokers. For more information, see:
             http://activemq.apache.org/configuring-transports.html -->
        <transportConnectors>
          <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
          <transportConnector name="openwire" uri="tcp://0.0.0.0:30220?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="amqp" uri="amqp://0.0.0.0:30221?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="stomp" uri="stomp://0.0.0.0:30222?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="mqtt" uri="mqtt://0.0.0.0:30223?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
          <transportConnector name="ws" uri="ws://0.0.0.0:30224?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
          <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
        </shutdownHooks>

      </broker>

      <!-- Enable web consoles, REST and Ajax APIs and demos.
           The web consoles requires by default login, you can disable this in the jetty.xml file.
           Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details -->
      <import resource="jetty.xml"/>

    </beans>
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: activemq-config-jetty-realm
  namespace: your-namespace  # replace with your actual namespace
data:
  jetty-realm.properties: |
    admin: my_mq_test, admin
    user: user, user
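In a network of brokers, every broker needs a distinct brokerName, and each networkConnector has to point at the OpenWire port of the other broker. The sketch below checks both conditions on a hand-written summary of the two ConfigMaps. The data structure and the `check_network` helper are hypothetical, and the sample data assumes the second broker is named activemq-node-1.

```python
from collections import Counter

# Hand-copied summary of the two broker configurations (illustrative input).
BROKERS = [
    {"config_map": "activemq-config-node-0", "broker_name": "activemq-node-0",
     "openwire_port": 30218, "peer_port": 30220},
    {"config_map": "activemq-config-node-1", "broker_name": "activemq-node-1",
     "openwire_port": 30220, "peer_port": 30218},
]


def check_network(brokers):
    """Return a list of problems; an empty list means the topology looks sane."""
    problems = []
    # Rule 1: broker names must be unique across the network.
    counts = Counter(broker["broker_name"] for broker in brokers)
    for name, count in counts.items():
        if count > 1:
            problems.append(f"brokerName {name!r} is used {count} times")
    # Rule 2: each connector must target another broker's OpenWire port.
    for index, broker in enumerate(brokers):
        other_ports = {other["openwire_port"]
                       for position, other in enumerate(brokers) if position != index}
        if broker["peer_port"] not in other_ports:
            problems.append(
                f"{broker['config_map']}: peer port {broker['peer_port']} "
                "is not another broker's OpenWire port")
    return problems


print(check_network(BROKERS))
```

Giving both brokers the same name, or pointing a connector at the broker's own port, each produces one entry in the returned list.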
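Step 2: Create the Deployments
Next, we create two Deployments, one for each node of the ActiveMQ cluster. They differ mainly in the ConfigMap each one mounts and, through that configuration file, in the ports the broker in each container listens on.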
apiVersion: apps/v1
kind: Deployment
metadata:
  name: activemq-node-0
  namespace: your-namespace  # replace with your actual namespace
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  selector:
    matchLabels:
      app: activemq-node-0
  strategy:
    rollingUpdate:
      maxSurge: 50%
      maxUnavailable: 50%
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: activemq-node-0
        name: activemq-node
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: project.node
                operator: In
                values:
                - your-node-name  # replace with your actual node name
      volumes:
      - name: timezone
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
      - name: config-activemq
        configMap:
          name: activemq-config-node-0
      - name: jetty-realm
        configMap:
          name: activemq-config-jetty-realm
      containers:
      - name: activemq
        image: webcenter/activemq:5.14.3
        imagePullPolicy: IfNotPresent
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - name: config-activemq
          mountPath: /opt/activemq/conf/activemq.xml
          subPath: activemq.xml
        - name: jetty-realm
          mountPath: /opt/activemq/conf/jetty-realm.properties
          subPath: jetty-realm.properties
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: TZ
          value: "Asia/Shanghai"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: activemq-node-1
  namespace: your-namespace  # replace with your actual namespace
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  selector:
    matchLabels:
      app: activemq-node-1
  strategy:
    rollingUpdate:
      maxSurge: 50%
      maxUnavailable: 50%
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: activemq-node-1
        name: activemq-node
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: project.node
                operator: In
                values:
                - your-node-name  # replace with your actual node name
      volumes:
      - name: timezone
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
      - name: config-activemq
        configMap:
          name: activemq-config-node-1
      - name: jetty-realm
        configMap:
          name: activemq-config-jetty-realm
      containers:
      - name: activemq
        image: webcenter/activemq:5.14.3
        imagePullPolicy: IfNotPresent
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - name: config-activemq
          mountPath: /opt/activemq/conf/activemq.xml
          subPath: activemq.xml
        - name: jetty-realm
          mountPath: /opt/activemq/conf/jetty-realm.properties
          subPath: jetty-realm.properties
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: TZ
          value: "Asia/Shanghai"
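Step 3: Create the Services
Then we create the Services that expose the ActiveMQ instances running in the K8S cluster. We need two Services, one for each Deployment from step 2, plus one more Service that exposes the ports shared by both nodes.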
apiVersion: v1
kind: Service
metadata:
  name: service-activemq-common
  namespace: your-namespace  # replace with your actual namespace
spec:
  selector:
    name: activemq-node
  type: NodePort
  sessionAffinity: None
  ports:
  - name: activemq-amqp
    port: 30221
    targetPort: 30221
    nodePort: 30221
  - name: activemq-stomp
    port: 30222
    targetPort: 30222
    nodePort: 30222
  - name: activemq-mqtt
    port: 30223
    targetPort: 30223
    nodePort: 30223
  - name: activemq-ws
    port: 30224
    targetPort: 30224
    nodePort: 30224
---
apiVersion: v1
kind: Service
metadata:
  name: service-activemq-node-0
  namespace: your-namespace  # replace with your actual namespace
spec:
  selector:
    app: activemq-node-0
  type: NodePort
  sessionAffinity: None
  ports:
  - name: activemq-admin
    port: 8161
    targetPort: 8161
    nodePort: 30217
  - name: activemq-tcp
    port: 30218
    targetPort: 30218
    nodePort: 30218
---
apiVersion: v1
kind: Service
metadata:
  name: service-activemq-node-1
  namespace: your-namespace  # replace with your actual namespace
spec:
  selector:
    app: activemq-node-1
  type: NodePort
  sessionAffinity: None
  ports:
  - name: activemq-admin
    port: 8161
    targetPort: 8161
    nodePort: 30219
  - name: activemq-tcp
    port: 30220
    targetPort: 30220
    nodePort: 30220
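Step 4: Verify the ActiveMQ cluster
- First, we start a producer that connects to the newly deployed ActiveMQ cluster and sends one message to the queue named mdm_distribute_Employee. The message body is mdm_distribute_Employee.
- Next, we start a consumer that connects to the same ActiveMQ cluster and listens on the queue named mdm_distribute_Employee.
- Finally, we check the records of that queue on the admin page of the web console.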
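The consumer side can be sketched in the same spirit. The example below subscribes over the shared STOMP port (30222 in this setup) and returns the first message it receives. It is an illustration under assumptions, not the consumer used in the article: `your-node-ip` and the helper names are placeholders, header unescaping is omitted, and `read_frame` assumes at most one frame arrives per read.

```python
import socket


def stomp_frame(command, headers, body=""):
    """Encode one STOMP frame: command, header lines, blank line, body, NUL byte."""
    lines = [command] + [f"{key}:{value}" for key, value in headers.items()]
    return ("\n".join(lines) + "\n\n" + body).encode("utf-8") + b"\x00"


def parse_frame(raw):
    """Decode one STOMP frame into (command, headers, body)."""
    text = raw.rstrip(b"\x00").decode("utf-8")
    head, _, body = text.partition("\n\n")
    command, *header_lines = head.lstrip("\n").split("\n")
    headers = dict(line.split(":", 1) for line in header_lines)
    return command, headers, body


def read_frame(sock):
    """Read bytes until the NUL terminator and return the frame without it."""
    buffer = b""
    while b"\x00" not in buffer:
        chunk = sock.recv(4096)
        if not chunk:
            raise ConnectionError("broker closed the connection")
        buffer += chunk
    return buffer.split(b"\x00", 1)[0]


def receive_one(host, port, user, password, queue):
    """Subscribe to /queue/<queue> and return the body of the first message."""
    with socket.create_connection((host, port), timeout=10) as sock:
        sock.sendall(stomp_frame("CONNECT", {
            "accept-version": "1.1", "host": host,
            "login": user, "passcode": password}))
        command, _, _ = parse_frame(read_frame(sock))
        if command != "CONNECTED":
            raise RuntimeError(f"unexpected reply: {command}")
        sock.sendall(stomp_frame("SUBSCRIBE", {
            "id": "0", "destination": f"/queue/{queue}", "ack": "auto"}))
        _, _, body = parse_frame(read_frame(sock))
        return body


if __name__ == "__main__":
    # 'your-node-ip' is a placeholder for the address of a cluster node.
    print(receive_one("your-node-ip", 30222, "my_mq_test", "my_mq_test",
                      "mdm_distribute_Employee"))
```

Because the shared Service selects both Pods, the connection may land on either broker. Receiving a message that was produced on the other node is a simple sign that the network connector is forwarding.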
Summary
That completes the steps for deploying an ActiveMQ cluster in K8S. By following them, we deployed a working ActiveMQ cluster using stateless Deployments.
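Conclusion
This article walked through the steps for deploying both a standalone ActiveMQ and an ActiveMQ cluster in a K8S environment. A careful reader will notice that ActiveMQ's data is still stored inside the Pod, so it is lost whenever the Pod is recreated. This is a deliberate choice driven by our business requirements: before an MQ message is sent, its data is first written to a database and only then sent, so the broker's own data store is not critical. If you do need the data to persist, you can use a PVC or mount a directory from the host. Compared with a traditional manual installation, deploying with K8S is more convenient and efficient, and it makes deploying and managing an ActiveMQ cluster faster.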
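The article does not show how the "write to the database first, then send" flow is implemented. One common way to realize it is an outbox table: the message is stored in the same database as the business data and marked as sent only after the broker accepted it. The sketch below uses an in-memory SQLite database purely for illustration. The table layout and the `init`, `record`, and `flush` helpers are assumptions, and `send` stands for whatever client actually talks to ActiveMQ.

```python
import sqlite3


def init(db):
    """Create the outbox table if it does not exist yet."""
    with db:
        db.execute(
            "CREATE TABLE IF NOT EXISTS outbox ("
            "id INTEGER PRIMARY KEY, queue TEXT NOT NULL, "
            "body TEXT NOT NULL, sent INTEGER NOT NULL DEFAULT 0)")


def record(db, queue, body):
    """Persist the message before any attempt to send it."""
    with db:
        db.execute("INSERT INTO outbox (queue, body) VALUES (?, ?)", (queue, body))


def flush(db, send):
    """Send every unsent row; a row is marked sent only after send() returns."""
    rows = db.execute(
        "SELECT id, queue, body FROM outbox WHERE sent = 0 ORDER BY id").fetchall()
    for row_id, queue, body in rows:
        send(queue, body)  # if this raises, the row stays unsent and is retried later
        with db:
            db.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,))
    return len(rows)


db = sqlite3.connect(":memory:")
init(db)
record(db, "mdm_distribute_CostCenter", "mdm_distribute_CostCenter")

delivered = []
first = flush(db, lambda queue, body: delivered.append((queue, body)))
second = flush(db, lambda queue, body: delivered.append((queue, body)))
print(first, second, delivered)
```

With this pattern, a broker Pod that loses its KahaDB data can be refilled from the rows still marked unsent, which is why a non-persistent broker store can be acceptable. Consumers may still see a message twice if a crash happens between sending and marking, so they should tolerate duplicates.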