go-zero(十二)消息队列

go zero 消息队列

在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。

go-zero中使用的队列组件go-queue,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。

一、kafka简单介绍

Kafka 是一个开源的分布式流处理平台,主要用于构建实时数据管道和流应用。

1.Kafka 的架构

Kafka 的架构通常由以下几个部分构成:

  • Broker(节点):Kafka 集群由多个 broker 实例组成,负责管理消息的存储和处理。
  • Topic(主题):消息以主题的形式组织,每个主题可以有多个分区(partition),以支持高并发和扩展性。
  • Produce(生产者):消息生产者将数据发送到特定的topic中。
  • Consumer(消费者):消费者从topic中读取数据,可以将多个消费者分组以进行负载均衡。

2.Kafka 的关键特性

  1. 高吞吐量
    Kafka 设计上能够处理大量的实时数据流,具备非常高的吞吐量。这使得它能够轻松应对大规模数据流量,适合做日志聚合、监控数据处理等。

  2. 持久性
    Kafka 将消息持久化到磁盘,并提供复制功能,以确保数据的安全性和可靠性。即使在节点出现故障的情况下,也能保证数据不会丢失。

  3. 可扩展性
    Kafka 能够水平扩展,通过增加更多的节点来处理更多的消费者和生产者,这使得它能够应对越来越多的业务需求。

  4. 实时处理
    Kafka 提供低延迟的数据传输,这使得实时处理和分析成为可能。您可以瞬时处理到来的数据流。

  5. 支持多种消息传递模式
    Kafka 支持发布-订阅和点对点的消息传递模式,能够灵活适应不同场景下的需求。

  6. 强大的生态系统
    Kafka 拥有丰富的生态系统,包括 Kafka Streams 和 Kafka Connect,这些工具可以帮助开发者更方便地进行流处理和数据集成。

3.常见应用场景

  1. 日志聚合
    Kafka 可以作为一个集中式的日志聚合器,将分布在不同服务的日志集中到一个地方,方便后续分析和监控。

  2. 实时数据流处理
    使用 Kafka,用户可以实时处理和分析流数据,例如检测异常、生成实时报告等。

  3. 系统监控和事件追踪
    Kafka 经常用于收集和跟踪系统事件(如用户行为、系统状态等),并通过流式处理进行实时监控。

  4. 数据集成
    Kafka 可以作为数据的桥梁,连接不同的数据源和目标系统,方便实现数据的流转和转换。

  5. 消息队列
    Kafka 可用作高效的消息队列,实现服务间的异步通信。例如,在微服务架构中,服务 A 可以将消息发送到 Kafka,而服务 B 可以异步地从 Kafka 中读取处理这些消息。

二、环境部署

1.Docker安装Kafka

配置文件:

version: '3'######## 项目依赖的环境,启动项目之前要先启动此环境 #######services:#zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafkazookeeper:image: wurstmeister/zookeepercontainer_name: zookeeperenvironment:# 时区上海 - Time zone Shanghai (Change if needed)TZ: Asia/Shanghairestart: alwaysports:- 2181:2181networks:- gozero_net#消息队列 - Message queuekafka:image: 'bitnami/kafka:3.6.2'container_name: kafkarestart: alwaysulimits:nofile:soft: 65536hard: 65536environment:- TZ=Asia/Shanghai- KAFKA_CFG_NODE_ID=0- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLERports:- '9092:9092'- '9094:9094'volumes:- ./volumes/kafka:/bitnami/kafkanetworks:gozero_net:driver: bridgeipam:config:- subnet: 172.16.0.0/16

这里的kafka 对外暴露的端口为9094

使用命令执行文件

docker compose up

2.创建topic

进入kafka容器

docker exec -it {{容器ID}} /bin/bash
#或者直接使用容器名
docker exec -it kafka /bin/bash

进入kafka执行命令目录
进入kafka执行命令目录

cd /opt/bitnami/kafka/bin

创建topic

创建名为topic-test的topic

./kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092

查看topic信息

./kafka-topics.sh --describe --topic topic-test --bootstrap-server localhost:9092

3.测试topic

使用两个终端,进入kafka执行命令目录,执行下面两个命令:

生产消息

./kafka-console-producer.sh --topic topic-test --bootstrap-server localhost:9092

消费消息

./kafka-console-consumer.sh --topic topic-test --bootstrap-server localhost:9092

测试

