Asynq: 基于Redis实现的Go生态分布式任务队列和异步处理库

Asynq[1]是一个Go实现的分布式任务队列和异步处理库,基于redis,类似Ruby的sidekiq[2]和Python的celery[3]。Go生态类似的还有machinery[4]和goworker

alt

同时提供一个WebUI asynqmon[5],可以源码形式安装或使用Docker image, 还可以和Prometheus集成

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon,如果使用的是主机上的redis,还需加上 --redis-addr=host.docker.internal:6379,否则会报错[6]

docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379


➜  asynq-demo git:(main) ✗ tree
.
├── client.go
├── const.go
├── go.mod
├── go.sum
└── server.go

0 directories, 5 files

其中const.go:

package main

const (
 redisAddr   = "127.0.0.1:6379"
 redisPasswd = ""
)

const (
 TypeExampleTask    = "shuang:asynq-task:example"
)

client.go:


package main

import (
 "encoding/json"
 "fmt"
 "log"
 "time"

 "github.com/hibiken/asynq"
)

type ExampleTaskPayload struct {
 UserID string
 Msg    string
 // 业务需要的其他字段
}

func NewExampleTask(userID string, msg string) (*asynq.Task, error) {
 payload, err := json.Marshal(ExampleTaskPayload{UserID: userID, Msg: msg})
 if err != nil {
  return nil, err
 }
 return asynq.NewTask(TypeExampleTask, payload), nil
}

var client *asynq.Client

func main() {

 client = asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr, Password: redisPasswd, DB: 0})
 defer client.Close()

 //go startExampleTask()
 startExampleTask()

 //startGithubUpdate() // 定时触发
}

func startExampleTask() {

 fmt.Println("开始执行一次性的任务")
 // 立刻执行
 task1, err := NewExampleTask("10001""mashangzhixing!")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err := client.Enqueue(task1)
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task1 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 10秒后执行(定时执行)
 task2, err := NewExampleTask("10002""10s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 info, err = client.Enqueue(task2, asynq.ProcessIn(10*time.Second))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task2 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)

 // 30s后执行(定时执行)
 task3, err := NewExampleTask("10003""30s houzhixing")
 if err != nil {
  log.Fatalf("could not create task: %v", err)
 }

 theTime := time.Now().Add(30 * time.Second)
 info, err = client.Enqueue(task3, asynq.ProcessAt(theTime))
 if err != nil {
  log.Fatalf("could not enqueue task: %v", err)
 }
 log.Printf("task3 -> enqueued task: id=%s queue=%s", info.ID, info.Queue)
}

server.go:

package main

import (
 "context"
 "encoding/json"
 "fmt"
 "time"

 "github.com/davecgh/go-spew/spew"
 "github.com/hibiken/asynq"
)

var AsynqServer *asynq.Server // 异步任务server

func initTaskServer() error {
 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,
    "default":  3,
    "low":      1,
   },
   // See the godoc for other configuration options
  },
 )
 return nil
}

func main() {
 initTaskServer()
 mux := asynq.NewServeMux()

 mux.HandleFunc(TypeExampleTask, HandleExampleTask)
 // ...register other handlers...

 if err := AsynqServer.Run(mux); err != nil {
  fmt.Printf("could not run asynq server: %v", err)
 }
}

func HandleExampleTask(ctx context.Context, t *asynq.Task) error {

 res := make(map[string]string)

 spew.Dump("t.Payload() is:", t.Payload())
 err := json.Unmarshal(t.Payload(), &res)
 if err != nil {
  fmt.Printf("rum session, can not parse payload: %s,  err: %v", t.Payload(), err)
  return nil
 }
 //-----------具体处理逻辑------------
 spew.Println("拿到的入参为:", res, "接下来将进行具体处理")
 fmt.Println()
 // 模拟具体的处理
 time.Sleep(5 * time.Second)
 fmt.Println("--------------处理了5s,处理完成-----------------")

 return nil

}

执行redis-server


清除redis中所有的key:


执行docker run --rm --name asynqmon -p 8080:8080 hibiken/asynqmon --redis-addr=host.docker.internal:6379

alt

执行 go run client.go const.go (生产者,产生消息放入队列)

alt

此时能看到redis中多个几个key

alt

同时管理后台能看到队列的信息

alt

执行 go run server.go const.go (消费者,消费队列中的消息)

alt

可以看到都被处理了

alt

此时redis中的key:

alt

此处的业务处理为模拟,实际可能是某个被触发后不需要马上执行的操作




