【分布式计算】java消息队列机制

        消息队列是一种在不同组件或应用之间进行数据传递的技术,通常用于处理异步通信。它允许消息的发送者(生产者)和接收者(消费者)之间进行解耦。


概念


        消息队列是一种先进先出(FIFO)的数据结构,它存储待处理的消息直到它们被消费。消息是生产者发送给队列的数据单元,消费者则从队列中读取这些消息进行处理。


原理


1. 生产者:
   - 生产者是创建消息的实体,它负责将消息发送到队列。生产者不需要关心消息的具体处理过程,只需确保消息正确发送到队列。

2. 消息队列:
   - 消息队列充当缓冲区,暂时存储从生产者那里发送过来的消息。队列管理消息的顺序,并确保按照发送的顺序逐一传递给消费者。

3. 消费者:
   - 消费者从消息队列中读取消息,并进行相应的处理。消费者可以是同一应用的其他部分,或者是完全独立的应用。

4. 消息处理:
   - 一旦消息被消费者读取,它可以被确认和删除,或者在处理失败时重新放回队列等待再次处理。


使用场景


异步处理:当应用执行耗时任务时,可以将任务封装成消息发送到队列,由消费者异步处理。
流量控制:在高流量事件如大促销或黑色星期五时,消息队列可以帮助缓冲入站流量,防止系统过载。
解耦服务:在微服务架构中,消息队列可以帮助减少服务之间的直接依赖,通过消息传递来通信,从而提高系统的可维护性和扩展性。


Java消息队列技术

在Java中,消息队列是一种数据结构或服务,用于在不同的应用组件或系统之间异步传递消息。它支持松耦合的架构,允许发送者和接收者独立地进行开发和扩展。消息队列可以帮助缓解高负载、增强系统的可伸缩性,并提供容错机制。下面是一些常见的Java消息队列技术:

1. Apache Kafka:
        Kafka是一个分布式流处理平台,它不仅能够处理消息队列的功能,还能处理复杂的事件流。它特别适合需要高吞吐量和可靠性的大规模数据处理场景。

2. RabbitMQ:
        RabbitMQ是一个开源消息代理,支持多种消息协议。它提供灵活的路由功能,能够保证消息的可靠传输。适合于复杂的消息传递需求和多种不同的通信模式。

3. ActiveMQ:
        Apache ActiveMQ是一个强大的开源消息代理,支持多种JMS(Java Message Service)协议和客户端语言。适用于那些需要JMS标准支持的企业应用。

4. Amazon SQS (Simple Queue Service):
        SQS是一个托管的消息队列服务,提供简单的Web服务API来完全管理队列的消息传输。它能够无限扩展,并且不需要预先安装消息队列基础设施。

5. Google Cloud Pub/Sub:
        Google的Pub/Sub提供了一种全球分布式的消息传递平台,适合处理大量数据的实时交换。


这个流程图展示了使用 ActiveMQ 实现消息队列的基本步骤,包括消息的发送和接收。以下是每个步骤的详细讲解。

 1. 创建 ConnectionFactory

`ConnectionFactory` 是一个接口,用于创建连接到消息中间件(ActiveMQ)的工厂。它是创建连接的起点。

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");


2. 使用 ConnectionFactory 创建 Connection

通过 `ConnectionFactory` 创建一个连接对象 `Connection`。

Connection connection = connectionFactory.createConnection();

3. 启动 Connection

在使用连接之前,必须启动它。

connection.start();

4. 使用 Connection 创建一个或多个 JMS Session

通过 `Connection` 创建会话 `Session`。会话是生产和消费消息的上下文。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


5. 使用 Session 创建 Queue 或 Topic

通过会话创建队列(Queue)或主题(Topic)。队列用于点对点消息传递,主题用于发布/订阅消息传递。

Queue queue = session.createQueue("testQueue");
// 或者
Topic topic = session.createTopic("testTopic");

6. 使用 Session 创建 MessageProducer 或 MessageConsumer

根据需要创建消息生产者 `MessageProducer` 或消息消费者 `MessageConsumer`。

创建 MessageProducer

MessageProducer producer = session.createProducer(queue);

创建 MessageConsumer

MessageConsumer consumer = session.createConsumer(queue);

7. 发送消息

使用 `MessageProducer` 发送消息。

TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
producer.send(message);

