ConnectionFactory
ConnectionFactory类是RabbitMQ Java客户端库中的一个类,用于创建RabbitMQ连接。常用属性和方法如下:
属性:
- host:RabbitMQ服务器的主机名,默认为localhost。
- port:RabbitMQ服务器的端口号,默认为5672。
- username:连接RabbitMQ服务器的用户名,默认为guest。
- password:连接RabbitMQ服务器的密码,默认为guest。
- virtualHost:连接RabbitMQ服务器的虚拟主机,默认为/。
- connectionTimeout:连接超时时间,默认为0(无限制)。
- requestedHeartbeat:请求的心跳超时时间,默认为0(无限制)。
方法:
- newConnection():创建一个新的RabbitMQ连接。
- createChannel():创建一个新的通道。
Channel
Channel类的常用方法包括:
1. basicPublish:用于将消息发送到指定的交换机和路由键。参数含义:- exchange:消息发送到的交换机名称- routingKey:消息发送到的队列名称- props:消息的属性- body:消息体作用:将消息发送到指定的交换机上,等待被消费者消费。2. basicConsume:用于从指定队列中消费消息。参数含义:- queue:要消费的队列名称- autoAck:是否自动确认消息- callback:消费者接收到消息后的回调函数作用:订阅队列中的消息,等待被消费者消费。3. basicAck:用于确认已经处理完毕的消息。参数含义:- deliveryTag:消息标签- multiple:是否批量确认作用:确认消息已被消费,告诉RabbitMQ可以删除该消息。4. basicNack:用于拒绝处理某个消息,并可以选择是否重新将消息放回队列。参数含义:- deliveryTag:消息标签- multiple:是否批量拒绝- requeue:是否重新入队列作用:拒绝消息,并可选择是否重新入队列。5. basicReject:用于拒绝处理某个消息,并可以选择是否重新将消息放回队列。参数含义:- deliveryTag:消息标签- requeue:是否重新入队列作用:拒绝消息,并重新入队列。6. queueDeclare:用于声明一个队列。参数含义:- queue:队列名称- durable:是否持久化- exclusive:是否独占- autoDelete:是否自动删除- arguments:队列参数作用:声明队列,如果队列不存在则创建。7 exchangeDeclare:用于声明一个交换机。参数含义:- exchange:交换机名称- type:交换机类型- durable:是否持久化- autoDelete:是否自动删除- internal:是否内部使用- arguments:交换机参数作用:声明交换机,如果交换机不存在则创建。8. queueBind:用于将队列绑定到指定的交换机和路由键。参数含义:- queue:队列名称- exchange:交换机名称- routingKey:路由键- arguments:绑定参数作用:将队列绑定到交换机上,等待被消费者消费。
Channel类的作用是提供了与RabbitMQ服务器进行通信的通道,通过该通道可以进行消息的发送和接收,以及队列和交换机的声明和绑定等操作。同时,Channel类还提供了一些方法用于控制消息的确认和拒绝,以及消息的持久化等高级特性。
DefaultConsumer
DefaultConsumer是RabbitMQ客户端库中的一个类,它实现了Consumer接口,用于处理从RabbitMQ服务器接收到的消息。
DefaultConsumer类的常用方法包括:
1. handleDelivery:处理从RabbitMQ服务器接收到的消息。void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;参数含义:- consumerTag:消费者标签,用于标识消费者。- envelope:消息的信封,包含了消息的元数据,如交换机、路由键等。- properties:消息的属性,包含了消息的元数据,如消息ID、消息类型等。- body:消息的内容,即消息体。2. handleShutdownSignal:处理与RabbitMQ服务器的连接关闭信号。void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);参数含义:- consumerTag:消费者标签,用于标识消费者。- sig:关闭信号异常,包含了关闭的原因和异常信息。3. handleConsumeOk:处理与RabbitMQ服务器的消费者注册成功信号。void handleConsumeOk(String consumerTag);参数含义:- consumerTag:消费者标签,用于标识消费者。4. handleCancelOk:处理与RabbitMQ服务器的消费者取消注册成功信号。void handleCancelOk(String consumerTag);参数含义:- consumerTag:消费者标签,用于标识消费者。
在handleDelivery方法中,我们可以根据需要对消息进行处理,例如解析消息内容、存储消息等。
生产者示例代码
创建RabbitMQ连接、创建通道、声明交换机和发送消息的完整示例代码:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitMQExample {private static final String QUEUE_NAME = "my_queue";private static final String EXCHANGE_NAME = "my_exchange";private static final String ROUTING_KEY = "my_routing_key";public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");try {// 创建连接Connection connection = factory.newConnection();// 创建通道Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 发送消息String message = "Hello, RabbitMQ!";channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());System.out.println("Sent message: " + message);// 关闭通道和连接channel.close();connection.close();} catch (IOException | TimeoutException e) {e.printStackTrace();}}
}
在这个示例中,使用了默认的本地RabbitMQ服务器,用户名和密码都是"guest",虚拟主机是"/"。创建了一个名为"my_exchange"的topic类型交换机,并发送了一条消息到该交换机上,使用了"my_routing_key"作为路由键。
消费者示例代码
创建RabbitMQ连接、创建通道、声明交换机、以及接收消息:
import com.rabbitmq.client.*;public class RabbitMQExample {private final static String QUEUE_NAME = "my_queue";private final static String EXCHANGE_NAME = "my_exchange";private final static String ROUTING_KEY = "my_routing_key";public static void main(String[] args) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");// 创建连接Connection connection = factory.newConnection();// 创建通道Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);// 创建消费者Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Received message: " + message);}};// 开始消费消息channel.basicConsume(QUEUE_NAME, true, consumer);}
}
以上示例代码演示了如何创建一个RabbitMQ连接、创建通道、声明一个类型为TOPIC的交换机、以及接收消息的过程。你可以根据自己的需求修改相关参数和逻辑。