java 高并发mqtt服务器_Boomer 实战压测 mqtt,2w 并发轻松实现

// main.go

// 代码仅供参考,无法直接运行.

package main

import (

"bytes"

"encoding/csv"

"fmt"

MQTT "github.com/eclipse/paho.mqtt.golang"

"github.com/myzhan/boomer"

"io"

"io/ioutil"

"log"

"os"

"strconv"

"strings"

"sync"

"time"

)

var rows [][]string // 读取csv文件保存到这里

var clientTopic []map[string]MQTT.Client

var conn = 0 // 调试用

var failCount = 0 // 初始化失败数量

var i = 0 // 控制并发

var j = 1 // 记录消息发送成功

var f = 1 // 记录消息发送失败

var nowStr = strconv.Itoa(int(time.Now().Unix())) // 当前时间戳,用来做后续查询的消息的标识符

func newConn(c MQTT.Client, clientId string, group *sync.WaitGroup) {

defer func() {

group.Add(-1)

err := recover()

if err != nil {

failCount++

fmt.Println("login fail clientId: ", clientId)

}

}()

token := c.Connect()

if token.Wait() && token.Error() != nil {

panic(token.Error())

}

// 组装topic

topic := fmt.Sprintf("msg/%s/supply", clientId)

temp := make(map[string]MQTT.Client)

temp[topic] = c

clientTopic = append(clientTopic, temp)

conn++ // 调试用

}

func initClients() {

var wg sync.WaitGroup

server := "server_ip:1883"

for i := 0; i < len(rows); i++ {

wg.Add(1)

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

opts.SetKeepAlive(300 * time.Second)

c := MQTT.NewClient(opts)

go newConn(c, clientId, &wg)

}

wg.Wait() // 等到所有协程执行完成

fmt.Printf("init finish, clients len is %d \n", len(clientTopic))

fmt.Printf("conn: %d \n", conn)

fmt.Printf("failCount: %d \n", failCount)

}

func initCsvData() {

pwd, _ := os.Getwd()

b, err := ioutil.ReadFile(pwd + "/clients.csv")

fs := bytes.NewBuffer(b)

if err != nil {

log.Fatalf("can not open the file, err is %+v", err)

}

r := csv.NewReader(fs)

//针对大文件,一行一行的读取文件

for {

row, err := r.Read()

if err != nil && err != io.EOF {

log.Fatalf("can not read, err is %+v", err)

}

if err == io.EOF {

break

}

rows = append(rows, row)

}

}

