nsq code analysis
nsq code analysis
启动和退出
启动, 利用svc框架
nsq使用了svc框架来启动一个service, Run 时, 分别调用prg 实现的 Init 和 Start 方法 启动’program’,然后监听 后两个参数的信号量, 当信号量到达, 调用 prg 实现的 Stop 方法来退出
func main() {
prg := &program{}
// svc 框架的Run 方法启动一个service
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
// svc 框架, 启动 service, 也可以理解为 daemon
//
// 方法会一直阻塞, 直到 指定的信号量到来
// 如果指定信号为空, 默认是 syscall.SIGINT and syscall.SIGTERM
func Run(service Service, sig ...os.Signal) error {
env := environment{}
//先调用Init 初始化
if err := service.Init(env); err != nil {
return err
}
//再调用Start
if err := service.Start(); err != nil {
return err
}
//准备信号量处理
if len(sig) == 0 {
sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
}
signalChan := make(chan os.Signal, 1)
signalNotify(signalChan, sig...)
//整个程序会阻塞在这里, 等待系统信号量
<-signalChan
// 当信号来到, 调用stop 方法停掉
return service.Stop()
}
svc 生命周期三个方法的实现:
func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}
func (p *program) Start() error {
opts := nsqd.NewOptions()
flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
rand.Seed(time.Now().UTC().UnixNano())
if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Println(version.String("nsqd"))
os.Exit(0)
}
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
_, err := toml.DecodeFile(configFile, &cfg)
if err != nil {
log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
}
}
cfg.Validate()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)
err := nsqd.LoadMetadata()
if err != nil {
log.Fatalf("ERROR: %s", err.Error())
}
err = nsqd.PersistMetadata()
if err != nil {
log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
p.nsqd = nsqd
return nil
}
func (p *program) Stop() error {
if p.nsqd != nil {
p.nsqd.Exit()
}
return nil
}
优雅退出
NSQd 的真正的启动函数是Main
func (n *NSQD) Main() {
var httpListener net.Listener
var httpsListener net.Listener
ctx := &context{n}
tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
os.Exit(1)
}
n.Lock()
n.tcpListener = tcpListener
n.Unlock()
tcpServer := &tcpServer{ctx: ctx}
n.waitGroup.Wrap(func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
os.Exit(1)
}
n.Lock()
n.httpsListener = httpsListener
n.Unlock()
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
})
}
httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
if err != nil {
n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
os.Exit(1)
}
n.Lock()
n.httpListener = httpListener
n.Unlock()
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
})
n.waitGroup.Wrap(func() { n.queueScanLoop() })
n.waitGroup.Wrap(func() { n.lookupLoop() })
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(func() { n.statsdLoop() })
}
}
现在我们就有了多个线程,比如:
- Main线程, 启动两个服务进程后阻塞等待系统信号量
- tcp服务线程, 循环调用 listener.Accept() 接收连接请求
- queueScanLoop, runs in a single goroutine to process in-flight and deferred priority queues
假如Main线程在接收到系统信号量之后直接退出, 那么其他两个线程也会跟着销毁. 于是这里就会出现一个问题, tcp服务线程 和 http服务线程, 是业务逻辑实现的主要线程, 里面可能正在”干活”, 此时需要退出, 是否有数据需要持久化了? 事务是否正好执行到一半?
所以退出时需要做优雅退出处理. Main所在线程退出之前, 需要 ‘通知’ 并且 ‘等待’ 两个服务线程退出后才能退出
为了实现这一点, nsq 利用了 sync包中的 waitgroup 组件
nsq 为了方便使用, 做了一个封装:
type WaitGroupWrapper struct {
sync.WaitGroup
}
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1) // +1
// 起一个线程, 就是go 的 goroutine
go func() {
cb() //执行任务
w.Done() // -1
}()
}
那么比如nsq 启动 tcp 服务线程 的代码, 就相当于:
w.Add(1)
go func() {
protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
w.Done()
}()
w.Wait() //Main 线程就会阻塞在这里; 等待TCPServer 结束时调用w.Done()后才退出
通知 goroutine 退出
好了, 现在Main 会等待TCPServer 退出之后再退出了, 那, Main要退出的时候, 如何通知 TCPServer 退出?
protocol.TCPServer 就是循环的监听 tcp listener的 accept 连接, 交给 handler 处理; 然后这个for 循环是一个死循环, 依靠 accept 返回 error 才退出
// ==================== 注意, 这里的 TCPServer 运行在一个新的 goroutine 中
// 这个TCPServer 也是一个可以复用的比较不错的 tcp 程序 accept 模式
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
runtime.Gosched() // 是临时的错误, 暂停一下继续
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
}
break
}
go handler.Handle(clientConn)
}
l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}
再看Exit函数:
func (n *NSQD) Exit() {
if n.tcpListener != nil {
n.tcpListener.Close()
}
if n.httpListener != nil {
n.httpListener.Close()
}
if n.httpsListener != nil {
n.httpsListener.Close()
}
n.Lock()
err := n.PersistMetadata()
if err != nil {
n.logf("ERROR: failed to persist metadata - %s", err)
}
n.logf("NSQ: closing topics")
for _, topic := range n.topicMap {
topic.Close()
}
n.Unlock()
close(n.exitChan)
n.waitGroup.Wait()
n.dl.Unlock()
}
这里信号量到达之后, 做了一个 tcpListener.Close(), 其实这个动作的目的是通知 tcpListener 服务所在的线程退出
这里直接利用了 listener 被 close 之后再 accept 调用会返回error的特性, 既退出了线程, 又关闭了 listener, 一举两得
消息处理流
协议不同版本 的 封装处理
接到连接之后, 交给TCPHandler 处理, 这里是由tcpServer 实现:
func (p *tcpServer) Handle(clientConn net.Conn) {
p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())
// 先读取4个字节 作为 协议版本号
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
if err != nil {
p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
return
}
protocolMagic := string(buf)
p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
// protocolV2 实现了 V2 这个版本的协议
prot = &protocolV2{ctx: p.ctx}
//假如有另外一个版本的协议
//case " V3":
// prot = &protocolV3{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
//交给具体protocol实现类处理每个连接
err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
return
}
}
Handle 函数先读取4个字节的字符串作为版本号, 根据版本号选用具体的 protocol 实现, 这里的 “ V2”这个版本的协议由protocolV2 实现
将协议封装, 方便以后不同协议的扩展
type Protocol interface {
IOLoop(conn net.Conn) error
}
具体 每个’命令’处理的封装
前面看到, listener.Accept() 之后, 解析了版本号, 当前由 protocol_v2 具体实现了 IOLoop
func (p *protocolV2) IOLoop(conn net.Conn) error {
var err error
var line []byte
var zeroTime time.Time
// nsqd 范围的 clientID 序列
clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.ctx)
// 这个messagePump 名字很形象, 把要发送给client 的messgae 从 缓存池子里Pump 抽出来 做具体的发送
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan //TODO messagePump 里要做初始化, 具体是什么? 为什么需要这样阻塞一下IOLoop ?
// 底下这个地方是 处理 client 的请求的
for {
// 如果 client 设置需要做心跳检查, 则设置 读超时为 两倍 心跳检查间隔
// 也就是说, 在这个时间里, 正常情况下, client肯定会在这个间隔内发一个心跳包过来
if client.HeartbeatInterval > 0 {
client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
} else {
// 加入client 不设置做心跳则不设置读超时
client.SetReadDeadline(zeroTime)
}
// ReadSlice does not allocate new space for the data each request
// ie. the returned slice is only valid until the next call to it
line, err = client.Reader.ReadSlice('\n')
if err != nil {
if err == io.EOF {
err = nil
} else {
err = fmt.Errorf("failed to read command - %s", err)
}
break
}
// 这个V2 版本的协议, 一行是一个命令
line = line[:len(line)-1]
// optionally trim the '\r' , 处理一下\r, 有可能win版本的回车是 \r\n
if len(line) > 0 && line[len(line)-1] == '\r' {
line = line[:len(line)-1]
}
// 每个命令, 用 " "空格来划分 参数
params := bytes.Split(line, separatorBytes)
if p.ctx.nsqd.getOpts().Verbose {
p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
}
// 执行命令
var response []byte
response, err = p.Exec(client, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx)
sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
break
}
// errors of type FatalClientErr should forceably close the connection
if _, ok := err.(*protocol.FatalClientErr); ok {
break
}
continue
}
// 加入命令是有 '响应' 的, 发送响应
if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}
}
p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client)
conn.Close()
// 通知 messagePump 退出
close(client.ExitChan)
if client.Channel != nil {
client.Channel.RemoveClient(client.ID)
}
return err
}
func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error {
// 因为client 的处理 还有一个 messagePump 线程, 所以 发送要锁
client.writeLock.Lock()
var zeroTime time.Time
if client.HeartbeatInterval > 0 {
client.SetWriteDeadline(time.Now().Add(client.HeartbeatInterval))
} else {
client.SetWriteDeadline(zeroTime)
}
//V2 协议版本 发送给client, 是 使用 [(4byte)消息长度 , (4byte)消息类型, (载体)] 的 帧格式
_, err := protocol.SendFramedResponse(client.Writer, frameType, data)
if err != nil {
client.writeLock.Unlock()
return err
}
if frameType != frameTypeMessage {
err = client.Flush()
}
client.writeLock.Unlock()
return err
}
func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error) {
beBuf := make([]byte, 4)
size := uint32(len(data)) + 4
binary.BigEndian.PutUint32(beBuf, size)
n, err := w.Write(beBuf)
if err != nil {
return n, err
}
binary.BigEndian.PutUint32(beBuf, uint32(frameType))
n, err = w.Write(beBuf)
if err != nil {
return n + 4, err
}
n, err = w.Write(data)
return n + 8, err
}
读取并解析请求之后, 传给 p.Exec(client, params) 执行具体的请求
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
//这个命令不需要做认证
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
// client 是否做了认证
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
//一行一个请求, 一个命令中用 " " 空格分割参数, 第一个参数是 命令标志
switch {
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
case bytes.Equal(params[0], []byte("RDY")):
return p.RDY(client, params)
case bytes.Equal(params[0], []byte("REQ")):
return p.REQ(client, params)
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
case bytes.Equal(params[0], []byte("MPUB")):
return p.MPUB(client, params)
case bytes.Equal(params[0], []byte("DPUB")):
return p.DPUB(client, params)
case bytes.Equal(params[0], []byte("NOP")):
return p.NOP(client, params)
case bytes.Equal(params[0], []byte("TOUCH")):
return p.TOUCH(client, params)
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
case bytes.Equal(params[0], []byte("CLS")):
return p.CLS(client, params)
case bytes.Equal(params[0], []byte("AUTH")):
return p.AUTH(client, params)
}
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
发送接收流程
从TcpServer看到nsq是如何接收client连接, 读取请求消息, 执行并返回结果响应的流程. 看到具体的执行命令代码, 其中有一个是”PUB”命令, 也就是消息推送, 现在就来看一下, 一个消息从被一个client做PUB发送, 到SUB订阅了消息的Client接收, 在nsqd里是如何流转的
1. 客户端 层面 的 PUB 消息
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
switch {
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
}
看 PUB方法的具体实现:
//客户端推送消息
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
var err error
if len(params) < 2 {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
}
//请求格式: [ PUB topicName ...]
topicName := string(params[1])
if !protocol.IsValidTopicName(topicName) {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
fmt.Sprintf("PUB topic name %q is not valid", topicName))
}
//消息体, 在client 请求的下一行开始, 头4个字节是消息体长度
bodyLen, err := readLen(client.Reader, client.lenSlice)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
}
if bodyLen <= 0 {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
fmt.Sprintf("PUB invalid message body size %d", bodyLen))
}
//长度是否过大
if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize {
return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize))
}
//建立缓冲区并读入
messageBody := make([]byte, bodyLen)
_, err = io.ReadFull(client.Reader, messageBody)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
}
//client 是否有权限对这个 topic 做 PUB 操作
if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil {
return nil, err
}
topic := p.ctx.nsqd.GetTopic(topicName)
//创建 Message 对象, TODO: nsqd 的 唯一id 发生器
msg := NewMessage(topic.GenerateID(), messageBody)
//topic 发送消息
err = topic.PutMessage(msg)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}
return okBytes, nil
}
2. Topic 层的 PutMessage
PUB 方法做一系列检查, 然后调用 topic.PutMessage(msg) 做具体的发送
func (t *Topic) PutMessage(m *Message) error {
//TODO 这里是个 ReadLock, 应该是为了锁住 exitFlag ?
t.RLock()
defer t.RUnlock()
//这里使用了一个 atomic Int32 类型的 exitFlag 退出标志, 如果已经退出了就不在 put 了
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
//发送
err := t.put(m)
if err != nil {
return err
}
//做个计数
atomic.AddUint64(&t.messageCount, 1)
return nil
}
func (t *Topic) put(m *Message) error {
// 这里巧妙利用了 chan 的特性
// 先写入 memoryMsgChan 这个队列,假如 memoryMsgChan 已满, 不可写入
// golang 就会执行 default 语句,
select {
case t.memoryMsgChan <- m: //'变量' 不都是内存? 这个 '内存', 是相对于 下面这个 DiskQueue 来讲的)
default:
// 利用 sync.Pool 包 做了一个 bytes.Buffer 的缓存池, 从池中取出一个来用
// <<go语言的官方包sync.Pool的实现原理和适用场景>> :
// http://blog.csdn.net/yongjian_lian/article/details/42058893
b := bufferPoolGet()
// 这个 t.backend, 在NewTopic 函数中初始化, 它具体是一个 DiskQueue
//TODO 先不去管他的具体实现, 暂时认识它是把消息 写入硬盘 就好
//t.backend = newDiskQueue(topicName,
// ctx.nsqd.getOpts().DataPath,
// ctx.nsqd.getOpts().MaxBytesPerFile,
// int32(minValidMsgLength),
// int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
// ctx.nsqd.getOpts().SyncEvery,
// ctx.nsqd.getOpts().SyncTimeout,
// ctx.nsqd.getOpts().Logger)
err := writeMessageToBackend(b, m, t.backend)
// 放回缓存池
bufferPoolPut(b)
t.ctx.nsqd.SetHealth(err)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to write message to backend - %s",
t.name, err)
return err
}
}
return nil
}
Message 的数据流看代码, 到这个地方断掉了; 不过既然使用了 chan , 很明显 chan的 推和拉不在一个线程中
用IDE 对 t.memoryMsgChan 做 ‘Find Usages’ 操作, 很快定位到了 ‘<-memoryMsgChan’ 操作在 Topic 类的 messagePump 函数中
// 每个 topic 自己有一个 messagePump 消息投递 loop
func (t *Topic) messagePump() {
//避免 锁竞争, 所以缓存 已存在的 channel
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
for {
select {
case msg = <-memoryMsgChan:
case <-t.channelUpdateChan:
//之前 避免 锁竞争, 缓存了 已存在的 channel
//假如更新了 channel 会发一个 消息过来, 重新读取 channel
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
continue
}
// 这里需要 遍历 t.channelMap, 普通的写法, 会导致锁竞争太大
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
}
这个 messagePump 函数是 在 NewTopic() 也就是创建 Topic 的时候启动的一个线程
也就是说, topic 是自己 负责 把自己的 memoryMsgChan 做 Pump 出来 , 再塞到 二级 channel 的 memoryMsgChan 里
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t := &Topic{
...
}
...
t.waitGroup.Wrap(func() { t.messagePump() })
}
3. Channel 层的 PutMessage
topic 的 put 调用 topic下含有的所有 channel 的 PutMessage;
channel 跟topic 其实是一样的, 可以这么理解: channel 就是二级topic
func (c *Channel) PutMessage(m *Message) error {
c.RLock()
defer c.RUnlock()
if atomic.LoadInt32(&c.exitFlag) == 1 {
return errors.New("exiting")
}
err := c.put(m)
if err != nil {
return err
}
atomic.AddUint64(&c.messageCount, 1)
return nil
}
func (c *Channel) put(m *Message) error {
select {
case c.memoryMsgChan <- m:
default:
b := bufferPoolGet()
err := writeMessageToBackend(b, m, c.backend)
bufferPoolPut(b)
c.ctx.nsqd.SetHealth(err)
if err != nil {
c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
c.name, err)
return err
}
}
return nil
}
4. Client 层的 messagePump
但是会发现, Channel 自己并没有 messagePump 函数, 再次通过Usage 查找, <-channel.memoryMsgChan 读取操作在 protocolV2.messagePump(), 这也就是上文中, 每个Client连接的处理函数IOLoop, 每个Client 都会有一个 protocolV2.messagePump() 线程
func (p *protocolV2) IOLoop(conn net.Conn) error {
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
for {
line, err = client.Reader.ReadSlice('\n') // 读取
params := bytes.Split(line, separatorBytes) // 解析
response, err = p.Exec(client, params) // 执行
err = p.Send(client, frameTypeResponse, response) //发送结果
}
}
客户端的其他操作我们先忽略不管, 主要看 Message 的数据路径, 精简掉 messagePump 的代码:
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
...
for {
if subChannel == nil || !client.IsReadyForMessages() {
} else {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
// 这里负责执行Client 的各种事件
select {
//Client 需要发送一个SUB 请求 来订阅Channel, 并切一个Client只能订阅一个Channel
case subChannel = <-subEventChan: // 做了订阅
subEventChan = nil
case msg := <-memoryMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
// protocol 进行消息格式的打包, 再发送给Client
// 这里, Message 就发送给了 client
err = p.SendMessage(client, msg, &buf)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
}
...
5. 整体 Message 数据流 精简总结:
总体来讲, 一个Message的推送, 经过了 两个chan 队列:
- topic.memoryMsgChan
- channel.memoryMsgChan
两个队列的两头(读, 写), 分别有一个线程进行, 也就是3个线程:
- protocol.IOLoop(clientConn)
- topic.messagePump()
- protocol.messagePump()
Message 的数据流就是:
从client连接进来, 到message 投递给Client 的所有流程代码, 精简如下:
clientConn, err := listener.Accept()
// Client ==> Client 请求响应线程 ==> topic.memoryMsgChan
go handler.Handle(clientConn) {
err = prot.IOLoop(clientConn){
for {
line, err = client.Reader.ReadSlice('\n') // <==Client
params := bytes.Split(line, separatorBytes)
response, err = p.Exec(client, params){
switch {
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params){
topic.PutMessage(msg) // ==>topic.memoryMsgChan
}
}
}
}
}
// topic.memoryMsgChan ==> Topic.messagePump 线程 ==> echa channel.memoryMsgChan
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t.waitGroup.Wrap(func() {t.messagePump(){
for {
select {
case msg = <-memoryMsgChan: // <==topic.memoryMsgChan
for i, channel := range chans {
err := channel.PutMessage(chanMsg) // ==>channel.memoryMsgChan
}
}}
})
}
// client.subChannel.memoryMsgChan ==> Client的messagePump 线程 ==> Client
func (p *protocolV2) IOLoop(conn net.Conn) error {
go p.messagePump(client, messagePumpStartedChan){
for {
select {
case subChannel = <-subEventChan:
case msg := <-subChannel.memoryMsgChan: // <==client.subChannel.memoryMsgChan
err = p.SendMessage(client, msg, &buf) // ==>Client
}
}
}
}
虽然 一个channel 会被多个Client 订阅, 从代码上来讲, 也就是 一个 channel.memoryMsgChan 会被多个 client 线程的 messagePump() 方法中进行读取
但因为chan 的特性, 一条缓存在 channel.memoryMsgChan 中的消息, 只会被一个 client 读取到
消息投递可靠性
如何保证至少成功投递一次? 在protocolV2的messagePump函数中,消息被通过投送到相应消费者。 投递时首先调用Channel的StartInFlightTimeout函数
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
for {
select {
case msg := <-memoryMsgChan:
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg, &buf)
}
}
}
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
now := time.Now()
msg.clientID = clientID
msg.deliveryTS = now
msg.pri = now.Add(timeout).UnixNano()
err := c.pushInFlightMessage(msg)
if err != nil {
return err
}
c.addToInFlightPQ(msg)
return nil
}
该函数填充消息的消费者ID、投送时间、优先级,然后调用pushInFlightMessage函数将消息放入inFlightMessages字典中。 最后调用addToInFlightPQ将消息放入inFlightPQ队列中。
至此,消息投递流程完成,接下来需要等待消费者对投送结果的反馈。消费者通过发送FIN、REQ、TOUCH来回复对消息的处理结果。
投递成功
消费者发送FIN,表明消息已经被接收并正确处理。
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
msg, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
c.removeFromInFlightPQ(msg)
if c.e2eProcessingLatencyStream != nil {
c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
}
return nil
}
FinishMessage分别调用popInFlightMessage和removeFromInFlightPQ将消息从inFlightMessages和inFlightPQ中删除。 最后,统计该消息的投递情况。
投递失败
客户端发送REQ,表明消息投递失败,需要再次被投递。
func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
// remove from inflight first
msg, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
c.removeFromInFlightPQ(msg)
atomic.AddUint64(&c.requeueCount, 1)
if timeout == 0 {
c.exitMutex.RLock()
if c.Exiting() {
c.exitMutex.RUnlock()
return errors.New("exiting")
}
err := c.put(msg)
c.exitMutex.RUnlock()
return err
}
// deferred requeue
return c.StartDeferredTimeout(msg, timeout)
}
func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
absTs := time.Now().Add(timeout).UnixNano()
item := &pqueue.Item{Value: msg, Priority: absTs}
err := c.pushDeferredMessage(item)
if err != nil {
return err
}
c.addToDeferredPQ(item)
return nil
}
Channel在RequeueMessage函数对消息投递失败进行处理。该函数将消息从inFlightMessages和inFlightPQ中删除, 随后进行重新投递。
发送REQ时有一个附加参数timeout,该值为0时表示立即重新投递,大于0时表示等待timeout时间之后投递。如果timeout大于0,则调用StartDeferredTimeout进行延迟投递。 首先计算延迟投递的时间点,然后调用pushDeferredMessage将消息加入deferredMessage字典, 最后将消息放入deferredPQ队列。延迟投递的消息会被专门的worker扫描并在延迟投递的时间点后进行投递。 需要注意的是,立即重新投递的消息不会进入deferredPQ队列
消息的超时和延迟投递处理
n.waitGroup.Wrap(func() { n.queueScanLoop() })
func (n *NSQD) queueScanLoop() {
workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
closeCh := make(chan int)
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
case <-workTicker.C:
if len(channels) == 0 {
continue
}
case <-refreshTicker.C:
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
loop:
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]
}
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++
}
}
if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
}
exit:
n.logf("QUEUESCAN: closing")
close(closeCh)
workTicker.Stop()
refreshTicker.Stop()
}
该函数使用若干个worker来扫描并处理当前在投递中以及等待重新投递的消息。worker的个数由配置和当前Channel数量共同决定。 首先,初始化3个gochannel:workCh、responseCh、closeCh,分别控制worker的输入、输出和销毁。
然后获取当前的Channel集合,并且调用resizePool函数来启动指定数量的worker。
最后进入扫描的循环。在循环中,等待两个定时器,workTicker和refreshTicker, 定时时间分别由由配置中的QueueScanInterval和QueueScanRefreshInterval决定。 这种由等待定时器触发的循环避免了函数持续的执行影响性能,而Golang的特性使得这种机制在写法上非常简洁。
- workTicker定时器触发扫描流程。 nsqd采用了Redis的probabilistic expiration算法来进行扫描。首先从所有Channel中随机选取部分Channel,然后遍历被选取的Channel,投到workerChan中,并且等待反馈结果, 结果有两种,dirty和非dirty,如果dirty的比例超过配置中设定的QueueScanDirtyPercent,那么不进入休眠, 继续扫描,如果比例较低,则重新等待定时器触发下一轮扫描。这种机制可以在保证处理延时较低的情况下减少对CPU资源的浪费。
- refreshTicker定时器触发更新Channel列表流程。 这个流程比较简单,先获取一次Channel列表, 再调用resizePool重新分配worker。
接下来再看看resizePool的实现。
// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
idealPoolSize := int(float64(num) * 0.25)
if idealPoolSize < 1 {
idealPoolSize = 1
} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
}
for {
if idealPoolSize == n.poolSize {
break
} else if idealPoolSize < n.poolSize {
// contract
closeCh <- 1
n.poolSize--
} else {
// expand
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
})
n.poolSize++
}
}
}
当需要的worker数量超过之前分配的数量时,通过向closeCh投递消息使多余的worker销毁, 如果需要的数量比之前的多,则通过queueScanWorker创建新的worker。
// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
for {
select {
case c := <-workCh:
now := time.Now().UnixNano()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
}
if c.processDeferredQueue(now) {
dirty = true
}
responseCh <- dirty
case <-closeCh:
return
}
}
}
queueScanWorker接收workCh发来的消息,处理,并且通过responseCh反馈消息。收到closeCh时则关闭。 由于所有worker都监听相同的closeCh,所以当向closeCh发送消息时,随机关闭一个worker。 且由于workCh和closeCh的监听是串行的,所以不存在任务处理到一半时被关闭的可能。 这也是nsq中优雅关闭gochannel的的一个例子。
func (c *Channel) processInFlightQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return false
}
dirty := false
for {
c.inFlightMutex.Lock()
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()
if msg == nil {
goto exit
}
dirty = true
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()
}
c.doRequeue(msg)
}
exit:
return dirty
}
processInFlightQueue取出inFlightPQ顶部的消息,如果当前消息已经超时,则将消息从队列中移除,并返回消息。 由于队列是优先级队列,所以如果processInFlightQueue取出的消息为空,则不需要再往后取了,直接返回false表示当前非dirty状态。 如果取到了消息,则说明该消息投递超时,需要把消息传入doRequeue立即重新投递。
其它优化
topic 遍历 channel时 做的缓存
topic 类中 messagePump 需要遍历 所有的channel, 也就是遍历 t.channelMap; 多线程中遍历类中一个成员变量的操作, 需要给 类加锁, 普通的写法像这样:
t.RLock()
for i, channel := range t.channelMaps {
}
t.RUnlock()
但是问题这里是在一个 for 循环中, 整个操作就相当于这样:
for {
t.RLock()
for i, channel := range t.channelMaps {
}
t.RUnlock()
}
这个锁范围太大了, 锁竞争很大, 所以nsq 做了一个缓存, 并且接收 t.channelUpdateChan 通知来 更新 缓存, 就像这样:
func (t *Topic) messagePump() {
var chans []*Channel
//避免 锁竞争, 所以缓存 已存在的 channel
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
for {
select {
case <-t.channelUpdateChan:
//接到channle 变更的消息, 刷新缓存
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
}
// 遍历 已经缓存的 chans
for i, channel := range chans {
}
}
}
总结
Topic/Channel是发布/订阅模型的一种实现。Topic对应于发布,Channel对应于订阅。 消费者通过在Topic下生成不同的Channel来接收来自该Topic的消息。 通过生成相同的Channel来实现消费者负载均衡。
Channel本身在投递消息给消费者时维护两个队列,一个是inFlight队列,该队列存储正在投递,但还没被标记为投递成功的消息。 另一个是deferred队列,用来存储需要被延时投递的消息。
inFlight队列中消息可能因为投递超时而失败,deferred队列中的消息需要在到达指定时间后进行重新投递。 如果为两个队列中的每个消息都分别指定定时器,无疑是非常消耗资源的。因此nsq采用定时扫描队列的做法。 在扫描时采用多个worker分别处理。这种类似多线程的处理方式提高了处理效率。 nsq在扫描策略上使用了Redis的probabilistic expiration算法,同时动态调整worker的数量, 这些优化平衡了效率和资源占用。