前几天做了一次技术分享,写篇文章总结下。
不知道大家有没有这种感觉,在做ppt汇报,或者一些分享时候,准备的如果用100来打比方,最终演讲出来的效果可能不到80,导致的因素一方面是紧张导致没说清楚,一方面是讲着讲着忘记了。
当然,作为听众,可能听到的效果不足30,因为如果内容比较枯燥,到后半程真正会去听的也很少;或者中途走神了,再去听后面的已经衔接不上了,只能继续摆烂。所以,提前抛出一些有意思的话题,可以尽可能让听者多听一会。
后面的资料补充也很重要,作为分享知识的补充,便于听者在没有听清楚时候后面翻看,虽然基本也很少有人会去翻看。
不过做一场技术分享,是一件使技术人的技术情怀得到满足的事情。
作为一个技术人,多少还是有一些技术情怀的,主要体现在这些方面:
完成一个模块,对自己的代码沾沾自喜一阵,虽然过几天就看不懂了
看一些开源项目,提个issue,pr,利用开源项目里面的一些做法解决了一些业务上的难题,没白看
帮助别人解决技术问题
写一些技术文章总结,做技术分享
说是技术情怀,也是技术人的成就感。
本文只粘贴了部分我觉得最重要的内容的ppt页,偏向工作原理性质的东西,通过该文章,可以了解到一款最简单的消息队列是怎么实现的,该文章不像以往文章大段源码,读起来比较轻松,希望为读者工作中的某些问题提供一些思路。
客户端的sdk提供了publish等方法,当客户端调用Publish等发送消息时候。
客户端与nsqd会建立tcp连接,这里是懒加载的模式,建立连接的同时,客户端会启动routerloop,readloop等任务
nsqd里面的tcpserver会启动一个IOLoop来服务这条tcp连接,
一切就绪了,消息会被封装成一个transaction,这个transaction包含消息体和一个chan,并等待这个chan,chan返回则消息发送成功,这个transaction被放到transactionChan管道里,
管道会被routerloop消费,
进而将这个transaction append到transactions数组里面,
之后会将消息体以及指令发送到nsqd,nsqd做处理。
nsqd处理后返回ok,
被readloop读取后发往router,中间有一个chan,图里没体现,
router会将消息对应的transaction从切片transactions里面pop出来,并把transaction中的chan释放。客户端那边的方法就会返回。
这里对transactions切片的操作是没有加锁的,因为消息的发送与结束是在一个协程中处理的,是顺序的发送。
先看下Topic的数据结构
channelMap: topic下channel集合;
backend: 磁盘队列的interface,go-diskqueue实现,Put方法写消息,ReadChan返回一个chan,用于读消息;
memoryMsgChan: 内存消息队列,chan类型,容量默认10000
在每一个topic被创建的时候,都会启动一个对应的messagePump协程,负责消费topic下的memoryMsgChan和backend中的message,并把message分发给该topic每个Channel,调用channel的PutMessage方法。
看一下channel的数据结构,主要比topic多了四个结构
channel的PutMessage方法实现与Topic的一致,也是先写chan,满了就写backend;区别是后面消费管道的消费者不同,channel间接的对接了消费者。
consumer与nsqd建立tcp连接
Nsqd会为consumer开启一个IOLoop,启动一个messagePump协程服务;consumer这边,会启动readloop,writeloop也来服务这条连接
consumer发送一个SUB指令,指定自己要消费的topic,channel
该topic与channel会被nsqd找到
Consumer向nsqd发送rdy指令,代表自己已经就绪,nsqd可以推数据了
此时该messagePump会消费channel对应的memoryMsgChan和backend中的数据
将消息发送到consumer,被consumer的readloop获取到
Readloop把消息放到incomingMessages信道里面
会被业务协程获取到,并对消息进行处理
业务协程返回后,会对该消息封装成Finish等方法
发送回到nsqd,代表该消息已经消费
可以同时启动多个消费者对同一个channel做消费,增强消费能力,nsqd端对消费者的负载均衡处理方式也很简单。多个messagePump同时去监听channel下的两种消息管道。
NSQ在消息的传递过程中遵循的是at least once,所以在某些时候会出现消息重复的情况;下面是消息在传递给消费者过程中的流程。
消息正常处理情况:
Consumer对应的messagePump获取到一条消息
计算好消息的超时时间,将消息标记到channel的inflightMessages中,这是一个map结构,用于通过ID做标记;将消息放到InflightPQ中,这是一个最小堆,最靠近当前时间的消息会放在堆顶
将消息发送到consumer,进行处理
在规定时间内处理完成,发送Fin信号,被consumer对应的IOLoop接收到
执行FIN指令,分别从map和最小堆中去掉该消息。
消息超时重传情况,步骤1-3同上:
消息消费超时,可能是系统卡住等导致的一些情况
在nsqd启动时候,会启动一个queueScanLoop协程池,会定期去每个channel下面的inflightPQ最小堆检查有没有消息超时,方法就是比较堆顶的元素
如果超时,就会把消息pop出来,并从map中删掉
将消息重新投递到channel里面,重新消费
有时候想要对一条消息进行重新消费,就可以通过REQ机制完成:
当客户端想要将一些消息做重新消费的时候,可以通过sdk发送一条REQ的指令
会将消息从inflight的最小堆和map中双双去掉
将消息放到deferedmessage map中,并计算好下次发送的时间放到defererPQ最小堆中
启动的queueScanLoop协程池中的ququeScanWorker会对延迟的消息进行检查
一旦时间到了,就会取出消息,放到channel中去重新消费
大致讲讲MaxInFlight的原理,其中有很多细节没有涉及:
并发处理消息的能力指的就是消费者一个时间点,能同时接收到多少条消息,通过这个参数可以控制nsq推送消息的速率。
假设设置MaxInFlight设置为M,consumer能连接nsqd节点数量为N,分配给每一个nsqd节点的数值就是M/N,如果值小于1,则置为1
会给每个nsqd节点发送RDY指令时候,携带上这个M/N,此时,nsqd节点就知道该consumer最多一次接收多少了,设置成rdyCount
当发送消息之前,会对比一下自己的inflight参数,如果小于rdyCount(M/N),就发送,并自增这个inflight,大于就不发送,等待通知(select default尝试通知)
当消息结束时候,会自减inflifht,并通过channel尝试通知到3步骤。实现一个类似滑动窗口的作用
NSQ牺牲了一些副本机制,实现了高效与简单。代码还是很通俗易懂,感兴趣的同学可以抽个时间研究一下。