快半年没更新了.......
因为工作上面需要用到NSQ消息队列对业务做解耦,所以对NSQ做了一些简单的研究。这篇文章主要结合源码做一些原理性的分析。不了解的NSQ的同学可以先读官方文档。
我们从使用方式开始入手,并逐步深挖。
//首先定义一个实现HandleMessage方法的结构体
type myMessageHandler struct {}
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
//处理消息的业务逻辑
err := processMessage(m.Body)
//返回结果,这里如果err不为空,该消息会被重发
return err
}
func main() {
config := nsq.NewConfig()
consumer, err := nsq.NewConsumer("topic", "channel", config)
if err != nil {
log.Fatal(err)
}
consumer.AddHandler(&myMessageHandler{})
// 连接nsqloopup
err = consumer.ConnectToNSQLookupd("localhost:4161")
if err != nil {
log.Fatal(err)
}
......
}
客户端Consumer首先去连接nsqlookup,通过http请求得到nsqd的地址,并最终对得到的nsqd地址进行tcp连接。并将该连接信息放到自身的connections中。
type Consumer struct {
......
topic string
channel string
.....
//用于消息传递的管道
incomingMessages chan *Message
//保存与nsqd的连接信息
pendingConnections map[string]*Conn
connections map[string]*Conn
......
}
在请求NSQD时,会先与NSQD建立一条TCP连接,并启动两个goroutine来服务这条连接,分别是readLoop和writeLoop。
readLoop
readLoop用于读取NSQD发来的消息,包括消息Message,心跳等,并调用Consumer的OnMessage方法去处理消息。
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
//从connection中读取消息
frameType, data, err := ReadUnpackedResponse(c)
if err != nil {
......
}
//判断是否是心跳消息
if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
......
continue
}
//正常的消息是FrameTypeMessage是类型
switch frameType {
case FrameTypeResponse:
c.delegate.OnResponse(c, data)
case FrameTypeMessage:
// 解码数据得到消息对象,因为消息是按照NSQ自定义的一套序列化规则来的,
//所以这里进行解码,并赋值到消息体对象
msg, err := DecodeMessage(data)
......
//一些赋值操作过后,增加处理的消息数量
atomic.AddInt64(&c.messagesInFlight, 1)
//最终调用Consumer的OnMessage方法处理消息
c.delegate.OnMessage(c, msg)
case FrameTypeError:
......
default:
c.log(LogLevelError, "IO error - %s", err)
c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
}
}
......
}
consumer的OnMessage方法,就是将message塞到了incomingMessages管道中。
func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message){
d.r.onConnMessage(c, m)
}
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
atomic.AddUint64(&r.messagesReceived, 1)
r.incomingMessages <- msg
}
那这个incomingMessages管道是谁来消费呢
在我们使用的时候会调用AddHandler方法,就会开启一个handlerLoop goruntine去对incomingMessages管道进行消费,拿到消息,并调用我们自己的handler.HandleMessage方法处理业务。
func (r *Consumer) handlerLoop(handler Handler) {
r.log(LogLevelDebug, "starting Handler")
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
//是否应该放弃这条消息(因为有的消息可能消费失败重试了很多次)
if r.shouldFailMessage(message, handler) {
//如果放弃,就发送finish信号FIN
message.Finish()
continue
}
//调用HandleMessage方法处理业务,如果返回的错误不为空,
//说明这条消息消费失败了,根据业务需求决定是否要重新消费一次
err := handler.HandleMessage(message)
if err != nil {
//自动回复是否被禁用,默认情况下为false
if !message.IsAutoResponseDisabled() {
//会调用Requeue方法实现重新入队的操作
message.Requeue(-1)
}
continue
}
//没有错误产生情况下,再去调用Finish方法发送FIN信号给到nsqd
if !message.IsAutoResponseDisabled() {
message.Finish()
}
}
}
Requeue,Finish这类方法会包装一个Command类型的结构体指令,会将该指令返回nsqd,代表这条消息的处理状况,nsqd端会对该指令做区分并调用不同方式处理。
type Command struct {
Name []byte
Params [][]byte
Body []byte
}
func Finish(id MessageID) *Command {
var params = [][]byte{id[:]}
return &Command{[]byte("FIN"), params, nil}
}
func Requeue(id MessageID, delay time.Duration) *Command {
var params = [][]byte{id[:], []byte(strconv.Itoa(int(delay / time.Millisecond)))}
return &Command{[]byte("REQ"), params, nil}
}
writeLoop
上面说到的message对象对应的一系列回复nsqd的方法(Requeue,Finish),都是将信息发送到了Consumer的connection对象中的msgResponseChan管道。而writeLoop goroutine会读取该管道,并将信息发回到nsqd。
func (c *Conn) writeLoop() {
for {
select {
......
case resp := <-c.msgResponseChan:
//consumer端信息处理结束,正在处理的信息数减一
msgsInFlight := atomic.AddInt64(&c.messagesInFlight, -1)
//一些标记的操作
if resp.success {
......
} else {
......
}
//将指令写回nsqd
err := c.WriteCommand(resp.cmd)
if err != nil {
c.log(LogLevelError, "error sending command %s - %s", resp.cmd, err)
c.close()
continue
}
......
}
}
}
上面的流程是consumer客户端的消费逻辑,接下来介绍下nsqd服务端的消息处理逻辑。
nsq中的'channel'类似于消费者组,但是channel是topic下一级的概念,channel不能订阅多个topic。每个topic可能被多个channel订阅,每个channel可能被多个consumer消费,类似下图。
当topic创建时候,会初始化一个Topic对象。里面包含一个 chan类型的memoryMsgChan,用于临时存放producer发来的消息,由于chan是有限度的,所以当发生消息积压打到chan的容量时候,后续的发来的消息会被持久化,BackendQueue提供了持久化的接口。
在初始化topic的时候,也会创建一个异步messagePump goroutine去服务这个topic,比如会从memoryMsgChan管道中拿消息分发到下游channel中。
type Topic struct {
......
name string
//存放多个channel
channelMap map[string]*Channel
//用于做磁盘持久化的接口
backend BackendQueue
//用于存放消息的管道
memoryMsgChan chan *Message
//channel的状态发生改变的通知从这里获取
channelUpdateChan chan int
waitGroup util.WaitGroupWrapper
exitFlag int32
idFactory *guidFactory
ephemeral bool
deleteCallback func(*Topic)
deleter sync.Once
paused int32
pauseChan chan int
//nsqd全局对象
nsqd *NSQD
}
接收到消息时,会调用Topic的PutMessage->put方法,该方法将message发送到memoryMsgChan管道,如果阻塞,调用writeMessageToBackend方法持久化(这里的backend对象是使用的一个开源三方库go-diskqueue)。
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
err := writeMessageToBackend(m, t.backend)
t.nsqd.SetHealth(err)
if err != nil {
return err
}
}
return nil
}
messagePump会从memoryMsgChan和backend中去读取消息(backend实现了ReadChan方法,可以读到存取的数据)。
读取到的数据会分发到该topic下面的channel中。
func (t *Topic) messagePump() {
var msg *Message
var buf []byte
var err error
//topic旗下的channel
var chans []*Channel
//两个读取消息的管道
var memoryMsgChan chan *Message
var backendChan <-chan []byte
......
//有可能出现topic创建了,但是channel还没创建的情况(也就是消费者
//还没有建立的情况,当channel创建好,会往channelUpdateChan发信号),
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 && !t.IsPaused() {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
for {
select {
//从memoryMsgChan中收到消息
case msg = <-memoryMsgChan:
//从backend中收到消息
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
//当收到信号,代表消费者已经建立好了,会对memoryMsgChan和backendChan
//进行重新赋值,会将Topic里面的对象赋值过来。
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
//将channel对象追加到chans中
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
//对两个管道进行重新赋值
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
......
}
//分别对每个channel去发送消息
for i, channel := range chans {
chanMsg := msg
//确保每条消息在每个管道唯一性的操作
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
//查看该消息是否要延迟发送
if chanMsg.deferred != 0 {
//如果要延迟发送,放到channel的延迟队列中
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
//调用PutMessage发送
err := channel.PutMessage(chanMsg)
......
}
}
}
channel是跟消费者Consumer去对接的,本身也需要一些能保存消息的数据结构,以确保消息的不丢失。在初始化时候会初始化一些存放消息的结构,比如带缓冲区的memoryMsgChan chan,也有一个backend用于持久化消息的组件,因为需要预防自身的memoryMsgChan满了的情况。
由于和消费者Consumer联系较为紧密,所以需要确保消息提供的可靠性,也就是消息从发送到Consumer和Consumer返回消息响应这个时间段消息不丢失,需要有一个数据结构来对消息做标记,这就是InFlight机制。
type Channel struct {
//自身的名称等参数
//.....
nsqd *NSQD
//channel用于持久化消息的管道
backend BackendQueue
//Channel用于存放消息的管道
memoryMsgChan chan *Message
exitFlag int32
exitMutex sync.RWMutex
......
// 延迟消息的存放位置
deferredMessages map[MessageID]*pqueue.Item
deferredPQ pqueue.PriorityQueue
deferredMutex sync.Mutex
//用于标记消息的数据结构
inFlightMessages map[MessageID]*Message
inFlightPQ inFlightPqueue
inFlightMutex sync.Mutex
}
在channel发送消息调用PutMessage方法时,会调用自身的put方法,这里和Topic的方法一样.
func (c *Channel) put(m *Message) error {
select {
//发送到自身的memoryMsgChan,否则进行持久化
case c.memoryMsgChan <- m:
default:
err := writeMessageToBackend(m, c.backend)
c.nsqd.SetHealth(err)
......
}
return nil
}
这个时候的memoryMsgChan的消费端就是消费者Consumer创建的连接了。
对于每一个Consumer消费者,nsqd会创建一个Handle goroutine去服务它。该goroutine会执行IOLoop方法,会异步启动一个messagePump goroutine,该协程从Channel中接收消息,并转发到Consumer。IOLoop也会读取Consumer发来的消息,并作出对应的处理。
当和Consumer建立连接时候,首先会收到一条SUB指令,来确定Consumer指定的topic和channel。并将该的Channel对象传到messagePump goroutine中,用于消费channel中消息。
func (p *protocolV2) IOLoop(c protocol.Client) error {
var err error
var line []byte
var zeroTime time.Time
client := c.(*clientV2)
messagePumpStartedChan := make(chan bool)
//异步启动messagePump处理channel中的消息
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
//接收Consumer发来的指令
for {
......
line, err = client.Reader.ReadSlice('\n')
if err != nil {
......
break
}
//解析指令(FIN/REQ/SUB....)
line = line[:len(line)-1]
......
params := bytes.Split(line, separatorBytes)
//应用指令
response, err = p.Exec(client, params)
if err != nil {
......
continue
}
if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}
}
return err
}
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
......
for {
......
select {
......
//发送心跳信息给Consumer
case <-heartbeatChan:
err = p.Send(client, frameTypeResponse, heartbeatBytes)
......
//发送磁盘持久化的信息给Consumer
case b := <-backendMsgChan:
...解析一下信息
msg, err := decodeMessage(b)
if err != nil {
p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
//该信息即将发送,需要在channel的InFlight中保存一份。
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
//发送消息给Consumer
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
//发送管道中的信息给Consumer
case msg := <-memoryMsgChan:
......
//该信息即将发送,也需要在channel的InFlight中保存一份。
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
......
}
}
}
当nsqd收到Consumer传来的FIN指令时,代表该消息已经处理结束,不需要继续保存,此时可以从Inflight中移除该消息。
对于重新入队的消息,会首先从InFlight中删除该消息,并将消息以及下一次消息发送的时间存到channel的deferredMessages延时发送结构中,等待到时间将消息重新发送到memoryMsgChan管道或者backend中,实现重新发送的目的。
当然还有很多指令,这里不一一列举。
上文讲到Channel中有一个延迟发送的数据结构deferredMessages。它是怎样延迟发送的呢?
在nsqd启动时候,后台会异步启动一个queueScanLoop协程。它通过定时器定期去检查进程中关于channel的变化,并调用resizePool对所有的channel作出对应的调整(清理各个channel中超时的消息和检查延迟队列中的消息)。
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
//workCh中放着所有的channel,遍历所有的channel并作处理
for {
select {
//从workCh中取出channel
case c := <-workCh:
now := time.Now().UnixNano()
dirty := false
//检查inflight中超时的消息,在nsqd发送消息出去时候,
//会设置超时时间,如果没返回FIN等信号,认为超时,重新发送该消息
if c.processInFlightQueue(now) {
dirty = true
}
//检查延迟队列中的消息是否到了要发送的时间并进行发送
if c.processDeferredQueue(now) {
dirty = true
}
responseCh <- dirty
......
}
}
}
上面提到过,在Channel中关于延迟队列和inflight队列是分别用两个结构去存储的,一个是map结构,用于标记该消息,确保唯一性;另一个是小顶堆实现的,通过时间对消息进行排序组织,来达到取出超时或者到时间的消息的目的。
type Channel struct {
//延迟消息的存储结构,用于标记消息
deferredMessages map[MessageID]*pqueue.Item
//最小堆实现的队列,用于对消息通过时间进行排序
deferredPQ pqueue.PriorityQueue
//发送中的消息的存储结构
inFlightMessages map[MessageID]*Message
//最小堆实现的队列,用于对消息通过时间进行排序
inFlightPQ inFlightPqueue
}
func (c *Channel) processDeferredQueue(t int64) bool {
......
dirty := false
for {
c.deferredMutex.Lock()
//检查堆顶是否有超时的消息,有就去掉
item, _ := c.deferredPQ.PeekAndShift(t)
c.deferredMutex.Unlock()
//当取出的元素为空即没有超时消息,跳出循环。
if item == nil {
goto exit
}
dirty = true
msg := item.Value.(*Message)
//删掉deferredMessages中的消息
_, err := c.popDeferredMessage(msg.ID)
if err != nil {
goto exit
}
//调用put方法将消息放到
c.put(msg)
}
exit:
return dirty
}
nsq consumer端通过readLoop和writeLoop来对消息流转,实现了一套协议来请求与解析。
nsq通过Inflight机制来实现消息的不丢失,实现了'at least once'的功能,如果需要实现'exactly once',则需要业务端去做配合过滤。
nsqd通过轮询检查和小顶堆的方式来实现延时队列,重新入队REQ操作就使用了延时队列。
nsq无法保证消息的有序性,因为通过chan管道和磁盘来对消息做缓存,无法保证消息消费的先后。
消费者无法消费到历史的消息,因为消息是实时的增删的,没有类似kafka offset的概念。
NSQ源码 https://github.com/nsqio/nsq
NSQ inflight机制 https://swanspouse.github.io/2018/12/10/nsq-message-in-flight/