文章目录
- 一.Asynq介绍
- 二.所需工具
- 三.代码示例
- 四.Reference
一.Asynq介绍
Asynq 是一个 Go 库,一个高效的分布式任务队列。
Asynq 工作原理:
- 客户端(生产者)将任务放入队列
- 服务器(消费者)从队列中拉出任务并为每个任务启动一个工作 goroutine
- 多个工作人员同时处理任务
git库:https://github.com/hibiken/asynq
二.所需工具
Asynq 使用 Redis 作为消息代理。client 和 server 都需要连接到 Redis 进行写入和读取。
PS:请确保所使用redis >= 5.0
三.代码示例
以记录操作的中间件函数向数据库写数据的情景为例。
- 生产者(客户端)函数调用入口:
其中 map 为需向数据库写入的内容
client.Call("audit:opera", map[string]any{"uri": uri,"method": method,"params": string(paramsByte),"headers": string(headerByte),"code": codeInt,"model": model,"action": action,"user_id": userId,"company_id": companyId,"user_name": userName,"company": companyName,
})
- 生产者函数
func Call(t string, payload map[string]any) error {// redis连接client := asynq.NewClient(asynq.RedisClientOpt{Addr: "127.0.0.1:6379",Password: "",DB: 1,})defer client.Close()switch t {case "audit:opera":// 初始化新任务task, err := server.NewOperateSendTask(payload)if err != nil {return err}// 任务入队_, err = client.Enqueue(task, asynq.Queue("audit"))if err != nil {log.Err(err).Msg(fmt.Sprintf("task: %v\n", task))return err}}return nil
}
func NewOperateSendTask(data map[string]any) (*asynq.Task, error) {payload, err := json.Marshal(data)if err != nil {return nil, err}return asynq.NewTask(consts.TypeAuditOpera, payload), nil
}
- 消费者函数
func HandlerAuditOperateTask(ctx context.Context, t *asynq.Task) error {var record ent.OperateRecord// 队列中取任务err := json.Unmarshal(t.Payload(), &record)if err != nil {log.Err(err).Msg("task.json.Unmarshal")return err}// 真正的数据库操作err = dao.OperateRecord.CreateOperateRecord(&record)if err != nil {log.Err(err).Msg("task.dao.OperateRecord.CreateOperateRecord")return err}return nil
}
- asynq初始化(消费者启动入口,项目初始化时自动启动)
func InitAsynq(ip string, port int, passwd string) {addr := fmt.Sprintf("%s:%d", ip, port)srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "127.0.0.1:6379",Password: "",DB: 1,},// 异步队列asynq.Config{Queues: map[string]int{"audit": 3,},},)mux := asynq.NewServeMux()// 启动消费者mux.HandleFunc("audit:opera", server.HandlerAuditOperateTask)go srv.Run(mux)}
四.Reference
Go异步任务解决方案之Asynq库详解:
https://www.jb51.net/article/275392.htm