【Java万花筒】消息处理大曝光:Java中的队列工具全景解析

消息驱动的世界:探寻Java中队列处理的未知领域

前言

在现代软件开发中,消息队列处理成为了构建高性能、分布式、异步通信系统的核心组件。本文将深入探讨Java中的队列处理库,从高性能并发队列处理的Disruptor到分布式流处理平台Kafka,再到开源消息代理软件RabbitMQ和消息中间件ActiveMQ,为读者展示Java生态系统中丰富的队列处理工具和框架。

欢迎订阅专栏:Java万花筒

文章目录

  • 消息驱动的世界:探寻Java中队列处理的未知领域
    • 前言
      • 1. Disruptor: 高性能并发队列处理
        • 1.1 Disruptor 基础概念
        • 1.2 Disruptor 的使用场景
        • 1.3 Disruptor 的设计原理
        • 1.4 Disruptor 的性能优势
        • 1.5 Disruptor 的高级特性
      • 2. Queue: Java 中的队列接口及实现
        • 2.1 Queue 接口介绍
          • 2.1.1 Queue 接口的常见方法
      • 2.2 队列的实现类
        • 2.2.1 ArrayBlockingQueue
        • 2.2.2 LinkedList
        • 2.2.3 PriorityQueue
      • 3. LMAX Disruptor: 低延迟事件处理的并发编程框架
        • 3.1 LMAX Disruptor 的特点
        • 3.2 LMAX Disruptor 的使用示例
          • 3.2.1 创建 Disruptor 对象
          • 3.2.2 定义事件
          • 3.2.3 定义事件处理器
          • 3.2.4 启动 Disruptor
        • 3.3 LMAX Disruptor 的原理解析
          • 3.3.1 环形缓冲区(Ring Buffer)
          • 3.3.2 事件发布与消费
          • 3.3.3 无锁设计
      • 4. Kafka: 分布式流处理平台
        • 4.1 Kafka 的核心概念
          • 4.1.1 主题(Topics)
          • 4.1.2 分区(Partitions)
          • 4.1.3 生产者(Producers)和消费者(Consumers)
        • 4.2 Kafka 生产者示例
          • 4.2.1 创建 Kafka 生产者
          • 4.2.2 Kafka 消费者示例
          • 4.2.3 运行示例
      • 5. RabbitMQ: 开源消息代理软件
        • 5.1 RabbitMQ 的基本概念
          • 5.1.1 消息队列(Message Queues)
          • 5.1.2 交换机(Exchanges)和队列绑定(Bindings)
        • 5.2 RabbitMQ 的特点和优势
        • 5.3 RabbitMQ 的使用示例
        • 5.4 RabbitMQ 的消息消费示例
        • 5.5 RabbitMQ 的消息确认模式
          • 5.5.1 手动消息确认模式
          • 5.5.2 自动消息确认模式
        • 5.6 RabbitMQ 的消息持久化
          • 5.6.1 持久化消息
          • 5.6.2 持久化队列
      • 6. ActiveMQ: 开源消息中间件
        • 6.1 ActiveMQ 的特点
        • 6.2 ActiveMQ 的基本概念
          • 6.2.1 JMS(Java Message Service)
          • 6.2.2 消息模型
          • 6.2.3 连接工厂(Connection Factory)
        • 6.3 ActiveMQ 的使用场景
        • 6.4 ActiveMQ 的消息发送示例
        • 6.5 ActiveMQ 的消息消费示例
    • 总结

1. Disruptor: 高性能并发队列处理

1.1 Disruptor 基础概念

Disruptor是一种高性能的并发框架,核心概念包括环形缓冲区、序号和事件。

1.2 Disruptor 的使用场景

适用于需要高性能且低延迟的场景,如金融交易系统、网络通信中的数据传输。

1.3 Disruptor 的设计原理

基于无锁的并发编程思想,利用环形缓冲区和序号实现高效的事件处理。

// 示例代码
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;public class DisruptorExample {// 定义事件class Event {// Event data and methods}// 定义事件处理器class EventHandler implements com.lmax.disruptor.EventHandler<Event> {// Event handling logic@Overridepublic void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {// Process the event}}public static void main(String[] args) {Executor executor = Executors.newCachedThreadPool();int bufferSize = 1024;// 创建 Disruptor 对象Disruptor<Event> disruptor = new Disruptor<>(Event::new, bufferSize, executor);// 定义事件处理器disruptor.handleEventsWith(new EventHandler());// 启动 Disruptordisruptor.start();}
}
1.4 Disruptor 的性能优势