func login() {

server := "server_ip:port"

clientId, userName, passWord := rows[i][0], rows[i][1], rows[i][2]

start := time.Now()

opts := MQTT.NewClientOptions().AddBroker(server)

opts.SetUsername(userName)

opts.SetPassword(passWord)

opts.SetClientID(clientId)

c := MQTT.NewClient(opts)

token := c.Connect()

elapsed := time.Since(start)

if token.Error() == nil {

log.Println("success" + strconv.Itoa(j))

boomer.RecordSuccess("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), int64(10))

} else {

log.Println(token.Error())

boomer.RecordFailure("tcp", "login", elapsed.Nanoseconds()/int64(time.Millisecond), clientId)

}

c.Disconnect(5)

// avoid out of array

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

j++

}

func sendMsg() {

start := time.Now()

msgId := "msg" + strconv.Itoa(i)

var clientId string

var topic string

var c MQTT.Client

for k, v := range clientTopic[i] {

clientId = k[6:19]

topic = k

c = v // v就是一个connected的client

}

deviceTime := nowStr

str := []string{msgId, clientId, deviceTime}

msgPayload := strings.Join(str, "|")

if c.IsConnected() == true {

token := c.Publish(topic, 1, false, msgPayload)

token.Wait() 等待消息发送完成,虽然会拉低并发,但必须要这么做,确保消息发送成功

elapsed := time.Since(start)

if token.Error() == nil {

fmt.Printf("this topic name is: %s \n", topic)

fmt.Printf("this topic payload is: %s \n", msgPayload)

fmt.Printf("success msg index: %v elapsed: %v \n", j, elapsed)

j++ // 消息发送成功, 记录一条,并且也给locust记录一条,方便后续校对数据量

boomer.RecordSuccess("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), int64(j))

// 避免数组越界

if i < len(clientTopic)-1 {

i++

} else {

i = 0

}

} else {

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

fmt.Printf("发送失败, fail msg index: %v \n", f)

}

} else {

if token := c.Connect(); token.Wait() && token.Error() != nil {

elapsed := time.Since(start)

fmt.Printf("fail msg index: %v \n", f)

f++

boomer.RecordFailure("tcp", "task", elapsed.Nanoseconds()/int64(time.Millisecond), msgPayload)

}

}

}

func main() {

initCsvData()

initClients()

task1 := &boomer.Task{

Name: "myTask",

Weight: 1,

Fn: sendMsg,

}

//task2 := &boomer.Task{

// Name: "login",

// Weight: 1,

// Fn: login,

//}

boomer.Run(task1)

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513092.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

从操作系统层面分析Java IO演进之路

简介&#xff1a; 本文从操作系统实际调用角度&#xff08;以CentOS Linux release 7.5操作系统为示例&#xff09;&#xff0c;力求追根溯源看IO的每一步操作到底发生了什么。 作者 | 道坚 来源 | 阿里技术公众号 前言 本文从操作系统实际调用角度&#xff08;以CentOS Linu…

AI是计算机科学,人工智能计算机科学(79种)...

COMPUTER SCIENCE, ARTIFICIAL INTELLIGENCE(人工智能计算机科学) 79种1. ADVANCED ENGINEERING INFORMATICSQuarterlyISSN: 1474-0346ELSEVIER SCI LTD, THE BOULEVARD, LANGFORD LANE, KIDLINGTON, OXFORD, ENGLAND, OXON, OX5 1GB2. AI COMMUNICATIONSQuarterlyISSN: 0921-7…

教程系列——用模板快速上线一个HR 服务中心

简介&#xff1a; 【开箱即用的模板使用系列教程】将会手把手教给大家如何快速启用钉钉宜搭提供各类模板。今天第一讲&#xff0c;介绍《HR 服务中心》的模板启用。 【开箱即用的模板使用系列教程】将会手把手教给大家如何快速启用钉钉宜搭提供各类模板。今天第1讲&#xff0c;…

数字化“团险”黑科技,保险极客技术升级背后心经

作者 | 宋慧 出品 | CSDN 云计算 疫情之后&#xff0c;一切都在“内卷”&#xff0c;HR 也逃不过。初创公司想要招到优秀人才&#xff0c;除了对市场和未来发展的预期和潜力&#xff0c;提供补充医疗险也是对人才重要的保障。另外&#xff0c;现在补充医疗也是知名大企业高福利…

powershell快捷键_借助Windows Terminal搞一个花里胡哨的PowerShell终端

一提起PowerShell&#xff0c;命令提示符等等&#xff0c;想到的就是丑、难用&#xff0c;非常丑&#xff01;各位可以先感受一下。不过&#xff0c;现在我们可以对它做一个美化&#xff0c;美化后的效果如下&#xff0c;各位也可以感受下(本人不提供背景图)下面做简单记录1、必…

linux刷新本地dns命令_两种方法修改Linux下的DNS后立即生效 - 文中之舞

DNS服务器介绍DNS是计算机域名系统(Domain Name System 或Domain Name Service) 的缩写&#xff0c;它是由域名解析器和域名服务器组成的。域名服务器是指保存有该网络中所有主机的域名和对应IP地址&#xff0c;并具有将域名转换为IP地址功能的服务器。其中域名必须对应一个IP地…

【详谈 Delta Lake 】系列技术专题 之 特性(Features)

简介&#xff1a; 本文翻译自大数据技术公司 Databricks 针对数据湖 Delta Lake 的系列技术文章。众所周知&#xff0c;Databricks 主导着开源大数据社区 Apache Spark、Delta Lake 以及 ML Flow 等众多热门技术&#xff0c;而 Delta Lake 作为数据湖核心存储引擎方案给企业带来…

*计算机应用基础* 说课稿,中职计算机应用基础《EXCEL中函数的使用》说课稿.doc...

中职计算机应用基础《EXCEL中函数的使用》说课稿说课稿《EXCEL中函数的使用》选用教材&#xff1a;《计算机应用基础》高等教育出版社出版各位专家、评委好&#xff01;我说课的题目是《EXCEL中函数的使用》&#xff0c;下面我将从教材、教法、学法以及教学程序设计等方面加以说…

python cmp函数未定义_python用plt画图时,cmp设置方法

在python&#xff0c;有时候是需要画图的&#xff0c;比如把一个矩阵用图像的形式显示&#xff0c;之前用的好好的&#xff0c;每次用plt.imshow()&#xff0c;都是彩色图&#xff0c;不知为啥&#xff0c;突然全是黑白图了&#xff0c;于是需要设置cmap的值&#xff0c;如下&a…

sid图像数据_MrSID技术在GIS中的应用

摘要&#xff1a;随着卫星遥感和航空摄影技术的发展&#xff0c;通过遥感获得的地理信息越来越多&#xff0c;特别是小卫星高分辨率遥感图象的商业化(如EOSAT)&#xff0c;遥感影像成为地理信息系统(GIS)一个非常重要的信息源&#xff0c;这对海量数据的及时存储与传输提出了很…

聚焦四大领域,恒生电子发布2022年金融科技技术与应用趋势

12月1日&#xff0c;恒生电子在2021 LIGHT开发者云大会上正式发布《2022金融科技趋势研究报告》&#xff08;以下简称“报告”&#xff09;。报告详细分析2022年金融科技在数据、智能、效率、安全四大领域的核心技术与应用发展趋势&#xff0c;并对VR、量子计算等前沿技术对金融…

深度解读畅捷通云原生架构转型实战历程

简介&#xff1a; 畅捷通公司是用友集团旗下的成员企业&#xff0c;专注于服务国内小微企业的财务和管理服务。一方面&#xff0c;畅捷通将自己的产品、业务、技术架构互联网化&#xff1b;另一方面&#xff0c;畅捷通推出了畅捷通一站式云服务平台&#xff0c;面向小微企业提供…

万兴剪刀手去水印教程_万兴神剪手怎么去水印 去除logo水印方法

万兴神剪手默认情况下得到的视频是带有水印的呢&#xff0c;相信很多人都想要知道去除掉这个水印吧&#xff0c;要是不知道怎么操作的话可以看看下面的教程。类别&#xff1a;视频处理 大小&#xff1a;188.74M 语言&#xff1a;简体中文评分&#xff1a;61、首先点击其中…

Apache Dubbo 3.0.0 正式发布 - 全面拥抱云原生

简介&#xff1a; 一个新的里程碑&#xff01; 一、背景 自从 Apache Dubbo 在 2011 年开源以来&#xff0c;在一众大规模互联网、IT公司的实践中积累了大量经验后&#xff0c;Dubbo 凭借对 Java 用户友好、功能丰富、治理能力强等优点在过去取得了很大的成功&#xff0c;成为…

cad2006安装未找到html文件,启动 AutoCAD 时显示“加载自定义文件失败。未找到文件”...

解决方案&#xff1a;执行以下一个或多个操作&#xff1a;查找安装的 ARG 文件是否来自 AutoCAD 工具组合ARG 文件无法从 AutoCAD 工具组合(如 AutoCAD Architecture、AutoCAD Mechanical 等)输入到 AutoCAD 或 AutoCAD LT。要查找 ARG 文件是否来自 AutoCAD 工具组合&#xff…

python关键字中文意思_python 字符串只保留汉字的方法

如下所示&#xff1a; def is_chinese(uchar): """判断一个unicode是否是汉字""" if uchar > u\u4e00 and uchar < u\u9fa5: return True else: return False def is_number(uchar): """判断一个unicode是否是数字"&q…

启明星辰集团DT总部落地杭州 数据绿洲版图驱动未来发展

12月1日&#xff0c;启明星辰集团DT&#xff08;数据时代&#xff09;总部正式落地于杭州高新区&#xff08;滨江&#xff09;&#xff0c;与北京IT总部形成南北两个总部基地新格局&#xff0c;并发布数据安全新版图--数据绿洲&#xff0c;将结合杭州领先的数字应用的场景&…

Quick BI的可视分析之路

简介&#xff1a; Quick BI是专为云上用户量身打造的智能数据分析和可视化BI产品&#xff0c;帮助企业快速完成从传统的数据分析到数据云化分析云化的转变&#xff0c;将企业的业务数据产出后以最快的速度被推送到各组织侧消费使用。本篇着重介绍Quick BI在可视化分析上的能力与…

火山引擎进军云市场,计划未来三年服务十万客户

12月2日&#xff0c;火山引擎在升级为字节跳动企业级技术服务业务板块之后&#xff0c;首次亮相就正式发布全系云产品&#xff0c;包括云基础、视频及内容分发、数据中台、开发中台、人工智能等五大类、共计78项服务。 火山引擎云产品是字节跳动“敏捷开发”技术实践的对外输出…

geth 转账_eth客户端安装 geth使用 批量转账(一)

这里是第一篇&#xff0c;主要讲eth客户端安装eth官网 https://ethereum.org/国内有一个论坛内容挺多的&#xff0c;可以参考 http://ethfans.org/eth客户端&#xff1a;eth客户端种类很多&#xff0c;go&#xff0c;Java&#xff0c;Python等各种语言写的都有目前最常用的&am…