【分布式计算】java消息队列机制

        消息队列是一种在不同组件或应用之间进行数据传递的技术,通常用于处理异步通信。它允许消息的发送者(生产者)和接收者(消费者)之间进行解耦。


概念


        消息队列是一种先进先出(FIFO)的数据结构,它存储待处理的消息直到它们被消费。消息是生产者发送给队列的数据单元,消费者则从队列中读取这些消息进行处理。


原理


1. 生产者:
   - 生产者是创建消息的实体,它负责将消息发送到队列。生产者不需要关心消息的具体处理过程,只需确保消息正确发送到队列。

2. 消息队列:
   - 消息队列充当缓冲区,暂时存储从生产者那里发送过来的消息。队列管理消息的顺序,并确保按照发送的顺序逐一传递给消费者。

3. 消费者:
   - 消费者从消息队列中读取消息,并进行相应的处理。消费者可以是同一应用的其他部分,或者是完全独立的应用。

4. 消息处理:
   - 一旦消息被消费者读取,它可以被确认和删除,或者在处理失败时重新放回队列等待再次处理。


使用场景


异步处理:当应用执行耗时任务时,可以将任务封装成消息发送到队列,由消费者异步处理。
流量控制:在高流量事件如大促销或黑色星期五时,消息队列可以帮助缓冲入站流量,防止系统过载。
解耦服务:在微服务架构中,消息队列可以帮助减少服务之间的直接依赖,通过消息传递来通信,从而提高系统的可维护性和扩展性。


Java消息队列技术

在Java中,消息队列是一种数据结构或服务,用于在不同的应用组件或系统之间异步传递消息。它支持松耦合的架构,允许发送者和接收者独立地进行开发和扩展。消息队列可以帮助缓解高负载、增强系统的可伸缩性,并提供容错机制。下面是一些常见的Java消息队列技术:

1. Apache Kafka:
        Kafka是一个分布式流处理平台,它不仅能够处理消息队列的功能,还能处理复杂的事件流。它特别适合需要高吞吐量和可靠性的大规模数据处理场景。

2. RabbitMQ:
        RabbitMQ是一个开源消息代理,支持多种消息协议。它提供灵活的路由功能,能够保证消息的可靠传输。适合于复杂的消息传递需求和多种不同的通信模式。

3. ActiveMQ:
        Apache ActiveMQ是一个强大的开源消息代理,支持多种JMS(Java Message Service)协议和客户端语言。适用于那些需要JMS标准支持的企业应用。

4. Amazon SQS (Simple Queue Service):
        SQS是一个托管的消息队列服务,提供简单的Web服务API来完全管理队列的消息传输。它能够无限扩展,并且不需要预先安装消息队列基础设施。

5. Google Cloud Pub/Sub:
        Google的Pub/Sub提供了一种全球分布式的消息传递平台,适合处理大量数据的实时交换。


这个流程图展示了使用 ActiveMQ 实现消息队列的基本步骤,包括消息的发送和接收。以下是每个步骤的详细讲解。

 1. 创建 ConnectionFactory

`ConnectionFactory` 是一个接口,用于创建连接到消息中间件(ActiveMQ)的工厂。它是创建连接的起点。

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");


2. 使用 ConnectionFactory 创建 Connection

通过 `ConnectionFactory` 创建一个连接对象 `Connection`。

Connection connection = connectionFactory.createConnection();

3. 启动 Connection

在使用连接之前,必须启动它。

connection.start();

4. 使用 Connection 创建一个或多个 JMS Session

通过 `Connection` 创建会话 `Session`。会话是生产和消费消息的上下文。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


5. 使用 Session 创建 Queue 或 Topic

通过会话创建队列(Queue)或主题(Topic)。队列用于点对点消息传递,主题用于发布/订阅消息传递。

Queue queue = session.createQueue("testQueue");
// 或者
Topic topic = session.createTopic("testTopic");

6. 使用 Session 创建 MessageProducer 或 MessageConsumer

根据需要创建消息生产者 `MessageProducer` 或消息消费者 `MessageConsumer`。

创建 MessageProducer

MessageProducer producer = session.createProducer(queue);

创建 MessageConsumer

MessageConsumer consumer = session.createConsumer(queue);

7. 发送消息

使用 `MessageProducer` 发送消息。

TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
producer.send(message);