8. 接收消息

异步接收

设置消息监听器,当有消息到达时自动触发。

consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);} catch (JMSException e) {e.printStackTrace();}}}
});

同步接收

使用 `MessageConsumer.receive()` 方法同步接收消息。

Message message = consumer.receive();
if (message instanceof TextMessage) {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);
}

完整代码示例

生产者代码

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Producer {public static void main(String[] args) throws JMSException {// 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接Connection connection = connectionFactory.createConnection();connection.start();// 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列Queue queue = session.createQueue("testQueue");// 创建生产者MessageProducer producer = session.createProducer(queue);// 创建消息TextMessage message = session.createTextMessage("Hello, ActiveMQ!");// 发送消息producer.send(message);// 关闭连接connection.close();}
}

消费者代码

import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class Consumer {public static void main(String[] args) throws JMSException {// 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接Connection connection = connectionFactory.createConnection();connection.start();// 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列Queue queue = session.createQueue("testQueue");// 创建消费者MessageConsumer consumer = session.createConsumer(queue);// 同步接收消息Message message = consumer.receive();if (message instanceof TextMessage) {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);}// 异步接收消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {String text = ((TextMessage) message).getText();System.out.println("Received: " + text);} catch (JMSException e) {e.printStackTrace();}}}});// 为了测试异步接收,保持程序运行一段时间try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}// 关闭连接connection.close();}
}

具体应用

调用百度云api

使用消息队列实现一个调用百度智能云 API 的校园卡程序有助于提高系统的可扩展性和可靠性。消息队列可以解耦生产者和消费者,并实现异步处理。

实现步骤

1. 配置消息队列:
    - 安装并配置 RabbitMQ 或 ActiveMQ。
    - 配置 Spring Boot 项目以连接到消息队列。

2. 创建生产者(Producer):
    - 接收用户上传的图片。
    - 将图片编码为 Base64 格式,并发送到消息队列。

3. 创建消费者(Consumer):
    - 监听消息队列中的消息。
    - 调用百度智能云 API 进行图片识别。
    - 将识别结果存储以便后续查询。

4. 实现控制器(Controller):
    - 提供上传图片的接口。
    - 提供获取识别结果的接口。

系统架构图

+-------------------+        +--------------------+        +-------------------+
|                   |        |                    |        |                   |
|   User Uploads    |        |   Message Queue    |        |    API Consumer   |
|   (Controller)    | -----> |  (RabbitMQ/ActiveMQ)| -----> | (Baidu API Call)  |
|                   |        |                    |        |                   |
+-------------------+        +--------------------+        +-------------------+

具体实现

  • 用户上传图片:用户通过前端页面上传图片,图片通过 RecognitionController 接收并保存到消息队列。
  • 消息队列:图片以消息的形式存储在消息队列中,保证消息的可靠传递。
  • 消息处理RecognitionListener 监听消息队列,当有新消息到达时,调用百度智能云 API 进行图片识别,并将结果保存到 RecognitionService
  • 获取结果:用户可以通过访问 /api/resultPage 来获取最新的识别结果。

代码部分:

1. 配置消息队列:

   配置 `application.properties` 以连接到消息队列(以 ActiveMQ 为例):

   spring.activemq.broker-url=tcp://localhost:61616spring.activemq.user=adminspring.activemq.password=adminspring.jms.pool.enabled=true

2. 创建生产者(Producer):

  package com.example.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Service;@Servicepublic class RecognitionService {@Autowiredprivate JmsTemplate jmsTemplate;public void sendImageForRecognition(byte[] imageBytes) {jmsTemplate.convertAndSend("animal.recognition.queue", imageBytes);}// 其他方法...}

3. 创建消费者(Consumer):
 

  package com.example.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;@Componentpublic class RecognitionListener {@Autowiredprivate RecognitionService recognitionService;@JmsListener(destination = "animal.recognition.queue")public void processImage(byte[] imageBytes) {String result = recognitionService.recognizeImage(imageBytes);recognitionService.saveResult(result);}}

4. 实现控制器(Controller):

 package com.example.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.ui.Model;import org.springframework.web.bind.annotation.*;import org.springframework.web.multipart.MultipartFile;import org.springframework.web.servlet.view.RedirectView;import java.io.IOException;@Controller@RequestMapping("/api")public class RecognitionController {@Autowiredprivate RecognitionService recognitionService;@PostMapping("/recognize")public RedirectView recognizeAnimal(@RequestParam("file") MultipartFile file, Model model) throws IOException {if (file.isEmpty()) {model.addAttribute("message", "File is empty");return new RedirectView("/errorPage.html");}byte[] bytes = file.getBytes();recognitionService.saveResult("等待识别结果...");recognitionService.sendImageForRecognition(bytes);return new RedirectView("/resultPage.html");}@GetMapping("/resultPage")@ResponseBodypublic String getResult() {return recognitionService.getResult();}}

消息队列的意义

异步处理:允许用户在上传图片后立即获得响应,而不是等待图片识别结果。
解耦:上传图片的部分和图片识别的部分可以独立开发和扩展。
负载均衡:可以轻松增加更多的消费者以处理高并发请求。
可靠性:消息队列持久化消息,确保即使在系统故障时也不会丢失消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/28440.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

中介子方程二十

X$XFX$XEXyXαXiX$XαXiXrXkXtXyX$XpXVX$XdXuXWXπX$XWXyXWX$XπXWXuXdX$XVXpX$XyXtXkXrXiXαX$XiXαXyXEX$XFX$XEXyXαXiX$XαXiXrXkXtXyX$XpXVX$XdXuXWXπX$XWXyXWX$XπXWXuXdX$XVXpX$XyXtXkXrXiXαX$XiXαXyXEX$XαXηXtXαX$XWXyX$XyXWX$XpXαXqXηX$XeXαXhX$XdX$XpX$XdX$…

Web前端开发12章:深入探索与实战解析

Web前端开发12章:深入探索与实战解析 在数字化浪潮的推动下,Web前端开发技术日新月异,成为了构建互联网应用的重要基石。本文将以12章的篇幅,从四个方面、五个方面、六个方面和七个方面,深入探索Web前端开发的精髓&am…

【INTEL(ALTERA)】Nios® II无法使用基于 Ubuntu 18.04.5 的 WSL 进行构建

现象 在使用 Ubuntu 18.04.5 构建 WSL 的Nios II处理器时,任何英特尔 Quartus Prime 软件版本都可能会看到此问题。 原因 这是因为在 Nios II Command Shell 中运行命令 “wslpath -u .”时返回值不同。 正常工作:命令返回”。故障:命令返回…

机器学习(V)--无监督学习(一)聚类

根据训练样本中是否包含标签信息,机器学习可以分为监督学习和无监督学习。聚类算法是典型的无监督学习,目的是想将那些相似的样本尽可能聚在一起,不相似的样本尽可能分开。 相似度或距离 聚类的核心概念是相似度(similarity)或距离(distance…

PyTorch 拼接与拆分-Tensor基本操作

拼接: cat, stack … 使用 cat 在指定维度 dim 上拼接: torch.cat(element_list, dim) >>> a torch.rand(2,3) >>> b torch.rand(1,3) >>> c torch.cat([a,b], dim0) >>> c.shape torch.Size([3, 3])使用 stack 在新增维…

嵌入式学习记录6.14(练习)

#include "mainwindow.h" #include "ui_mainwindow.h"MainWindow::MainWindow(QWidget *parent): QMainWindow(parent), ui(new Ui::MainWindow) {ui->setupUi(this);this->resize(1028,783); //设置左侧背景QLabel *lab1new QLabel(this);lab1->…

uniapp使用vue3打包H5,android或ios加载白屏

前景介绍 按照uniapp官方文档介绍,根据步骤创建了使用Vue3的项目;执行命令npm run build:h5, 本地安装了http-server,打包之后的dist文件夹,执行http-server后,可以访问; 但是使用Android或者ios进行本地加…

【内存管理之堆内存】

1.栈上的基元 2.栈上的聚合对象 3.手动分配和释放 4.分配堆内存 5.数组内存分配和释放 6.数组内存分配 7.不要使用野指针 8.黑暗时代

STM32理论 —— μCOS-Ⅲ(2/2):时间管理、消息队列、信号量、任务内嵌信号量/队列、事件标志、软件定时器、内存管理

文章目录 9. 时间管理9.1 OSTimeDly()9.2 OSTimeDlyHMSM()9.3 OSTimeDlyResume()9.4 延时函数实验 10. 消息队列10.1 创建消息队列函数OSQCreate()10.2 发送消息到消息队列函数(写入队列)OSQPost()10.3 获取消息队列中的消息函数(读出队列)OSQPend()10.4 消息队列操作实验 11. …

12 款 Android 照片恢复应用程序列表

丢失难忘的照片总是令人痛苦的。如果软件崩溃或意外删除,Android 设备上的照片也可能会丢失。这时照片恢复应用程序就派上用场了。查看我们为 Android 收集的顶级照片恢复应用程序。 但是,您不会想为自己选择任何照片恢复应用程序。因此,我们…

解决小程序的异步请求问题

解决小程序的异步请求问题,可以从多个方面入手,以确保请求的顺畅执行和错误处理。以下是一些主要的解决方法和策略: 1. 确保网络连接正常 检查网络连接:首先,确保用户的设备已连接到互联网,并且网络连接稳…

Doris:冷热分层

目录 一、冷热分层介绍 二、存储策略(Storage policy) 2.1 创建存储资源 2.2 创建存储策略 2.3 使用存储策略 三、使用限制 一、冷热分层介绍 冷热分层支持所有 Doris 功能,只是把部分数据放到对象存储上,以节省成本&am…

openGauss 6.0.0 一主二备集群安装及使用zcbus实现Oracle到openGauss的数据同步

一、前言 openGauss 6.0.0-RC1是openGauss 2024年3月发布的创新版本,该版本生命周期为0.5年。根据openGauss官网介绍,6.0.0-RC1与之前的版本特性功能保持兼容,另外,在和之前版本兼容的基础上增加了很多新功能,比如分区表性能优化…

go的netpoll学习

go的运行时调度框架简介 Go的运行时(runtime)中,由调度器管理:goroutine(G)、操作系统线程(M)和逻辑处理器(P)之间的关系 以实现高效的并发执行 当一个gorout…

统计完全子字符串

很不错的计数问题&#xff0c;用到了分组循环技巧和滑动窗口 代码的实现方式也非常值得多看 class Solution { public:int f(string s,int k){int res 0;for(int m1;m<26&&k*m<s.size();m){int cnt[27]{};auto check[&](){for(int i0;i<26;i){if(cnt[i]…

跟着刘二大人学pytorch(第---10---节课之卷积神经网络)

文章目录 0 前言0.1 课程链接&#xff1a;0.2 课件下载地址&#xff1a; 回忆卷积卷积过程&#xff08;以输入为单通道、1个卷积核为例&#xff09;卷积过程&#xff08;以输入为3通道、1个卷积核为例&#xff09;卷积过程&#xff08;以输入为N通道、1个卷积核为例&#xff09…

数据结构学习笔记-图

1.图的存储 &#xff08;1&#xff09;邻接矩阵法 #define MaxVertexNum 100 //顶点数目的最大值 typedef struct{char Vex[MaxVertexNum]; //顶点表int Edge[MaxVertexNum][MaxVertexNum]; //邻接矩阵表&#xff0c;边表int vexnum,arcnum; //图的当前顶点数和边…

计算机组成原理之定点除法

文章目录 定点除法运算原码恢复余数法原码不恢复余数法&#xff08;加减交替法&#xff09;运算规则 习题 定点除法运算 注意 &#xff08;1&#xff09;被除数小于除数的时候&#xff0c;商0 &#xff08;2&#xff09;接下来&#xff0c;有一个除数再原来的基础上&#xff0c…

springboot + Vue前后端项目(第十六记)

项目实战第十六记 写在前面1 第一个bug1.1 完整的Role.vue 2 第二个bug2.1 修改路由router下面的index.js 总结写在最后 写在前面 发现bug&#xff0c;修复bug 1 第一个bug 分配菜单时未加入父id&#xff0c;导致分配菜单失效 <!-- :check-strictly"true" 默…

什么是期权(Options)?以实际例子理解看涨期权(Call)和看跌期权(Put)

什么是期权&#xff1f; 中文版 期权的详细介绍 期权&#xff08;Option&#xff09;是一种金融衍生工具&#xff0c;它赋予持有人在特定时间以预定价格买入或卖出标的资产&#xff08;如股票、债券、商品等&#xff09;的权利&#xff0c;但不负有必须买入或卖出的义务。期…