架构
概念
nsqlookup:存储了nsqd的元数据和服务信息(endpoind),向消费者提供服务发现功能, 向nsqadmin提供数据查询功能。
nsqd: 是接收、队列和传送消息到客户端的守护进程。
nsqadmin:简单的管理界面,展示了topic, channel以及channel上的消费者,也可以创建topic,channel。
消息可靠性
(1)生产者不保证消息可靠
(2)消费者保证至少一次消费
发送逻辑
(1)根据配置指定的nsqd的ip, 选择一个机器,通过 HTTP API(也可以TCP)将消息发布到 nsqd的指定 topic
(2)当 producer初次发布的消息的 topic不存在,则会创建。
(3)对topic加锁,将消息发送给 memoryMsgChan中,然后释放锁。如果 memoryMsgChan满了,申请一个buff,把消息写到 Backend,后期被 backendMsgChan接收。
(4)messagePump 不断从 memoryChan/backend队列中读消息,并将消息每个复制一遍,发送给 topic下的所有channel
//nsqd/topic.go:220
func (t *Topic) messagePump() {...for {select {case msg = <-memoryMsgChan:case buf = <-backendChan:msg, err = decodeMessage(buf)...case <-t.channelUpdateChan:...case pause := <-t.pauseChan:...case <-t.exitChan:goto exit}for i, channel := range chans {chanMsg := msgif i > 0 {chanMsg = NewMessage(msg.ID, msg.Body)chanMsg.Timestamp = msg.TimestampchanMsg.deferred = msg.deferred}if chanMsg.deferred != 0 {channel.PutMessageDeferred(chanMsg, chanMsg.deferred)continue}err := channel.PutMessage(chanMsg)...}}
}
(5)channel的PutMessage和 topic类似,首先先写 memoryMsgChan, 满了写入 backend.
(6)protocol实例的messagePump方法从memoryMsgChan或backendMsgChan读取消息并通过p.SendMessage(client, msg)发送到客户端 ,消息写入client.Writer。
消费逻辑
(1)consumer将会从 nsqlookup 服务器节点上发现所有包含事件 topic的 nsqd节点。每个consumer向每个 nsqd主机进行订阅操作。
(2)根据获取到的机器信息,通过 TCPsubscribe 自己需要的channel。如果 topic 或者 channel没有创建,则会创建
(3)多个 consumer对应一个 channel, 每个消息将被传递到一个随机的 consumer中。
消费特点:
(1)支持延时消息。
(2)channel在consumer退出后并不会删除。
消息消费失败
这个担保是作为协议和工作流的一部分,工作原理如下(假设客户端成功连接并订阅一个话题):
1)客户表示已经准备好接收消息
2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout,采用大小堆【超时时间排序】和map存储)
3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息 (有个协程定时查看,根据当前时间跟最大堆的顶元素比较)。
消费者限流
就是客户端连接上nsqd之后,会告诉nsqd它的可接受的消息数量是多少,每当nsqd给客户端推送一条消息这个RDY就会减一,而客户端消费完毕并且发送一个FIN之后,这个RDY又会加一(其实这个设计有点类似tcp中的用来控制流量的窗口机制)。
客户端库的被设计成在 RDY 数达到配置 max-in-flight 的 25% 发送一个命令来更新 RDY 计数(并适当考虑连接到多个 nsqd 情况下,适当地分配)。
本地初始化consumer时配置 maxInFlight, 在服务端也配置一个 max_in_flight。每个连接默认:2500
客户端的职责:
-
引导并均匀地将max_in_flight配置分配给所有连接。
-
绝不允许所有连接的RDY计数总和(total_rdy_count)超过所配置的max_in_flight。
-
永远不要超过nsqd配置的每个连接的max_rdy_count。
-
公开API方法以可靠地指示消息流的不足(message flow starvation)。
客户端库应始终尝试在所有连接之间平均分配RDY计数。通常,此实现为max_in_flight / num_conns。
特性:
1.采用push机制,保证至少一次推送
性能:
单个节点性能
声明:请牢记 NSQ 设计的初衷是分布式。单个节点的性能非常重要,但这并不是我们所追求的。
-
2012 MacBook Air i7 2ghz
-
go1.2
-
NSQ v0.2.24
-
200 byte messages
GOMAXPROCS=1 (单个生产者,单个消费者)
$ ./bench.sh results... PUB: 2014/01/12 22:09:08 duration: 2.311925588s - 82.500mb/s - 432539.873ops/s - 2.312us/op SUB: 2014/01/12 22:09:19 duration: 6.009749983s - 31.738mb/s - 166396.273ops/s - 6.010us/op
GOMAXPROCS=4 (4 publishers, 4 consumers)
$ ./bench.sh results... PUB: 2014/01/13 16:58:05 duration: 1.411492441s - 135.130mb/s - 708469.965ops/s - 1.411us/op SUB: 2014/01/13 16:58:16 duration: 5.251380583s - 36.321mb/s - 190426.114ops/s - 5.251us/op
引用:
分布式实时消息平台NSQ - 知乎
https://juejin.cn/post/6932865148784902158
http://nsqio.cn/design.html