实际试一下。通过一个定时器(24h执行一次),触发代码每天向github push当天的代码等内容。收到触发后无需马上执行(可能当时其他请求量高,机器资源紧张),可以先放入队列,延迟30min后实际去执行。

完整Demo[7] push github的功能没有完全实现


另外可以配置队列的优先级,asynq队列如何配置队列优先级[8]

 // 初始化异步任务服务端
 AsynqServer = asynq.NewServer(
  asynq.RedisClientOpt{
   Addr:     redisAddr,
   Password: redisPasswd, //与client对应
   DB:       0,
  },
  asynq.Config{
   // Specify how many concurrent workers to use
   Concurrency: 100,
   // Optionally specify multiple queues with different priority.
   Queues: map[string]int{
    "critical"6,//关键队列中的任务将被处理 60% 的时间
    "default":  3,//默认队列中的任务将被处理 30% 的时间
    "low":      1,//低队列中的任务将被处理 10% 的时间
   },
   // See the godoc for other configuration options
  },
 )

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误[9]

参考资料

[1]

Asynq: https://github.com/hibiken/asynq

[2]

sidekiq: https://github.com/sidekiq/sidekiq

[3]

celery: https://github.com/celery/celery

[4]

machinery: https://blog.csdn.net/weixin_42681866/article/details/123334654

[5]

asynqmon: https://github.com/hibiken/asynqmon

[6]

报错: https://github.com/hibiken/asynqmon/issues/214

[7]

完整Demo: https://github.com/cuishuang/asynq-demo

[8]

asynq队列如何配置队列优先级: https://blog.csdn.net/itopit/article/details/126123626

[9]

go asynq 异步任务 (延迟触发) 简单案例及奇怪的错误: https://my.oschina.net/randolphcyg/blog/5539676

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/27866.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

配置页面的路由

1.下载router npm i router 2.注册路由 文件路径 :src/router/index.js import Vue from "vue"; import VueRouter from "vue-router"; Vue.use(VueRouter); import Home from "../components/home.vue"; import Main from …

聚观早报|iPhone 15预计9月22日上市;一加Open渲染图曝光

【聚观365】8月7日消息 iPhone 15预计9月22日上市一加Open渲染图曝光Redmi K60至尊版细节曝光小米14 Pro屏幕细节曝光vivo V3正式发布,执着自研“影像芯片” iPhone 15预计9月22日上市 上周有多位消息人士透露,多家合作的电信运营商已要求员工不要在9月…

【测试】软件测试工具JMeter简单用法

简明扼要,点到为止。 1. JMeter介绍 JMeter的全称是Apache JMeter,是一款用于软件测试的工具软件,其是开源免费的,由Apache基金会运营。 官网:Apache JMeter - Apache JMeter™ 2. 下载安装及运行 2.1 安装 Java8…

ParallelCollectionRDD [0] isEmpty at KyuubiSparkUtil.scala:48问题解决

ParallelCollectionRDD [0] isEmpty at KyuubiSparkUtil.scala:48问题解决 这个问题出现在使用Kyubi Spark Util处理ParallelCollectionRDD的过程中,具体是在KyubiSparkUtil.scala文件的第48行调用isEmpty方法时出现的。该问题可能是由以下几个原因引起的&#xff1…

【网关】Shenyu网关自动注册和同步元数据和URL,Shenyu-admin从nacos同步数据方案

Shenyu官网数据同步设计方案如下面图,同步方式支持 Zookeeper、Http 长轮询、Websocket、Nacos、Etcd 和 Consul等。我们选择的时候,要小心配置参数,这里我以官网http和自实现的nacos为例。 官网示例代码 http方式注册 yml配置admin的账号信息…

React Dva 操作models中的subscriptions讲述监听