在生产者输入消息,会自动同步到消费者
在这里插入图片描述

4. 拉取依赖

项目中首先要拉取 go-queue 的依赖

go get github.com/zeromicro/go-queue@latest

三、项目演示

1.配置说明

type KqConf struct {service.ServiceConf// Brokers: Kafka 的多个 Broker 节点Brokers []string// Group: 消费者组Group string// Topic: 订阅的 Topic 主题Topic string// Offset: 如果新的 topic Kafka 没有对应的 offset 信息,或者当前的 offset 无效(历史数据被删除),// 需要指定从头(first)消费还是从尾(last)消费Offset string `json:",options=first|last,default=last"`// Conns: 一个 Kafka queue 对应可对应多个 consumer,Conns 对应 Kafka queue 数量,// 可以同时初始化多个 Kafka queue,默认只启动一个Conns int `json:",default=1"`// Consumers: go-queue 内部起多个 goroutine 从 Kafka 中获取信息写入进程内的 channel,// 此参数控制 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)Consumers int `json:",default=8"`// Processors: 当 Consumers 中的多个 goroutine 拉取到 Kafka 消息后,// 通过此参数控制当前消费逻辑的并发 goroutine 数量Processors int `json:",default=8"`// MinBytes: fetch 一次返回的最小字节数,若不够该字节数则会等待MinBytes int `json:",default=10240"`    // 10K// MaxBytes: fetch 一次返回的最大字节数,若第一条消息大小超过该限制仍会继续拉取,// 以确保 consumer 的正常运行。并非绝对配置,消息大小也受 broker 的 message.max.bytes 限制,// 以及 topic 的 max.message.bytes 限制MaxBytes int `json:",default=10485760"` // 10M// Username: Kafka 的账号(可选)Username string `json:",optional"`// Password: Kafka 的密码(可选)Password string `json:",optional"`
}

2.配置

配置文件

在yaml 配置文件中添加当前的 kafka 配置信息,我这里为了省事就都放在一个配置文件下了:

#....#生产者
KqPusherConf:Name: log-producerBrokers:- 127.0.0.1:9094Group: logs-groupTopic: topic-test
#消费者
KqConsumerConf:Name: log-consumerBrokers:- 127.0.0.1:9094Group: logs-groupTopic: topic-testOffset: lastConsumers: 8Processors: 8

config.go

在 internal/config 下的 config.go 中定义 go 映射的配置


type Config struct {/*.....*/KqPusherConf   kq.KqConfKqConsumerConf kq.KqConf
}

svc注入
在 svc/serviceContext.go 中初始化 pusher 的 kq client

type ServiceContext struct {Config  config.ConfigKqPusherClient *kq.Pusher}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config:  c,KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),}
}

3. 生产者

在 logic 中写业务逻辑使用 go-queue 的 kq client 发送消息到 kafka,这里我们用登录作为演示,当登录成功后,发送用户信息:


func (l *LoginLogic) Login(req *types.LoginRequest) (resp *types.LoginResponse, err error) {// todo: add your logic here and delete this line/*....省略其他代码*///生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装threading.GoSafe(func() {logData := map[string]any{"user":   user.Username,"mobile": user.Mobile,}logs, _ := json.Marshal(logData)// 使用Push推送消息,消息为jsonerr := l.svcCtx.KqPusherClient.Push(l.ctx, string(logs))if err != nil {logx.Errorf("KqPusherClient Push Error , err :%v", err)}})// 如果既没有验证码也没有密码return nil, errors.New(10010, "未提供有效的登录凭证")	
}

生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装,如果出现panics 会自动恢复。

4. 消费者

internal 下新建一个 mq 文件夹,在 mq 文件夹下新建一个消费者 consumer.go:

package mqsimport ("beyond/user/api/internal/svc""context""fmt""github.com/zeromicro/go-zero/core/logc""github.com/zeromicro/go-zero/core/logx"
)//定义日志消费者
type LogsConsumer struct {ctx    context.ContextsvcCtx *svc.ServiceContext
}// 定义构造方法
func NewLogsConsumer(ctx context.Context, svcCtx *svc.ServiceContext) *LogsConsumer {return &LogsConsumer{ctx:    ctx,svcCtx: svcCtx,}
}// Consume为go zero内置接口, 实现Consume接口方法
func (l *LogsConsumer) Consume(ctx context.Context, key, val string) error {//logx.Infof("Consumer key :%s , val :%s", key, val)logc.Infof(ctx, "Consumer key :%s, val :%s", key, val)return nil
}