Disruptor以其卓越的性能而闻名,主要得益于以下几个方面:

  • 无锁设计: Disruptor采用无锁的设计,通过CAS(Compare-And-Swap)等原子操作保证多线程并发的安全性,避免了传统锁带来的性能瓶颈。

  • 环形缓冲区: Disruptor使用环形缓冲区作为存储事件的数据结构,通过预分配一块连续的内存,避免了内存碎片和频繁的垃圾回收,提高了内存访问效率。

  • 预分配内存: Disruptor在启动时就会预分配整个环形缓冲区的内存空间,避免了在运行时动态扩展和分配内存的开销,提高了处理速度。

1.5 Disruptor 的高级特性

Disruptor还提供了一些高级特性,进一步优化性能和灵活性:

  • 事件链: 允许将多个事件处理器按顺序链接形成事件处理链,避免了不必要的数据拷贝,提高了处理效率。

  • 超时等待策略: Disruptor支持定义等待策略,包括阻塞等待、忙等待和超时等待,使得在不同场景下可以灵活应对。

// 示例代码
import com.lmax.disruptor.YieldingWaitStrategy;// 创建 Disruptor 对象并指定等待策略
Disruptor<Event> disruptor = new Disruptor<>(Event::new, bufferSize, executor, new YieldingWaitStrategy());

2. Queue: Java 中的队列接口及实现

2.1 Queue 接口介绍

Queue接口定义了队列操作,包括入队、出队、获取队头元素等。

2.1.1 Queue 接口的常见方法
  • offer(E e): 将元素插入队列,成功返回 true,否则返回 false。
  • poll(): 移除并返回队头元素,若队列为空则返回 null。
  • peek(): 返回队头元素但不移除,若队列为空则返回 null。
