异步技术
企业级应用中广泛使用的三种异步消息传递技术
原文链接:https://blog.csdn.net/qq_55917018/article/details/122122218
三种异步消息传递技术
JMS (java message service)
一个Java规范,等同于JDBC规范,提供了与消息服务相关的API接口
JMS只 是一套接口,并没有给予实现,各大厂商和开源组织都对JMS实现不同产品,这些产品 包括:Apache的ActiveMQ、RocketMQ(前期属于阿里巴巴)、IBM的MQSeries、Microsoft的MSMQ和 VM的RabbitMQ(前期属于Spring source)等等,它们基本都遵循JMS规范。 这里介绍的ActiveMQ是最早的JMS开源产品,在Java世界使用比较广泛,在中等规模的 应用中是完全胜任的。当然,如果要真正面面对大型互联网应,要解决超高并发和吞吐 量问题,现在更推荐使用RabbitMQ、Kafuka或者RocketMQ等新一代的分布式产品,但 它们的基本原理和用法是相通的.
JMS消息模型
- peer-2-peer: 点对点模型,消息发送到一个队列中,队列保存消息。队列中的消息只能被一个消费者消费
- publish-subscribe: 发布订阅模型,消息可以被多个消费者消息,消费者和生产者完全独立,不需要感知对方的存在
JMS 消息种类
- TextMessage
- MapMessage
- BytesMessage
- StreamMessage
- ObjectMessage
- Message
JMS实现:ActiveMQ、Redis、HornetMQ、RabbitMQ、RocketMQ
AMQP(advanced message queuing protocol)
一种协议(高级消息队列协议,也是消息代理规范),规范了网络交换的数据格式
**解决:**服务跨平台,服务器供应商、生产者,消费者可以使用不同的语言来实现
AMQP消息模型:
- direct exchange
- fanoout exchange
- topic exchange
- headers exchange
- system exchange
消息种类:byte[]
- AMQP实现:RabbitMQ、StormMQ、RocketMQ
MQTT(Message Queueing Telemetry Transpory)
消息队列遥测传输、专为小设备涉及、是物联网生态系统中主要成分之一
实测
ActiveMQ
下载
官网下载
安装(windows)
启动
服务端口:61616
管理后台端口:8161
ActiveMQ后台:http://127.0.0.1:8161
默认账号、密码:admin\admin
springboot 整合
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>
点对点模式
server:port: 8080
spring:activemq:broker-url: tcp://localhost:61616jms:template:default-destination: com.yinhaipub-sub-domain: false
destination:可指定
@Service
public class MessageServiceImpl implements MessageService {@AutowiredJmsMessagingTemplate jmsTemplate;
// private ArrayList list = new ArrayList();@Overridepublic String sendMessage(String id) {System.out.println("消息已经放入队列中了:"+id);jmsTemplate.convertAndSend("order.queue.id",id);
// final boolean add = list.add(id);return "1";}@Overridepublic String doMessage() {
// final Object id = list.remove(0);final String s = jmsTemplate.receiveAndConvert("order.queue.id",String.class);System.out.println("消息已经发送成功了id:"+s);return "1";}
}
JmsListener 监听
监听到队列有消息放入后立马通过入参接收消息
@SendTo(“队列名”)
将入参放入另外一个消息队列
@Component
public class ActiveMQListener {@Autowiredprivate JmsTemplate jmsTemplate;@JmsListener(destination = "order.queue.id")@SendTo("other")private String snedMessage(String message){
// final String s = (String) jmsTemplate.receiveAndConvert();System.out.println("-------------收到后发送:"+message);return message;}
}
RabbitMQ
RabbitMQ 是基于Erlang语言编写的,所以先要安装Erlang语言环境
自行去官网下载
安装Erlang
下载完成后配置环境变量ERLANG_HOME
Path
安装RabbitMQ
自行去官网下载,windows下exe 直接傻瓜式安装
启动:
start/stop
注:启动时报下图错误则是因为已经自启动了
注:如果Erlang版本和Rabbitmq版本不匹配报其他错误
启动管理界面插件
rabbitmq-plugins.bat enable rabbitmq_management
服务端口:5672 管理后台端口:15672
管理界面:http://localhost:15672/
默认密码: guest/guest
Springboot 整合
坐标
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置
server:port: 8080
spring:rabbitmq:host: localhostport: 5672
队列、交换机配置 ---- DirectExchange
@Configuration
public class RabbimtDirectConfig {@Beanpublic Queue queue() {// durable 是否持久化 默认false// exclusive 是否当前连接专用 默认false 当前连接关闭后即被删除// autoDelete 是否自动删除 默认false 生产者消费者不再使用就删除return new Queue("direct.queue");}@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct.exchange");}@BeanBinding binding(Queue queue, DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("direct");}
}
放入消息
@Service
public class MessageServiceImpl implements MessageService {@Autowiredprivate RabbitTemplate rabbitTemplate;// private ArrayList list = new ArrayList();@Overridepublic String sendMessage(String id) {System.out.println("消息已经放入队列中了:"+id);rabbitTemplate.convertAndSend("direct.exchange", "direct", id);
// final boolean add = list.add(id);return "1";}@Overridepublic String doMessage() {
// final Object id = list.remove(0);
// final String s = jmsTemplate.receiveAndConvert("order.queue.id",String.class);// System.out.println("消息已经发送成功了id:"+s);return "1";}
}
监听
@Component
public class RabbitListenre {@RabbitListener(queues="direct.queue")public void listen(String id) {System.out.println("从队列中取出"+id);}
}
队列、交换机配置 ---- TopExchange
匹配规则:模糊匹配
一 :(*)用来表示一个单词,该单词必须出现
二 :(#):用来表示任意数量
@Configuration
public class RabbimtTopicConfig {@Beanpublic Queue topicqueue() {return new Queue("topic.queue");}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic.exchange");}@BeanBinding bindingTopic(Queue topicqueue, TopicExchange topicExchange) {return BindingBuilder.bind(topicqueue).to(topicExchange).with("topic.#");}
}
@Overridepublic String sendMessage(String id) {System.out.println("消息已经放入队列中了:"+id);rabbitTemplate.convertAndSend("topic.exchange", "topic.asdasd.asds", id);
// final boolean add = list.add(id);return "1";}
消息监听
@Component
public class RabbitListenre {@RabbitListener(queues="topic.queue")public void listen(String id) {System.out.println("从队列中取出"+id);}
}