Consume 为go queue内置接口
在这里插入图片描述

因为消费者可能有多个,在 mq 文件夹下新建一个文件mqs.go用来监听多个消费者,mqs.go 代码如下:

package mqsimport ("beyond/user/api/internal/config""beyond/user/api/internal/svc""context""github.com/zeromicro/go-queue/kq""github.com/zeromicro/go-zero/core/service"
)func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {// 监听消费者状态变化return []service.Service{//创建消息队列kq.MustNewQueue(c.KqConsumerConf, NewLogsConsumer(ctx, svcCtx)),}}

在 main.go 中启动 consumers 等待消费


func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()svcCtx := svc.NewServiceContext(c)handler.RegisterHandlers(server, svcCtx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)// 因为现在添加了mq,属于多服务状态,所以需要启动mq服务//server.Start()  //创建新的服务组serviceGroup := service.NewServiceGroup()defer serviceGroup.Stop()// 从mq中获取消费者服务,并添加到服务组中for _, mq := range mqs.Consumers(c, context.Background(), svcCtx) {serviceGroup.Add(mq)}//添加原来的server服务serviceGroup.Add(server)// 启动服务组serviceGroup.Start()}

5.启动项目

我们这里就是简单的输出日志,你也可以拓展成发送邮件或者短信给用户,提示用户注册成功。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/63630.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JAVA高级篇教学】第二篇:使用 Redisson 实现高效限流机制

在高并发系统中,限流是一项非常重要的技术手段,用于保护后端服务,防止因流量过大导致系统崩溃。本文将详细介绍如何使用 Redisson 提供的 RRateLimiter 实现分布式限流,以及其原理、使用场景和完整代码示例。 目录 一、什么是限流…

基于卷积神经网络的Caser算法

将一段交互序列嵌入到一个以时间为纵轴的平面空间中形成“一张图”后,基于卷积序列嵌入的推荐(Caser)算法利用多个不同大小的卷积滤波器,来捕捉序列中物品间的点级(point-level)、联合的(union-…

【银河麒麟高级服务器操作系统】修改容器中journal服务日志存储位置无效—分析及解决方案

了解更多银河麒麟操作系统全新产品,请点击访问 麒麟软件产品专区:https://product.kylinos.cn 开发者专区:https://developer.kylinos.cn 文档中心:https://documentkylinos.cn 服务器环境以及配置 【机型】 整机类型/架构&am…

HTML:表格重点

用表格就用table caption为该表上部信息,用来说明表的作用 thead为表头主要信息,效果加粗 tbody为表格中的主体内容 tr是 table row 表格的行 td是table data th是table heading表格标题 ,一般表格第一行的数据都是table heading

《AI教学能力:开启教育新纪元》

一、摘要 AI 在教育领域的应用日益广泛,对教学能力产生了深远影响。本文将深入探讨 AI 教学能力的核心技术、实际应用、教学模式与策略、全球实践以及未来趋势,为教育的现代化发展提供参考。 摘要: AI 在教育领域的应用及其对教学能力的影响…

域渗透入门靶机之HTB-Cicada

easy难度的windows靶机 信息收集 端口探测 nmap -sT --min-rate 10000 -p- 10.10.11.35 -oA ./port 发现开放了53,88,389等端口,推测为域控 进一步信息收集,对爆破的端口进行更加详细的扫描 小tips:对于众多的端口&…

zerotier实现内网穿透(访问内网服务器)

moo 内网穿透工具 实用工具:zerotier 目录 内网穿透工具 Windows下zerotier安装 ubuntu系统下的zerotier安装 使用moon加速 Windows下zerotier安装 有了网络之后,会给你一个网络id,这个网络id是非常重要的,其它设备要加入…

v-for遍历多个el-popover;el-popover通过visible控制显隐;点击其他隐藏el-popover

场景&#xff1a;el-popover通过visible控制显隐&#xff1b;同时el-popover是遍历生成的多个。 原文档的使用visible后就不能点击其他地方使其隐藏 主要监听全局点击事件即可 <template><div><template v-for"(item,index) in arr" :key"index&…

Robust Depth Enhancement via Polarization Prompt Fusion Tuning

paper&#xff1a;论文地址 code&#xff1a;github项目地址 今天给大家分享一篇2024CVPR上的文章&#xff0c;文章是用偏振做提示学习&#xff0c;做深度估计的。模型架构图如下 这篇博客不是讲这篇论文的内容&#xff0c;感兴趣的自己去看paper&#xff0c;主要是分享环境&…

TCP 2

文章目录 Tcp状态三次握手四次挥手理解TIME WAIT状态 如上就是TCP连接管理部分 流量控制滑动窗口快重传 延迟应答原理 捎带应答总结TCP拥塞控制拥塞控制的策略 -- 每台识别主机拥塞的机器都要做 面向字节流 Tcp状态 建立连接时 断开连接时 三次握手 tcp三次握手时我们想看看…

帝可得项目redis连接不上

首先我一切配置都没问题&#xff1a; 1. redis-server启动 2. 可视化界面显示redis已连接 原因&#xff1a; 不知道是不是因为不同版本的问题(因为我之前的sky就没这个问题) 这里把password改成auth就可以了

(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验二----网络分析(超超超详细!!!)

相信实验一大家已经完成了&#xff0c;对Arcgis已进一步熟悉了&#xff0c;现在开启第二个实验 ArcMap实验--网络分析 目录 ArcMap实验--网络分析 1.1 网络分析介绍 1.2 实验内容及目的 1.2.1 实验内容 1.2.2 实验目的 2.2 实验方案 2.3 实验流程 2.3.1 实验准备 2.3.2 空间校正…

iPhone 17 Air基本确认,3个大动作

近段时间&#xff0c;果粉圈都在讨论一个尚未发布的新品&#xff1a;iPhone 17 Air&#xff0c;苹果又要来整新活了。 从供应链消息来看&#xff0c;iPhone 17 Air本质上是Plus的替代品&#xff0c;主要是在维持“大屏”这一卖点的同时&#xff0c;增加了“轻薄”属性&#xff…

H5 Admin后台管理系统、用户权限管理设计、按钮级别、数据级别、html+bootstrap后台管理前端界面设计

一、前言 一个高颜值后台管理模板&#xff0c;Light Year Admin后台管理系统模板是一个基于Bootstrap v3.3.7的纯HTML模板&#xff0c;目前也已经更新了基于Bootstrap 4.4.1的版本。都有iframe以及非iframe的两种不同的形式供大家选择使用。简洁而清新的后台模板&#xff0c;功…

Windows环境基于ecplise的spring boot框架新建spring start project

SpringToolSuite4 新建项目实例 前言Windows基于ecplise 工具的spring boot 架构 前言 使用Spring boot 框架向前端传输数据 Windows基于ecplise 工具的spring boot 架构 spring-tool-suite-4官网下载链接spring tool&#xff0c;下载太慢的话可以使用迅雷加速&#xff0c;右…

理解 CAP 理论:分布式系统中的权衡与选择 | 常用组件中的CP和AP

CAP定理是分布式系统设计中的一个基本定理&#xff0c;它指出在一个分布式计算系统中&#xff0c;一致性&#xff08;Consistency&#xff09;、可用性&#xff08;Availability&#xff09;、分区容忍性&#xff08;Partition Tolerance&#xff09;三者不可同时实现&#xff…

AttributeError: module ‘cv2.dnn‘ has no attribute ‘DictValue‘如何解决?

AttributeError: module cv2.dnn has no attribute DictValue如何解决&#xff1f; 出现场景出错原因解决方案 出现场景 当在代码中导入opencv的时候&#xff1a;import cv2&#xff0c;出现&#xff1a; 出错原因 查看大家出现的错误&#xff0c;发现是因为opencv版本问题…

Java Class类文件结构

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

ThinkPHP开发的原生微信小程序二手物品回收小程序管理系统源码

二手物品回收小程序 一款基于ThinkPHP开发的原生微信小程序二手物品回收小程序管理系统。支持线上下单、免费上门取件、评估价格等功能。提供全部无加密源码&#xff0c;支持私有化部署。

分布式专题(1)之Redis持久化、主从与哨兵架构详解

一、Redis持久化 1.1 RDB快照&#xff08;snapshot&#xff09; 在默认的情况下&#xff0c;Redis将内存数据快照保存名字为&#xff1a;dump.rdb的二进制文件中&#xff0c;当然你在配置文件redis.conf中修改对应的二进制文件名。 redis开启RDB快照&#xff0c;可以在redis中…