// 示例代码
import java.util.LinkedList;
import java.util.Queue;public class QueueExample {public static void main(String[] args) {// 使用LinkedList实现QueueQueue<String> queue = new LinkedList<>();// 入队queue.offer("Element1");queue.offer("Element2");// 出队String element = queue.poll();System.out.println("Dequeued Element: " + element);// 获取队头元素String frontElement = queue.peek();System.out.println("Front Element: " + frontElement);}
}

2.2 队列的实现类

2.2.1 ArrayBlockingQueue

基于数组实现的有界阻塞队列,具有固定大小的容量。

// 示例代码
import java.util.concurrent.ArrayBlockingQueue;public class ArrayBlockingQueueExample {public static void main(String[] args) {int capacity = 5;ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(capacity);// 入队arrayBlockingQueue.offer("Element1");arrayBlockingQueue.offer("Element2");// 出队String element = arrayBlockingQueue.poll();System.out.println("Dequeued Element: " + element);}
}
2.2.2 LinkedList

基于链表实现的非阻塞队列,可作为队列或双端队列使用。

// 示例代码
import java.util.LinkedList;public class LinkedListQueueExample {public static void main(String[] args) {LinkedList<String> linkedListQueue = new LinkedList<>();// 入队linkedListQueue.offer("Element1");linkedListQueue.offer("Element2");// 出队String element = linkedListQueue.poll();System.out.println("Dequeued Element: " + element);}
}
2.2.3 PriorityQueue

基于堆实现的优先级队列,元素按照优先级顺序出队。

// 示例代码
import java.util.PriorityQueue;public class PriorityQueueExample {public static void main(String[] args) {PriorityQueue<String> priorityQueue = new PriorityQueue<>();// 入队priorityQueue.offer("Element2");priorityQueue.offer("Element1");// 出队(按照优先级顺序)String element = priorityQueue.poll();System.out.println("Dequeued Element: " + element);}
}

3. LMAX Disruptor: 低延迟事件处理的并发编程框架

3.1 LMAX Disruptor 的特点

提供极低延迟和高吞吐量,无锁设计、环形缓冲区、预分配内存等。

3.2 LMAX Disruptor 的使用示例
3.2.1 创建 Disruptor 对象
// 示例代码
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;public class DisruptorExample {// 定义事件class Event {// Event data and methods}// 定义事件处理器class EventHandler implements com.lmax.disruptor.EventHandler<Event> {// Event handling logic@Overridepublic void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {// Process the event}}public static void main(String[] args) {Executor executor = Executors.newCachedThreadPool();int bufferSize = 1024;// 创建 Disruptor 对象Disruptor<Event> disruptor = new Disruptor<>(Event::new, bufferSize, executor);// 定义事件处理器disruptor.handleEventsWith(new EventHandler());// 启动 Disruptordisruptor.start();}
}
3.2.2 定义事件
// 示例代码
public class Event {// Event data and methods
}
3.2.3 定义事件处理器
// 示例代码
import com.lmax.disruptor.EventHandler;public class EventHandler implements EventHandler<Event> {// Event handling logic@Overridepublic void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {// Process the event}
}
3.2.4 启动 Disruptor
// 示例代码
disruptor.handleEventsWith(new EventHandler());
disruptor.start();
3.3 LMAX Disruptor 的原理解析

LMAX Disruptor 的高性能和低延迟主要得益于其精心设计的内部原理。以下将对其核心原理进行解析。

3.3.1 环形缓冲区(Ring Buffer)

Disruptor 使用环形缓冲区作为数据存储结构。环形缓冲区是一个固定大小的数组,用于存储事件对象。生产者将事件写入缓冲区的尾部,消费者从缓冲区的头部读取事件。这种设计避免了复杂的内存分配和释放操作,提高了内存访问的效率。

// 示例代码
import com.lmax.disruptor.RingBuffer;public class RingBufferExample {public static void main(String[] args) {int bufferSize = 1024;RingBuffer<Long> ringBuffer = RingBuffer.createSingleProducer(Long::new, bufferSize);// 生产者向环形缓冲区写入数据for (long i = 0; i < bufferSize; i++) {long sequence = ringBuffer.next();ringBuffer.get(sequence).set(i);ringBuffer.publish(sequence);}// 消费者从环形缓冲区读取数据ringBuffer.forEach(System.out::println);}
}
3.3.2 事件发布与消费

Disruptor 使用事件发布者(Producer)和事件消费者(Consumer)模式来处理事件。生产者将事件写入环形缓冲区,然后通知消费者进行处理。消费者从环形缓冲区读取事件,并进行相应的处理逻辑。这种生产者-消费者模式实现了高效的并发通信。

// 示例代码
import com.lmax.disruptor.EventHandler;public class EventProcessor implements EventHandler<Event> {@Overridepublic void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {// Process the event}
}
3.3.3 无锁设计

为了实现高性能和低延迟,Disruptor 使用了无锁设计。在环形缓冲区中,生产者和消费者之间通过序号进行通信,而不需要加锁。这种无锁设计避免了线程之间的竞争和阻塞,提高了系统的并发性能。

// 示例代码
public class RingBuffer {// Implementation of RingBuffer with no locks
}

以上是 LMAX Disruptor 的核心原理,它的设计思想和实现方式为高性能、低延迟的并发编程提供了一种新的解决方案。

4. Kafka: 分布式流处理平台

4.1 Kafka 的核心概念
4.1.1 主题(Topics)

消息的逻辑容器,生产者发送消息,消费者订阅消息。

4.1.2 分区(Partitions)

主题可以分为多个分区,提高消息的并发处理能力。

4.1.3 生产者(Producers)和消费者(Consumers)

生产者发送消息,消费者从主题订阅消息进行处理。

4.2 Kafka 生产者示例
4.2.1 创建 Kafka 生产者
// 示例代码
import org.apache.kafka.clients.producer.*;import java.util.Properties;public class KafkaProducerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);String topic = "test-topic";String message = "Hello, Kafka!";// 发送消息producer.send(new ProducerRecord<>(topic, message), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {exception.printStackTrace();} else {System.out.println("Message sent successfully, Offset: " + metadata.offset());}}});producer.close();}
}
4.2.2 Kafka 消费者示例
// 示例代码
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerExample {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);String topic = "test-topic";// 订阅主题consumer.subscribe(Collections.singletonList(topic));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}
4.2.3 运行示例
  • 首先,确保 Kafka 服务已经启动并监听在默认端口 9092 上。
  • 分别运行 Kafka 生产者示例和 Kafka 消费者示例。
  • Kafka 生产者将消息发送到名为 “test-topic” 的主题。
  • Kafka 消费者从 “test-topic” 主题订阅消息,并在控制台输出接收到的消息。

以上是 Kafka 生产者和消费者的示例代码,展示了如何使用 Kafka Java 客户端库进行消息的发送和接收。

5. RabbitMQ: 开源消息代理软件

5.1 RabbitMQ 的基本概念
5.1.1 消息队列(Message Queues)

通过消息队列实现消息的存储和传递,生产者发送消息,消费者获取消息。

5.1.2 交换机(Exchanges)和队列绑定(Bindings)

交换机将消息发送到队列,通过绑定将交换机和队列关联。

5.2 RabbitMQ 的特点和优势

灵活的消息路由和多种消息传递模式,支持消息的可靠传递和确认机制。

5.3 RabbitMQ 的使用示例
// 示例代码
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class RabbitMQExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 定义队列String queueName = "myQueue";channel.queueDeclare(queueName, false, false, false, null);// 发送消息String message = "Hello, RabbitMQ!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
5.4 RabbitMQ 的消息消费示例

使用RabbitMQ,我们可以轻松地编写消费者来获取队列中的消息。下面是一个简单的消费者示例:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQConsumerExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义队列String queueName = "myQueue";channel.queueDeclare(queueName, false, false, false, null);// 定义消息接收回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

这个示例代码创建了一个RabbitMQ连接工厂,并与本地主机建立连接。然后,它定义了一个名为"myQueue"的队列。接下来,我们定义了一个消息接收回调函数(DeliverCallback),它在收到消息时被调用,并打印出接收到的消息。最后,我们通过调用basicConsume方法来启动消息的消费过程。

5.5 RabbitMQ 的消息确认模式

RabbitMQ提供了消息确认(Message Acknowledgment)机制,以确保消息能够可靠地被消费。消费者在接收到消息后,可以发送确认消息给RabbitMQ,告知它已经成功接收和处理了消息。如果消费者未发送确认消息,RabbitMQ会认为消息未被正确处理,然后将其重新发送给其他消费者。

5.5.1 手动消息确认模式

下面是一个使用手动消息确认模式的消费者示例:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQManualAckExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义队列String queueName = "myQueue";channel.queueDeclare(queueName, false, false, false, null);// 开启手动消息确认模式channel.basicQos(1);// 定义消息接收回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 消费消息channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });}
}

在这个示例代码中,我们使用basicQos方法将信道设置为一次只处理一条消息。然后,我们通过调用basicAck方法手动确认消息。这样,只有在消费者成功处理完消息后,才会发送确认消息给RabbitMQ。

5.5.2 自动消息确认模式

除了手动消息确认模式,RabbitMQ还提供了自动消息确认模式(Auto Acknowledgment)。在这种模式下,消费者不需要发送确认消息,RabbitMQ会自动确认消息的接收和处理。

下面是一个使用自动消息确认模式的消费者示例:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQAutoAckExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义队列String queueName = "myQueue";channel.queueDeclare(queueName, false, false, false, null);// 定义消息接收回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

在这个示例代码中,我们使用basicConsume方法启动了消费者,并将第二个参数设置为true,以启用自动消息确认模式。这样,RabbitMQ会在消费者成功接收和处理消息后自动确认消息。

5.6 RabbitMQ 的消息持久化

默认情况下,RabbitMQ中的消息是非持久化的,如果消息代理软件崩溃或重启,所有未被消费的消息将丢失。为了确保消息的持久化,我们需要将消息和队列都标记为持久化。

5.6.1 持久化消息

下面是一个将消息标记为持久化的生产者示例:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class RabbitMQPersistentMessageExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 定义队列String queueName = "myQueue";boolean durable = true; // 将队列标记为持久化channel.queueDeclare(queueName, durable, false, false, null);// 发送持久化消息String message = "Hello, RabbitMQ!";channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}

在这个示例代码中,我们使用queueDeclare方法将队列标记为持久化。然后,通过在basicPublish方法中使用MessageProperties.PERSISTENT_TEXT_PLAIN参数,将消息标记为持久化。

5.6.2 持久化队列

下面是一个将队列标记为持久化的消费者示例:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQPersistentQueueExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 定义队列String queueName = "myQueue";boolean durable = true; // 将队列标记为持久化channel.queueDeclare(queueName, durable, false, false, null);// 定义消息接收回调函数DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 消费消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

在这个示例代码中,我们使用queueDeclare方法将队列标记为持久化。这样,即使RabbitMQ代理软件崩溃或重启,队列和其中的消息也会被保留下来。

6. ActiveMQ: 开源消息中间件

6.1 ActiveMQ 的特点

ActiveMQ是一个开源的消息中间件,具有以下特点:

  • 可靠性: 提供可靠的消息传递机制,支持持久化消息,确保消息不会因系统故障而丢失。

  • 灵活性: 支持多种消息传递模型,包括点对点和发布-订阅模型,适应不同场景的需求。

  • 跨平台性: 提供了多种语言的客户端,支持跨平台的消息通信。

6.2 ActiveMQ 的基本概念
6.2.1 JMS(Java Message Service)

JMS是Java平台中定义的一种API,用于通过消息中间件进行异步消息通信。ActiveMQ完全支持JMS,提供了JMS API的实现。

6.2.2 消息模型

ActiveMQ支持两种主要的消息模型:

  • 点对点(P2P): 每个消息只有一个消费者可以接收,类似于队列的模式。

  • 发布-订阅(Pub-Sub): 消息被发送到主题(Topic),所有订阅该主题的消费者都可以接收到消息。

6.2.3 连接工厂(Connection Factory)

连接工厂用于创建与消息中间件的连接,它是JMS的一部分。在ActiveMQ中,连接工厂负责创建连接、会话等对象。

// 示例代码
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;public class ActiveMQExample {public static void main(String[] args) throws JMSException {// 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接try (Connection connection = connectionFactory.createConnection()) {// 连接其他ActiveMQ的相关逻辑}}
}
6.3 ActiveMQ 的使用场景

ActiveMQ适用于各种异步消息通信场景,例如:

  • 企业应用集成(EAI): 在企业内部的不同应用系统之间进行消息传递。

  • 分布式系统通信: 用于分布式系统中不同节点之间的消息通信。

  • 异步处理: 支持将任务异步处理,提高系统的响应速度。

6.4 ActiveMQ 的消息发送示例

使用ActiveMQ,我们可以编写生产者来发送消息到消息队列。下面是一个简单的生产者示例:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;public class ActiveMQProducerExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接try (Connection connection = connectionFactory.createConnection()) {// 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列Destination destination = session.createQueue("myQueue");// 创建生产者MessageProducer producer = session.createProducer(destination);// 创建消息TextMessage message = session.createTextMessage("Hello, ActiveMQ!");// 发送消息producer.send(message);System.out.println("Message sent successfully.");}}
}

这个示例代码创建了一个ActiveMQ连接工厂,并与本地主机建立连接。然后,它创建了一个会话和一个队列。接下来,我们创建了一个生产者,并创建了一条文本消息。最后,我们通过调用send方法将消息发送到队列中。

6.5 ActiveMQ 的消息消费示例

使用ActiveMQ,我们可以编写消费者来获取队列中的消息。下面是一个简单的消费者示例:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;public class ActiveMQConsumerExample {public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接try (Connection connection = connectionFactory.createConnection()) {// 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 创建队列Destination destination = session.createQueue("myQueue");// 创建消费者MessageConsumer consumer = session.createConsumer(destination);// 接收消息TextMessage message = (TextMessage) consumer.receive();System.out.println("Received message: " + message.getText());}}
}

这个示例代码创建了一个ActiveMQ连接工厂,并与本地主机建立连接。然后,它创建了一个会话和一个队列。接下来,我们创建了一个消费者。最后,我们通过调用receive方法来接收队列中的消息,并打印出消息的内容。

总结

本文介绍了Java中队列处理的相关库和框架,涵盖了高性能并发队列处理的Disruptor库、Java中的队列接口及实现、分布式流处理平台Kafka、开源消息代理软件RabbitMQ以及开源消息中间件ActiveMQ。

  • Disruptor: Disruptor是一个高性能并发框架,通过环形缓冲区、序号等概念实现无锁的事件处理。它适用于需要极低延迟和高吞吐量的场景。

  • Queue接口及实现: Java中的Queue接口提供了队列操作的标准定义,而ArrayBlockingQueue、LinkedList和PriorityQueue等实现类提供了不同特性的队列实现。

  • Kafka: Kafka是一个分布式流处理平台,具有高吞吐量、持久性、分布式等特点,适用于构建实时数据管道,支持多主题、分区和生产者-消费者模型。

  • RabbitMQ: RabbitMQ是一个开源消息代理软件,支持多种消息传递模式,包括点对点和发布-订阅模型,提供灵活的消息路由和确认机制。

  • ActiveMQ: ActiveMQ是一个开源的消息中间件,支持JMS API,具有可靠性、灵活性和跨平台性。它适用于企业应用集成、分布式系统通信和异步处理等场景。

这些库和框架提供了丰富的工具和模型,可根据具体业务需求选择合适的队列处理方式,以满足系统性能、可靠性和扩展性的要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/679038.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nginx中常用监控模块讲解

Nginx中常用监控模块讲解 Nginx是一个高性能的HTTP和反向代理服务器&#xff0c;它支持多种监控模块&#xff0c;可以帮助我们更好地了解和优化服务器的性能。本文将介绍Nginx中的一些常用监控模块&#xff0c;并提供相应的代码示例。 访问日志模块 访问日志模块用于记录客户…

Stable Diffusion 模型下载:DreamShaper(梦想塑造者)

文章目录 模型介绍生成案例案例一案例二案例三案例四案例五案例六案例七案例八案例九案例十 下载地址 模型介绍 DreamShaper 是一个分格多样的大模型&#xff0c;可以生成写实、原画、2.5D 等多种图片&#xff0c;能生成很棒的人像和风景图。 条目内容类型大模型基础模型SD 1…

Elasticsearch:使用查询规则(query rules)进行搜索

在之前的文章 “Elasticsearch 8.10 中引入查询规则 - query rules”&#xff0c;我们详述了如何使用 query rules 来进行搜索。这个交互式笔记本将向你介绍如何使用官方 Elasticsearch Python 客户端来使用查询规则。 你将使用 query rules API 将查询规则存储在 Elasticsearc…

HTTP网络通信协议基础

目录 前言&#xff1a; 1.HTTP协议理论 1.1协议概念 1.2工作原理 2.HTTP抓包工具 2.1Fiddler工具 2.2抓包原理 3.HTTP协议格式 3.1HTTP请求 3.2HTTP响应 3.3格式总结 前言&#xff1a; 在了解完网络编程的传输层UDP和TCP通信协议后&#xff0c;就需要开始对数据进行…

mac卸载被锁定的app

sudo chflags -hv noschg /Applications/YunShu.app 参考&#xff1a;卸载云枢&#xff08;MacOS 版&#xff09;

13. 串口接收模块的项目应用案例

1. 使用串口来控制LED灯工作状态 使用串口发送指令到FPGA开发板&#xff0c;来控制第7课中第4个实验的开发板上的LED灯的工作状态。 LED灯的工作状态&#xff1a;让LED灯按指定的亮灭模式亮灭&#xff0c;亮灭模式未知&#xff0c;由用户指定&#xff0c;8个变化状态为一个循…

【漏洞复现】狮子鱼CMS文件上传漏洞(image_upload.php)

Nx01 产品简介 狮子鱼CMS&#xff08;Content Management System&#xff09;是一种网站管理系统&#xff0c;它旨在帮助用户更轻松地创建和管理网站。该系统拥有用户友好的界面和丰富的功能&#xff0c;包括页面管理、博客、新闻、产品展示等。通过简单直观的管理界面&#xf…

骑砍战团MOD开发(44)-可编程渲染管线shader编程

一.可编程渲染管线 在GPU进行3D模型投射到2D平面过程中,渲染管线算法对开发者开放,目前支持的编程语言有OpenGL的ARB语言(pp文件),Direct3D的HLSL高级shader编程语言(fx文件). Direct3D提供一下API实现程序加载shader着色器文件: D3DXCreateEffectFromFile(gDevice,"fxfn…

Linux nohup命令和

参考资料 linux后台运行nohup命令的使用及2>&1字符详解 目录 前期准备一. 基本语法二. 执行时不指定日志文件三. 执行后不想要日志文件四. nohup命令的执行与kill4.1 执行4.2 kill 前期准备 &#x1f4c4;handle_file.sh #!/bin/bashecho "文件复制开始..."…

Nginx报错合集(502 Bad Gateway,504 Gateway nginx/1.18.0 (Ubuntu) 等等报错)

1.504 Gateway Time-outnginx/1.18.0 (Ubuntu) 日志报错&#xff1a; 2024/02/11 04:38:54 [error] 564#564: *29 upstream timed out (110: Connection timed out) while reading response header from upstream, client: *******, server: *******, request: "GE…

从REPR设计模式看 .NET的新生代类库FastEndpoints的威力

📢欢迎点赞 :👍 收藏 ⭐留言 📝 如有错误敬请指正,赐人玫瑰,手留余香!📢本文作者:由webmote 原创📢作者格言:新的征程,我们面对的不仅仅是技术还有人心,人心不可测,海水不可量,唯有技术,才是深沉黑夜中的一座闪烁的灯塔 !序言 又到了一年年末,春节将至…

每日OJ题_位运算⑥_力扣137. 只出现一次的数字 II

目录 力扣137. 只出现一次的数字 II 解析代码 力扣137. 只出现一次的数字 II 137. 只出现一次的数字 II 难度 中等 给你一个整数数组 nums &#xff0c;除某个元素仅出现 一次 外&#xff0c;其余每个元素都恰出现 三次 。请你找出并返回那个只出现了一次的元素。 你必须…

WebSocketServer+redis实时更新页面数据

redis 实现发布订阅功能具体实现_redis convertandsend-CSDN博客 主要看上面这个 使用redis做websocket分布式消息推送服务_websocket redis-CSDN博客 ClassCastException: java.lang.String cannot be cast to com.alibaba.fastjson.JSONObject 的解决办法_java.lang.class…

锐捷(十九)锐捷设备的接入安全

1、PC1的IP地址和mac地址做全局静态ARP绑定; 全局下&#xff1a;address-bind 192.168.1.1 mac&#xff08;pc1&#xff09; G0/2:ip verify source port-securityarp-check 2、PC2的IP地址和MAC地址做全局IPMAC绑定&#xff1a; Address-bind 192.168.1.2 0050.7966.6807Ad…

Mysql中索引优化和失效

什么是索引 要了解索引优化和索引失效的场景就要先了解什么是索引 索引是一种有序的存储结构&#xff0c;按照单个或者多个列的值进行排序&#xff0c;以提升搜索效率。 索引的类型 UNIQUE唯一索引 不可以出现相同的值&#xff0c;可以有NULL值。 INDEX普通索引 允许出现相同…

QT+OSG/osgEarth编译之八十二:osgdb_obj+Qt编译(一套代码、一套框架,跨平台编译,版本:OSG-3.6.5插件库osgdb_obj)

文章目录 一、osgdb_obj介绍二、文件分析三、pro文件四、编译实践一、osgdb_obj介绍 OBJ格式是一种标准的3D模型文件格式,它以纯文本形式存储关于3D模型的信息。这种格式最初由Wavefront Technologies为其高级可视化系统开发,后来被广泛应用于3D软件之间的数据交换。OBJ格式…

sheng的学习笔记-网络爬虫scrapy框架

基础知识&#xff1a; scrapy介绍 何为框架&#xff0c;就相当于一个封装了很多功能的结构体&#xff0c;它帮我们把主要的结构给搭建好了&#xff0c;我们只需往骨架里添加内容就行。scrapy框架是一个为了爬取网站数据&#xff0c;提取数据的框架&#xff0c;我们熟知爬虫总…

深入理解设计模式:建造者模式

深入理解设计模式&#xff1a;建造者模式 在软件工程中&#xff0c;设计模式是解决常见问题的模板或指南。它们不是可以插入应用程序并期待奇迹发生的即成代码&#xff0c;而是在特定上下文中解决特定问题的指导原则。今天&#xff0c;我们将深入探讨其中一个创建型模式——建…

神经语言程式(NLP)项目的15 个开源训练数据集

一个聊天机器人需要大量的训练数据,以便在无需人工干预的情况下快速解决用户的询问。然而,聊天机器人开发的主要瓶颈是获取现实的、面向任务的对话数据来训练这些基于机器学习的系统。 我们整理了训练聊天机器人所需的对话数据集,包括问答数据、客户支持数据、对话数据和多…

React Native开发iOS实战录

文章目录 背景环境准备主要工具xcode安装安装CocoaPods 基本步骤常见问题ruby3在macOS上编译失败import of module ‘glog.glog.log_severity’ appears within namespace ‘google’yarn网络问题pod安装失败unable to open settings file 相关链接 背景 准备将之前的一个Reac…