Rabbitmq 搭建使用案例
文章目录
- RabbitMQ搭建
- docker
- 代码
- golang
- 生产者
- 消费者
- 可视化
- 消费进度
RabbitMQ搭建
docker
docker run -d --hostname rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_MANAGEMENT_PLUGIN=rabbitmq_management_agent -p 15672:15672 -p 5672:5672 rabbitmq:management
代码
golang
生产者
package mainimport ("flag""fmt"amqp "github.com/rabbitmq/amqp091-go""log""strconv""time"
)func main() {var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")var exchange = flag.String("exchange", "logs", "Exchange name")var key = flag.String("key", "log", "Routing key")flag.Parse()// 连接到RabbitMQ服务器conn, err := amqp.Dial(*url)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare(*exchange, // name: 交换机名称"fanout", // kind: 交换机类型true, // durable: 是否持久化false, // autoDelete: 没有队列绑定时是否自动删除false, // internal: 是否是内部交换机false, // noWait: 是否需要等待服务器响应nil, // args: 其他参数)if err != nil {log.Fatalf("Failed to declare an exchange: %v", err)}// 发送消息body := "Hello World!" + fmt.Sprintf(time.Now().String())for i := 0; i < 20; i++ {body = strconv.Itoa(i) + bodyerr = ch.Publish(*exchange, // 交换机名称*key, // 路由键false, // 强制发布false, // 立即发布amqp.Publishing{ContentType: "text/plain",DeliveryMode: amqp.Persistent,Body: []byte(body),Expiration: "10000", // 3000 3秒})}if err != nil {log.Fatalf("Failed to publish a message: %v", err)}fmt.Printf(" [x] Sent %s", body)
}
消费者
package mainimport ("flag""fmt""log"amqp "github.com/rabbitmq/amqp091-go"
)func main() {var url = flag.String("url", "amqp://admin:admin@localhost:5672/my_vhost", "RabbitMQ server URL")var exchange = flag.String("exchange", "logs", "Exchange name")var key = flag.String("key", "log", "Routing key")flag.Parse()// 连接到RabbitMQ服务器conn, err := amqp.Dial(*url)if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare(*exchange, // name: 交换机名称"fanout", // kind: 交换机类型true, // durable: 是否持久化false, // autoDelete: 没有队列绑定时是否自动删除false, // internal: 是否是内部交换机false, // noWait: 是否需要等待服务器响应nil, // args: 其他参数)if err != nil {log.Fatalf("Failed to declare an exchange: %v", err)}// 声明一个队列q, err := ch.QueueDeclare("queue01", // 随机生成队列名称true, // 持久化false, // 删除false, // 独占false, // 不等消息nil, // 其他参数)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 绑定队列到交换机err = ch.QueueBind(q.Name, // 队列名称*key, // 路由键*exchange, // 交换机名称false, // 现在绑定nil, // 其他参数)if err != nil {log.Fatalf("Failed to bind a queue: %v", err)}// 接收消息msgs, err := ch.Consume(q.Name, // 队列名称"consumer01", // 消费者标签false, // 自动ackfalse, // 不独占false, // 不等消息false, // 不从服务器获取消息nil, // 其他参数)if err != nil {log.Fatalf("Failed to register a consumer: %v", err)}fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")for d := range msgs {// 输出接收到的消息fmt.Printf(" [x] Received %s\n", d.Body)err = ch.Ack(d.DeliveryTag, true)if err != nil {log.Fatalf("Failed to ack message: %v", err)}}
}
可视化
看板
http://localhost:15672/
账户密码
admin
admin
消费进度
http://localhost:15672/#/queues