8. 接收消息

异步接收

设置消息监听器,当有消息到达时自动触发。

consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);} catch (JMSException e) {e.printStackTrace();}}}
});

同步接收

使用 `MessageConsumer.receive()` 方法同步接收消息。

Message message = consumer.receive();
if (message instanceof TextMessage) {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);
}

完整代码示例

生产者代码

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Producer {public static void main(String[] args) throws JMSException {// 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接Connection connection = connectionFactory.createConnection();connection.start();// 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列Queue queue = session.createQueue("testQueue");// 创建生产者MessageProducer producer = session.createProducer(queue);// 创建消息TextMessage message = session.createTextMessage("Hello, ActiveMQ!");// 发送消息producer.send(message);// 关闭连接connection.close();}
}

消费者代码

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Consumer {public static void main(String[] args) throws JMSException {// 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接Connection connection = connectionFactory.createConnection();connection.start();// 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列Queue queue = session.createQueue("testQueue");// 创建消费者MessageConsumer consumer = session.createConsumer(queue);// 同步接收消息Message message = consumer.receive();if (message instanceof TextMessage) {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);}// 异步接收消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);} catch (JMSException e) {e.printStackTrace();}}}});// 为了测试异步接收,保持程序运行一段时间try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}// 关闭连接connection.close();}
}

具体应用

调用百度云api

使用消息队列实现一个调用百度智能云 API 的校园卡程序有助于提高系统的可扩展性和可靠性。消息队列可以解耦生产者和消费者,并实现异步处理。

实现步骤

1. 配置消息队列:
    - 安装并配置 RabbitMQ 或 ActiveMQ。
    - 配置 Spring Boot 项目以连接到消息队列。

2. 创建生产者(Producer):
    - 接收用户上传的图片。
    - 将图片编码为 Base64 格式,并发送到消息队列。

3. 创建消费者(Consumer):
    - 监听消息队列中的消息。
    - 调用百度智能云 API 进行图片识别。
    - 将识别结果存储以便后续查询。

4. 实现控制器(Controller):
    - 提供上传图片的接口。
    - 提供获取识别结果的接口。

系统架构图

+-------------------+        +--------------------+        +-------------------+
|                   |        |                    |        |                   |
|   User Uploads    |        |   Message Queue    |        |    API Consumer   |
|   (Controller)    | -----> |  (RabbitMQ/ActiveMQ)| -----> | (Baidu API Call)  |
|                   |        |                    |        |                   |
+-------------------+        +--------------------+        +-------------------+

具体实现

  • 用户上传图片:用户通过前端页面上传图片,图片通过 RecognitionController 接收并保存到消息队列。
  • 消息队列:图片以消息的形式存储在消息队列中,保证消息的可靠传递。
  • 消息处理RecognitionListener 监听消息队列,当有新消息到达时,调用百度智能云 API 进行图片识别,并将结果保存到 RecognitionService
  • 获取结果:用户可以通过访问 /api/resultPage 来获取最新的识别结果。

代码部分:

1. 配置消息队列:

   配置 `application.properties` 以连接到消息队列(以 ActiveMQ 为例):

   spring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=adminspring.jms.pool.enabled=true

2. 创建生产者(Producer):

  package com.example.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Service;@Servicepublic class RecognitionService {@Autowiredprivate JmsTemplate jmsTemplate;public void sendImageForRecognition(byte[] imageBytes) {jmsTemplate.convertAndSend("animal.recognition.queue", imageBytes);}// 其他方法...}

3. 创建消费者(Consumer):
 

  package com.example.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;@Componentpublic class RecognitionListener {@Autowiredprivate RecognitionService recognitionService;@JmsListener(destination = "animal.recognition.queue")public void processImage(byte[] imageBytes) {String result = recognitionService.recognizeImage(imageBytes);recognitionService.saveResult(result);}}

4. 实现控制器(Controller):

 package com.example.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.ui.Model;import org.springframework.web.bind.annotation.*;import org.springframework.web.multipart.MultipartFile;import org.springframework.web.servlet.view.RedirectView;import java.io.IOException;@Controller@RequestMapping("/api")public class RecognitionController {@Autowiredprivate RecognitionService recognitionService;@PostMapping("/recognize")public RedirectView recognizeAnimal(@RequestParam("file") MultipartFile file, Model model) throws IOException {if (file.isEmpty()) {model.addAttribute("message", "File is empty");return new RedirectView("/errorPage.html");}byte[] bytes = file.getBytes();recognitionService.saveResult("等待识别结果...");recognitionService.sendImageForRecognition(bytes);return new RedirectView("/resultPage.html");}@GetMapping("/resultPage")@ResponseBodypublic String getResult() {return recognitionService.getResult();}}

消息队列的意义

异步处理:允许用户在上传图片后立即获得响应,而不是等待图片识别结果。
解耦:上传图片的部分和图片识别的部分可以独立开发和扩展。
负载均衡:可以轻松增加更多的消费者以处理高并发请求。
可靠性:消息队列持久化消息,确保即使在系统故障时也不会丢失消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/28440.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习(V)--无监督学习(一)聚类

根据训练样本中是否包含标签信息,机器学习可以分为监督学习和无监督学习。聚类算法是典型的无监督学习,目的是想将那些相似的样本尽可能聚在一起,不相似的样本尽可能分开。 相似度或距离 聚类的核心概念是相似度(similarity)或距离(distance…

嵌入式学习记录6.14(练习)

#include "mainwindow.h" #include "ui_mainwindow.h"MainWindow::MainWindow(QWidget *parent): QMainWindow(parent), ui(new Ui::MainWindow) {ui->setupUi(this);this->resize(1028,783); //设置左侧背景QLabel *lab1new QLabel(this);lab1->…

【内存管理之堆内存】

1.栈上的基元 2.栈上的聚合对象 3.手动分配和释放 4.分配堆内存 5.数组内存分配和释放 6.数组内存分配 7.不要使用野指针 8.黑暗时代

STM32理论 —— μCOS-Ⅲ(2/2):时间管理、消息队列、信号量、任务内嵌信号量/队列、事件标志、软件定时器、内存管理

文章目录 9. 时间管理9.1 OSTimeDly()9.2 OSTimeDlyHMSM()9.3 OSTimeDlyResume()9.4 延时函数实验 10. 消息队列10.1 创建消息队列函数OSQCreate()10.2 发送消息到消息队列函数(写入队列)OSQPost()10.3 获取消息队列中的消息函数(读出队列)OSQPend()10.4 消息队列操作实验 11. …

12 款 Android 照片恢复应用程序列表

丢失难忘的照片总是令人痛苦的。如果软件崩溃或意外删除,Android 设备上的照片也可能会丢失。这时照片恢复应用程序就派上用场了。查看我们为 Android 收集的顶级照片恢复应用程序。 但是,您不会想为自己选择任何照片恢复应用程序。因此,我们…

Doris:冷热分层

目录 一、冷热分层介绍 二、存储策略(Storage policy) 2.1 创建存储资源 2.2 创建存储策略 2.3 使用存储策略 三、使用限制 一、冷热分层介绍 冷热分层支持所有 Doris 功能,只是把部分数据放到对象存储上,以节省成本&am…

openGauss 6.0.0 一主二备集群安装及使用zcbus实现Oracle到openGauss的数据同步

一、前言 openGauss 6.0.0-RC1是openGauss 2024年3月发布的创新版本,该版本生命周期为0.5年。根据openGauss官网介绍,6.0.0-RC1与之前的版本特性功能保持兼容,另外,在和之前版本兼容的基础上增加了很多新功能,比如分区表性能优化…

go的netpoll学习

go的运行时调度框架简介 Go的运行时(runtime)中,由调度器管理:goroutine(G)、操作系统线程(M)和逻辑处理器(P)之间的关系 以实现高效的并发执行 当一个gorout…

统计完全子字符串

很不错的计数问题&#xff0c;用到了分组循环技巧和滑动窗口 代码的实现方式也非常值得多看 class Solution { public:int f(string s,int k){int res 0;for(int m1;m<26&&k*m<s.size();m){int cnt[27]{};auto check[&](){for(int i0;i<26;i){if(cnt[i]…

跟着刘二大人学pytorch(第---10---节课之卷积神经网络)

文章目录 0 前言0.1 课程链接&#xff1a;0.2 课件下载地址&#xff1a; 回忆卷积卷积过程&#xff08;以输入为单通道、1个卷积核为例&#xff09;卷积过程&#xff08;以输入为3通道、1个卷积核为例&#xff09;卷积过程&#xff08;以输入为N通道、1个卷积核为例&#xff09…

计算机组成原理之定点除法

文章目录 定点除法运算原码恢复余数法原码不恢复余数法&#xff08;加减交替法&#xff09;运算规则 习题 定点除法运算 注意 &#xff08;1&#xff09;被除数小于除数的时候&#xff0c;商0 &#xff08;2&#xff09;接下来&#xff0c;有一个除数再原来的基础上&#xff0c…

springboot + Vue前后端项目(第十六记)

项目实战第十六记 写在前面1 第一个bug1.1 完整的Role.vue 2 第二个bug2.1 修改路由router下面的index.js 总结写在最后 写在前面 发现bug&#xff0c;修复bug 1 第一个bug 分配菜单时未加入父id&#xff0c;导致分配菜单失效 <!-- :check-strictly"true" 默…

图的应用之最小生成树

大纲 生成树介绍 特点 但n个 种类 最小生成树 应用 构造算法 MST性质 Prim算法 依次选择与顶点相邻的不会构成回路的最小边对应的顶点 Kruskal算法 依次选不会构成环的最小边 区别 Prim算法有n个顶点进行选择&#xff0c;每个顶点有n个选择&#xff0c;复杂度为O(n*n) K…

C51学习归纳13 --- AD/DA转换

AD/DA转换实现了计算机和模拟信号的连接&#xff0c;扩展了计算机的应用场景&#xff0c;为模拟信号数字化提供了底层支持。 AD转换通常是多个输入通道&#xff0c;使用多路选择器连接到AD开关&#xff0c;实现AD多路复用的目的&#xff0c;提高利用率。 AD/DA转换可以使用串口…

我的创作纪念日(1825天)

Ⅰ、机缘 1. 记得是大一、大二的时候就听学校的大牛说&#xff0c;可以通过写 CSDN 博客&#xff0c;来提升自己的代码和逻辑能力&#xff0c;虽然即将到了写作的第六个年头&#xff0c;但感觉这句话依旧受用; 2、今年一整年的创作都没有停止&#xff0c;本年度几乎是每周都来…

UniApp或微信小程序中scroll-view组件使用show-scrollbar在真机Android或IOS中隐藏不了滚动条的解决办法

show-scrollbar 属性 不论是使用 变量 还是直接使用 布尔值或者直接使用 css 都是在 ios、Android 上是都没有效果。。 真机中还是出现滚动条 解决办法 添加下面CSS ::-webkit-scrollbar {display: none;width: 0 !important;height: 0 !important;-webkit-appearance: no…

盛世古董乱世金-数据库稳定到底好不好?

是不是觉得这个还用问&#xff1f; 是的要问。因为这个还是一个有争议的问题。但是争议双方都没有错。这就像辩论&#xff0c;有正反双方。大家都说的有道理&#xff0c;但是很难说谁对谁错。 正方观点&#xff1a;数据库稳定好 其实这个是用户的观点&#xff0c;应用开发人…

17个关键方法指南,保护您的web站点安全!

了解如何让您的web应用程序或网站安全&#xff0c;对于网站所有者来说至关重要。以下是一些关键步骤&#xff0c;可以帮助您保护网站免受攻击和数据泄露。 1.使用公钥加密技术 当数据以明文形式传输时&#xff0c;它容易受到中间人 &#xff08;MitM&#xff09; 攻击。这意味…

北航第六次数据结构与程序设计作业(查找与排序)选填题

一、 顺序查找的平均查找长度ASL&#xff08;1 2 …… n&#xff09;/ n (n 1&#xff09;/ 2 二、 这半查找法的平均查找次数和判定树的深度有关系。若查找一个不存在的元素&#xff0c;说明进行了深度次比较。 注意&#xff0c;判定树不是满二叉树&#xff0c;因此深…

安卓网络通信(多线程、HTTP访问、图片加载、即时通信)

本章介绍App开发常用的以下网络通信技术&#xff0c;主要包括&#xff1a;如何以官方推荐的方式使用多线程技术&#xff0c;如何通过okhttp实现常见的HTTP接口访问操作&#xff0c;如何使用Dlide框架加载网络图片&#xff0c;如何分别运用SocketIO和WebSocket实现及时通信功能等…