接下来 我们来看一个models的属性 之前没有讲到的subscriptions 我们可以在自己有引入的任意一个models文件中这样写 subscriptions: {setup({ dispatch, history }) {console.log(dispatch);}, },这样 一进来 这个位置就会触发 这里 我们可以写多个 subscriptions: {setup…

Java课题笔记~ 关联映射

一、MyBatis关联查询 在关系型数据库中,表与表之间存在着3种关联映射关系,分别为一对一、一对多、多对多。 一对一:一个数据表中的一条记录最多可以与另一个数据表中的一条记录相关。列如学生与学号就属于一对一关系。 一对多:主…

升级你的GitHub终端认证方式:从密码到令牌

升级你的GitHub终端认证方式:从密码到令牌 前言 GitHub官方在2021年8月14日进行了一次重大改变,它将终端推送代码时所需的身份认证方式从密码验证升级为使用个人访问令牌(Personal Access Token)。这个改变引起了一些新的挑战&am…

Linux 安装软件的几种方式

哈喽大家好,我是咸鱼 相信小伙伴们都知道在 Linux 中,安装软件一般有三种方式 yum 安装rpm 安装源码编译安装 咸鱼平时三种安装方式都会用,但是具体原理和区别却没有去深入了解过 结果上周部门刚来的新人问我这几种安装方式的时候&#x…

每天一道leetcode:剑指 Offer 32 - III. 从上到下打印二叉树 III(中等广度优先遍历)

今日份题目: 请实现一个函数按照之字形顺序打印二叉树,即第一行按照从左到右的顺序打印,第二层按照从右到左的顺序打印,第三行再按照从左到右的顺序打印,其他行以此类推。 示例 给定二叉树: [3,9,20,null,null,15,7…

lwip不同的socket分别作为监听和客户端连接

在LWIP中,一个网络设备(如以太网卡)可以创建多个socket,用于处理不同的网络连接。一般,你可以创建一个socket用于监听(listen)连接,另一个socket用于主动发起(connect&am…

Win7 专业版Windows time w32time服务电脑重启后老是已停止

环境: Win7 专业版 问题描述: Win7 专业版Windows time w32time服务电脑重启后老是已停止 解决方案: 1.检查启动Remote Procedure Call (RPC)、Remote Procedure Call (RPC) Locator,DCOM Server Process Launcher这三个服务是…

RocketMQ Learning(一)

目录 一、RocketMQ 0、RocketMQ的产品发展 1、RocketMQ安装 1.1、windows下的安装 注意事项 1.2、Linux下的安装 1.3、源码的安装 1.4、控制台 2、消息发送方式 2.1、发送同步消息 2.2、发送异步消息 2.3、单向发送 3、消息消费方式 3.1、负载均衡模式&#xff0…

SSM(Vue3+ElementPlus+Axios+SSM前后端分离)--具体功能实现【三】

文章目录 SSM--功能实现实现功能04-添加家居信息需求分析/图解思路分析代码实现注意事项和细节 实现功能05-显示家居信息需求分析/图解思路分析 代码实现 SSM–功能实现 实现功能04-添加家居信息 需求分析/图解 思路分析 完成后台代码从dao -> serivce -> controller ,…

自动化应用杂志自动化应用杂志社自动化应用编辑部2023年第11期目录

数据处理与人工智能 大数据视域下无轨设备全生命周期健康管理技术的研究 赖凡; 1-3 三维激光扫描结合无人机倾斜摄影在街区改造测绘中的技术应用 张睿; 4-6 井上变电站巡检机器人的设计与应用 刘芳; 7-9 《自动化应用》投稿邮箱:cnqikantg126.com 基于机…

机器学习笔记

文章目录 编码器-解码器Batch Normalization好处 编码器-解码器 第二个input与transformer中的解码器类似。 Batch Normalization 尽量使得w1和w2之间呈现为正圆 训练模型的时候, μ \mu μ和 σ \sigma σ不可以认为是常数,而是包含数据的变量&…

部署SpringBoot项目在服务器上,并通过swagger登录

1.项目编译打包 2.上传jar包到服务器并启动 xftp将打包好后的jar包传到虚拟机指定路径 java -jar执行该jar包 3.通过swagger登录 输入后点击下面的执行按钮 会得到一个tocken 4.将tocken放到postman的Headers中 5.修改url 例如我本地测试是http://localhost:8080/接口名&am…

Vue命名规范

JS文件命名 一般采用的是小驼峰命名法,如 pieChartHelp 第一个单词小写,其他单词首字母大写 Components 文件命名 一般采用的是大驼峰命名法,如PieChart 所有单词的首字母大写 常量命名 一般全部大写,每个单词使用分隔符隔开&…

一文读懂|RDMA原理

什么是DMA DMA全称为Direct Memory Access,即直接内存访问。意思是外设对内存的读写过程可以不用CPU参与而直接进行。我们先来看一下没有DMA的时候: 无DMA控制器时I/O设备和内存间的数据路径 假设I/O设备为一个普通网卡,为了从内存拿到需要…

【MySQL】MySQL数据类型

文章目录 一、数据类型的分类二、tinyint类型2.1 创建有符号数值2.2 创建无符号数值 三、bit类型三、浮点类型3.1 float3.2 decimal类型 四、字符串类型4.1 char类型4.2 varchar类型 五、日期和时间类型六、枚举和集合类型6.1 enum的枚举值和set的位图结构6.2 查询集合find_in_…