最近在使用Golang做了一个网盘项目(类似百度网盘),这个网盘项目有一个功能描述如下:用户会删除一个文件到垃圾回收站,回收站的文件有一个时间期限,比如24h,24h后数据库中记录和oss中文件会被删除,在之前的版本中,可以使用定时任务来检查数据库记录中删除时间来判断是否删除,但是这不是最佳的,因此考虑如何基于RabbitMQ来实现这个功能。
使用RabbitMQ的架构
代码
因为前端有点麻烦,这里全部使用Golang后端来模拟实现整个架构,包括生产端和消费端。这里有一些细节
- 注意交换机和队列的绑定,一定要细心
- 交换机一旦声明了就不能更改,如果要发生一些属性的更改,就要删除原来的内容,重新生成
- 下列的内容不包含RabbitMQ持久化的内容
package mainimport ("fmt""github.com/streadway/amqp""log""strings"
)func InitRabbitMQ() *amqp.Connection {mq := "amqp"host := "127.0.0.1"port := "5672"user := "root"pwd := "root"dns := strings.Join([]string{mq, "://", user, ":", pwd,"@", host, ":", port, "/"}, "")conn, err := amqp.Dial(dns)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}return conn
}func InitMainExchangeAndQueue(ch *amqp.Channel, userID string) *amqp.Channel {// 队列信息exchangeName := "main_exchange"queueName := fmt.Sprintf("user_queue_%s", userID)messageTTL := int32(300000)// 声明主交换机err := ch.ExchangeDeclare(exchangeName, // 交换机名"direct", // Exchange typefalse, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err != nil {log.Fatalf("Failed to declare an main exchange: %v", err)}// 声明用户队列_, err = ch.QueueDeclare(queueName, // 队列名false, // Durablefalse, // Delete when unusedfalse, // Exclusivefalse, // No-waitamqp.Table{"x-dead-letter-routing-key": "dead", // routing-key"x-dead-letter-exchange": "dead_exchange", // 死信交换机"x-message-ttl": messageTTL, // TTL},)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind(queueName, userID, "main_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}return ch
}func InitDeadExchangeAndQueue(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange",amqp.ExchangeDirect,true,false,false,false,nil,)if err != nil {log.Fatalf("Failed to declare an dead exchange: %v", err)}// 声明一个死信队列_, err = ch.QueueDeclare("dead_queue",true,false,false,false,nil)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定err = ch.QueueBind("dead_queue", "dead", "dead_exchange", false, nil)if err != nil {log.Fatalf("Failed to bind queue to exchange: %v", err)}
}func PublishMessage(ch *amqp.Channel, userID, fileID string) {// 用户信息message := fmt.Sprintf("%s|%s", userID, fileID)exchangeName := "main_exchange"// 发布用户消息err := ch.Publish(exchangeName, // ExchangeuserID, // Routing keyfalse, // Mandatoryfalse, // Immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(message),})if err != nil {log.Fatalf("Failed to publish a message: %v", err)}log.Printf("Message sent to user %s: %s", userID, message)
}func ConsumeTTL(ch *amqp.Channel) {// 声明死信交换机err := ch.ExchangeDeclare("dead_exchange", // 交换机名"direct", // Exchange typetrue, // Durablefalse, // Auto-deletedfalse, // Internalfalse, // No-waitnil, // Arguments)if err != nil {log.Fatalf("Failed to declare a dead letter exchange: %v", err)}// 创建消费者并阻塞等待消费死信队列中的消息megs, err := ch.Consume("dead_queue", // Queue"", // Consumerfalse, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err != nil {log.Fatalf("Failed to register a consumer for dead letter queue: %v", err)}// 使用无限循环一直监听fmt.Println("Waiting for message from dead_queue......")for d := range megs {// 实际中,处理消息的逻辑,例如删除文件或其他操作fmt.Println(string(d.Body))// 消费完成后手动确认消息err = d.Ack(false)if err != nil {log.Fatalf("Failed to ack message: %v", err)}}
}func Consume(ch *amqp.Channel, userID string) {// 下面的信息可以通过前后端进行传递queueName := fmt.Sprintf("user_queue_%s", userID)// 消费消息megs, err := ch.Consume(queueName, // Queue"", // Consumertrue, // Auto-acknowledgefalse, // Exclusivefalse, // No-localfalse, // No-waitnil, // Args)if err != nil {log.Fatalf("Failed to register a consumer: %v", err)}// 这里直接是由前端发送过来的API进行触发,所以不用一直阻塞监听d, ok := <-megsif !ok {log.Fatalf("Failed to get message: %v", err)}fmt.Println(string(d.Body))// 消息完成后确认消息err = d.Ack(true)if err != nil {log.Fatalf("Failed to ack message: %v", err)}
}func main() {// 获取客户端client := InitRabbitMQ()defer client.Close()ch, err := client.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()//ConsumeTTL(ch)// 构造dead_exchange及dead_queue// InitDeadExchangeAndQueue(ch)// 假设这是web请求信息//var userID1 = "test-id1"//var fileID1 = "file1"// 构造main_exchange及user_queue//ch = InitMainExchangeAndQueue(ch, userID1)// 针对用户1:假设还消息没有过期时候就被recovery,即在user_queue中就被消费,实际中发布消息的这部分逻辑应当放在前端中//PublishMessage(ch, userID1, fileID1)//time.Sleep(20 * time.Second)// 模拟后端消费消息//Consume(ch, userID1)// 针对用户2:模拟其不被后端消费,过期到死信队列中var userID2 = "test-id2"var fileID2 = "file2"ch = InitMainExchangeAndQueue(ch, userID2)PublishMessage(ch, userID2, fileID2)// 注意这个消息没有被消费,理论上应当被死信队列消费
}
从dead_exchange中消费: