nsq code analysis


nsq code analysis

启动和退出

启动, 利用svc框架

nsq使用了svc框架来启动一个service, Run 时, 分别调用prg 实现的 Init 和 Start 方法 启动’program’,然后监听 后两个参数的信号量, 当信号量到达, 调用 prg 实现的 Stop 方法来退出

func main() {
	prg := &program{}
	// svc 框架的Run 方法启动一个service
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		log.Fatal(err)
	}
}
// svc 框架, 启动 service, 也可以理解为 daemon
//
// 方法会一直阻塞, 直到 指定的信号量到来
// 如果指定信号为空, 默认是 syscall.SIGINT and syscall.SIGTERM
func Run(service Service, sig ...os.Signal) error {
	env := environment{}

	//先调用Init 初始化
	if err := service.Init(env); err != nil {
		return err
	}

	//再调用Start
	if err := service.Start(); err != nil {
		return err
	}

	//准备信号量处理
	if len(sig) == 0 {
		sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
	}

	signalChan := make(chan os.Signal, 1)
	signalNotify(signalChan, sig...)

	//整个程序会阻塞在这里, 等待系统信号量
	<-signalChan

	// 当信号来到, 调用stop 方法停掉
	return service.Stop()
}

svc 生命周期三个方法的实现:

func (p *program) Init(env svc.Environment) error {
	if env.IsWindowsService() {
		dir := filepath.Dir(os.Args[0])
		return os.Chdir(dir)
	}
	return nil
}

func (p *program) Start() error {
	opts := nsqd.NewOptions()

	flagSet := nsqdFlagSet(opts)
	flagSet.Parse(os.Args[1:])

	rand.Seed(time.Now().UTC().UnixNano())

	if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) {
		fmt.Println(version.String("nsqd"))
		os.Exit(0)
	}

	var cfg config
	configFile := flagSet.Lookup("config").Value.String()
	if configFile != "" {
		_, err := toml.DecodeFile(configFile, &cfg)
		if err != nil {
			log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
		}
	}
	cfg.Validate()

	options.Resolve(opts, flagSet, cfg)
	nsqd := nsqd.New(opts)

	err := nsqd.LoadMetadata()
	if err != nil {
		log.Fatalf("ERROR: %s", err.Error())
	}
	err = nsqd.PersistMetadata()
	if err != nil {
		log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
	}
	nsqd.Main()

	p.nsqd = nsqd
	return nil
}

func (p *program) Stop() error {
	if p.nsqd != nil {
		p.nsqd.Exit()
	}
	return nil
}

优雅退出

NSQd 的真正的启动函数是Main

func (n *NSQD) Main() {
	var httpListener net.Listener
	var httpsListener net.Listener

	ctx := &context{n}

	tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
	if err != nil {
		n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
		os.Exit(1)
	}
	n.Lock()
	n.tcpListener = tcpListener
	n.Unlock()
	tcpServer := &tcpServer{ctx: ctx}
	n.waitGroup.Wrap(func() {
		protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
	})

	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
		httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
		if err != nil {
			n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
			os.Exit(1)
		}
		n.Lock()
		n.httpsListener = httpsListener
		n.Unlock()
		httpsServer := newHTTPServer(ctx, true, true)
		n.waitGroup.Wrap(func() {
			http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
		})
	}
	httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
	if err != nil {
		n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
		os.Exit(1)
	}
	n.Lock()
	n.httpListener = httpListener
	n.Unlock()
	httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
	n.waitGroup.Wrap(func() {
		http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
	})

	n.waitGroup.Wrap(func() { n.queueScanLoop() })
	n.waitGroup.Wrap(func() { n.lookupLoop() })
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(func() { n.statsdLoop() })
	}
}

现在我们就有了多个线程,比如:

假如Main线程在接收到系统信号量之后直接退出, 那么其他两个线程也会跟着销毁. 于是这里就会出现一个问题, tcp服务线程 和 http服务线程, 是业务逻辑实现的主要线程, 里面可能正在”干活”, 此时需要退出, 是否有数据需要持久化了? 事务是否正好执行到一半?

所以退出时需要做优雅退出处理. Main所在线程退出之前, 需要 ‘通知’ 并且 ‘等待’ 两个服务线程退出后才能退出

为了实现这一点, nsq 利用了 sync包中的 waitgroup 组件

nsq 为了方便使用, 做了一个封装:

type WaitGroupWrapper struct {
	sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
	w.Add(1) // +1
	// 起一个线程, 就是go 的 goroutine
	go func() {
		cb() //执行任务
		w.Done() // -1
	}()
}

那么比如nsq 启动 tcp 服务线程 的代码, 就相当于:

w.Add(1)
go func() {
	protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
	w.Done()
}()
w.Wait()  //Main 线程就会阻塞在这里; 等待TCPServer 结束时调用w.Done()后才退出

通知 goroutine 退出

好了, 现在Main 会等待TCPServer 退出之后再退出了, 那, Main要退出的时候, 如何通知 TCPServer 退出?

protocol.TCPServer 就是循环的监听 tcp listener的 accept 连接, 交给 handler 处理; 然后这个for 循环是一个死循环, 依靠 accept 返回 error 才退出

// ==================== 注意, 这里的 TCPServer 运行在一个新的 goroutine 中
// 这个TCPServer 也是一个可以复用的比较不错的 tcp 程序 accept 模式
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
	l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

	for {
		clientConn, err := listener.Accept()
		if err != nil {
			if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
				l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
				runtime.Gosched()       // 是临时的错误, 暂停一下继续
				continue
			}
			// theres no direct way to detect this error because it is not exposed
			if !strings.Contains(err.Error(), "use of closed network connection") {
				l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
			}
			break
		}
		go handler.Handle(clientConn)
	}

	l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

再看Exit函数:

func (n *NSQD) Exit() {
	if n.tcpListener != nil {
		n.tcpListener.Close()
	}

	if n.httpListener != nil {
		n.httpListener.Close()
	}

	if n.httpsListener != nil {
		n.httpsListener.Close()
	}

	n.Lock()
	err := n.PersistMetadata()
	if err != nil {
		n.logf("ERROR: failed to persist metadata - %s", err)
	}
	n.logf("NSQ: closing topics")
	for _, topic := range n.topicMap {
		topic.Close()
	}
	n.Unlock()

	close(n.exitChan)
	n.waitGroup.Wait()

	n.dl.Unlock()
}

这里信号量到达之后, 做了一个 tcpListener.Close(), 其实这个动作的目的是通知 tcpListener 服务所在的线程退出

这里直接利用了 listener 被 close 之后再 accept 调用会返回error的特性, 既退出了线程, 又关闭了 listener, 一举两得

消息处理流

协议不同版本 的 封装处理

接到连接之后, 交给TCPHandler 处理, 这里是由tcpServer 实现:


func (p *tcpServer) Handle(clientConn net.Conn) {
	p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

	// 先读取4个字节 作为 协议版本号
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
		return
	}
	protocolMagic := string(buf)

	p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		// protocolV2 实现了 V2 这个版本的协议
		prot = &protocolV2{ctx: p.ctx}
	//假如有另外一个版本的协议
	//case "  V3":
	//	prot = &protocolV3{ctx: p.ctx}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	//交给具体protocol实现类处理每个连接
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
		return
	}
}

Handle 函数先读取4个字节的字符串作为版本号, 根据版本号选用具体的 protocol 实现, 这里的 “ V2”这个版本的协议由protocolV2 实现

将协议封装, 方便以后不同协议的扩展

type Protocol interface {
	IOLoop(conn net.Conn) error
}

具体 每个’命令’处理的封装

前面看到, listener.Accept() 之后, 解析了版本号, 当前由 protocol_v2 具体实现了 IOLoop

func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time

	// nsqd 范围的 clientID 序列
	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)

	// 这个messagePump 名字很形象, 把要发送给client 的messgae 从 缓存池子里Pump 抽出来 做具体的发送
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan  //TODO messagePump 里要做初始化, 具体是什么? 为什么需要这样阻塞一下IOLoop ?

	// 底下这个地方是 处理 client 的请求的
	for {
		// 如果 client 设置需要做心跳检查, 则设置 读超时为 两倍 心跳检查间隔
		// 也就是说, 在这个时间里, 正常情况下, client肯定会在这个间隔内发一个心跳包过来
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			// 加入client 不设置做心跳则不设置读超时
			client.SetReadDeadline(zeroTime)
		}

		// ReadSlice does not allocate new space for the data each request
		// ie. the returned slice is only valid until the next call to it
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// 这个V2 版本的协议, 一行是一个命令
		line = line[:len(line)-1]
		// optionally trim the '\r' , 处理一下\r, 有可能win版本的回车是 \r\n
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		// 每个命令, 用 " "空格来划分 参数
		params := bytes.Split(line, separatorBytes)

		if p.ctx.nsqd.getOpts().Verbose {
			p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
		}

		// 执行命令
		var response []byte
		response, err = p.Exec(client, params)
		if err != nil {
			ctx := ""
			if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
				ctx = " - " + parentErr.Error()
			}
			p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx)

			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		// 加入命令是有 '响应' 的, 发送响应
		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client)
	conn.Close()

	// 通知 messagePump 退出
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	return err
}

func (p *protocolV2) Send(client *clientV2, frameType int32, data []byte) error {
	// 因为client 的处理 还有一个 messagePump 线程, 所以 发送要锁
	client.writeLock.Lock()

	var zeroTime time.Time
	if client.HeartbeatInterval > 0 {
		client.SetWriteDeadline(time.Now().Add(client.HeartbeatInterval))
	} else {
		client.SetWriteDeadline(zeroTime)
	}

	//V2 协议版本 发送给client, 是 使用 [(4byte)消息长度 , (4byte)消息类型, (载体)] 的 帧格式
	_, err := protocol.SendFramedResponse(client.Writer, frameType, data)
	if err != nil {
		client.writeLock.Unlock()
		return err
	}

	if frameType != frameTypeMessage {
		err = client.Flush()
	}

	client.writeLock.Unlock()

	return err
}

func SendFramedResponse(w io.Writer, frameType int32, data []byte) (int, error) {
	beBuf := make([]byte, 4)
	size := uint32(len(data)) + 4

	binary.BigEndian.PutUint32(beBuf, size)
	n, err := w.Write(beBuf)
	if err != nil {
		return n, err
	}

	binary.BigEndian.PutUint32(beBuf, uint32(frameType))
	n, err = w.Write(beBuf)
	if err != nil {
		return n + 4, err
	}

	n, err = w.Write(data)
	return n + 8, err
}

读取并解析请求之后, 传给 p.Exec(client, params) 执行具体的请求

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	//这个命令不需要做认证
	if bytes.Equal(params[0], []byte("IDENTIFY")) {
		return p.IDENTIFY(client, params)
	}
	// client 是否做了认证
	err := enforceTLSPolicy(client, p, params[0])
	if err != nil {
		return nil, err
	}
	//一行一个请求, 一个命令中用 " " 空格分割参数, 第一个参数是 命令标志
	switch {
	case bytes.Equal(params[0], []byte("FIN")):
		return p.FIN(client, params)
	case bytes.Equal(params[0], []byte("RDY")):
		return p.RDY(client, params)
	case bytes.Equal(params[0], []byte("REQ")):
		return p.REQ(client, params)
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
	case bytes.Equal(params[0], []byte("MPUB")):
		return p.MPUB(client, params)
	case bytes.Equal(params[0], []byte("DPUB")):
		return p.DPUB(client, params)
	case bytes.Equal(params[0], []byte("NOP")):
		return p.NOP(client, params)
	case bytes.Equal(params[0], []byte("TOUCH")):
		return p.TOUCH(client, params)
	case bytes.Equal(params[0], []byte("SUB")):
		return p.SUB(client, params)
	case bytes.Equal(params[0], []byte("CLS")):
		return p.CLS(client, params)
	case bytes.Equal(params[0], []byte("AUTH")):
		return p.AUTH(client, params)
	}
	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}

发送接收流程

从TcpServer看到nsq是如何接收client连接, 读取请求消息, 执行并返回结果响应的流程. 看到具体的执行命令代码, 其中有一个是”PUB”命令, 也就是消息推送, 现在就来看一下, 一个消息从被一个client做PUB发送, 到SUB订阅了消息的Client接收, 在nsqd里是如何流转的

1. 客户端 层面 的 PUB 消息

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
	switch {
	case bytes.Equal(params[0], []byte("PUB")):
		return p.PUB(client, params)
}

看 PUB方法的具体实现:

//客户端推送消息
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
	var err error

	if len(params) < 2 {
		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
	}

	//请求格式: [ PUB topicName ...]
	topicName := string(params[1])
	if !protocol.IsValidTopicName(topicName) {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
			fmt.Sprintf("PUB topic name %q is not valid", topicName))
	}

	//消息体, 在client 请求的下一行开始, 头4个字节是消息体长度
	bodyLen, err := readLen(client.Reader, client.lenSlice)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
	}

	if bodyLen <= 0 {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
			fmt.Sprintf("PUB invalid message body size %d", bodyLen))
	}

	//长度是否过大
	if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize {
		return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE",
			fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize))
	}

	//建立缓冲区并读入
	messageBody := make([]byte, bodyLen)
	_, err = io.ReadFull(client.Reader, messageBody)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
	}

	//client 是否有权限对这个 topic 做 PUB 操作
	if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil {
		return nil, err
	}

	topic := p.ctx.nsqd.GetTopic(topicName)

	//创建 Message 对象, TODO: nsqd 的 唯一id 发生器
	msg := NewMessage(topic.GenerateID(), messageBody)

	//topic 发送消息
	err = topic.PutMessage(msg)
	if err != nil {
		return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
	}

	return okBytes, nil
}

2. Topic 层的 PutMessage

PUB 方法做一系列检查, 然后调用 topic.PutMessage(msg) 做具体的发送

func (t *Topic) PutMessage(m *Message) error {
	//TODO 这里是个 ReadLock, 应该是为了锁住 exitFlag ?
	t.RLock()
	defer t.RUnlock()
	//这里使用了一个 atomic Int32 类型的 exitFlag 退出标志, 如果已经退出了就不在 put 了
	if atomic.LoadInt32(&t.exitFlag) == 1 {
		return errors.New("exiting")
	}
	//发送
	err := t.put(m)
	if err != nil {
		return err
	}
	//做个计数
	atomic.AddUint64(&t.messageCount, 1)
	return nil
}
func (t *Topic) put(m *Message) error {
	// 这里巧妙利用了 chan 的特性
	// 先写入 memoryMsgChan 这个队列,假如 memoryMsgChan 已满, 不可写入
	// golang 就会执行 default 语句,
	select {
	case t.memoryMsgChan <- m: //'变量' 不都是内存? 这个 '内存', 是相对于 下面这个 DiskQueue 来讲的)
	default:

		// 利用 sync.Pool 包 做了一个 bytes.Buffer 的缓存池, 从池中取出一个来用
		// <<go语言的官方包sync.Pool的实现原理和适用场景>> :
		// http://blog.csdn.net/yongjian_lian/article/details/42058893
		b := bufferPoolGet()

		// 这个 t.backend, 在NewTopic 函数中初始化, 它具体是一个 DiskQueue
		//TODO 先不去管他的具体实现, 暂时认识它是把消息 写入硬盘 就好
		//t.backend = newDiskQueue(topicName,
		//	ctx.nsqd.getOpts().DataPath,
		//	ctx.nsqd.getOpts().MaxBytesPerFile,
		//	int32(minValidMsgLength),
		//	int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
		//	ctx.nsqd.getOpts().SyncEvery,
		//	ctx.nsqd.getOpts().SyncTimeout,
		//	ctx.nsqd.getOpts().Logger)
		err := writeMessageToBackend(b, m, t.backend)

		// 放回缓存池
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(
				"TOPIC(%s) ERROR: failed to write message to backend - %s",
				t.name, err)
			return err
		}
	}
	return nil
}

Message 的数据流看代码, 到这个地方断掉了; 不过既然使用了 chan , 很明显 chan的 推和拉不在一个线程中

用IDE 对 t.memoryMsgChan 做 ‘Find Usages’ 操作, 很快定位到了 ‘<-memoryMsgChan’ 操作在 Topic 类的 messagePump 函数中

// 每个 topic 自己有一个 messagePump 消息投递 loop
func (t *Topic) messagePump() {

	//避免 锁竞争, 所以缓存 已存在的 channel
	t.RLock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()

	for {
		select {
		case msg = <-memoryMsgChan:
		case <-t.channelUpdateChan:
			//之前 避免 锁竞争, 缓存了 已存在的 channel
			//假如更新了 channel 会发一个 消息过来, 重新读取 channel
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			continue
		}

		// 这里需要 遍历 t.channelMap, 普通的写法, 会导致锁竞争太大
		for i, channel := range chans {
			chanMsg := msg
			// copy the message because each channel
			// needs a unique instance but...
			// fastpath to avoid copy if its the first channel
			// (the topic already created the first copy)
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 {
				channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
				continue
			}
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.ctx.nsqd.logf(
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}
}

这个 messagePump 函数是 在 NewTopic() 也就是创建 Topic 的时候启动的一个线程

也就是说, topic 是自己 负责 把自己的 memoryMsgChan 做 Pump 出来 , 再塞到 二级 channel 的 memoryMsgChan 里

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	t := &Topic{
		...
	}
	...
	t.waitGroup.Wrap(func() { t.messagePump() })
}

3. Channel 层的 PutMessage

topic 的 put 调用 topic下含有的所有 channel 的 PutMessage;

channel 跟topic 其实是一样的, 可以这么理解: channel 就是二级topic

func (c *Channel) PutMessage(m *Message) error {
	c.RLock()
	defer c.RUnlock()
	if atomic.LoadInt32(&c.exitFlag) == 1 {
		return errors.New("exiting")
	}
	err := c.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&c.messageCount, 1)
	return nil
}

func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
	default:
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, c.backend)
		bufferPoolPut(b)
		c.ctx.nsqd.SetHealth(err)
		if err != nil {
			c.ctx.nsqd.logf("CHANNEL(%s) ERROR: failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}

4. Client 层的 messagePump

但是会发现, Channel 自己并没有 messagePump 函数, 再次通过Usage 查找, <-channel.memoryMsgChan 读取操作在 protocolV2.messagePump(), 这也就是上文中, 每个Client连接的处理函数IOLoop, 每个Client 都会有一个 protocolV2.messagePump() 线程

func (p *protocolV2) IOLoop(conn net.Conn) error {

	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
		line, err = client.Reader.ReadSlice('\n')   // 读取
		params := bytes.Split(line, separatorBytes) // 解析
		response, err = p.Exec(client, params)		// 执行

		err = p.Send(client, frameTypeResponse, response) //发送结果
	}
}

客户端的其他操作我们先忽略不管, 主要看 Message 的数据路径, 精简掉 messagePump 的代码:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
...
	for {
		if subChannel == nil || !client.IsReadyForMessages() {
		} else {
			// we're buffered (if there isn't any more data we should flush)...
			// select on the flusher ticker channel, too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		// 这里负责执行Client 的各种事件
		select {

		//Client 需要发送一个SUB 请求 来订阅Channel, 并切一个Client只能订阅一个Channel
		case subChannel = <-subEventChan:  // 做了订阅
			subEventChan = nil
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()

			// protocol 进行消息格式的打包, 再发送给Client
			// 这里, Message 就发送给了 client
			err = p.SendMessage(client, msg, &buf)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}
}
...

5. 整体 Message 数据流 精简总结:

总体来讲, 一个Message的推送, 经过了 两个chan 队列:

两个队列的两头(读, 写), 分别有一个线程进行, 也就是3个线程:

Message 的数据流就是:

ClientConn->topic.memoryMsgChan: protocol.IOLoop(clientConn) topic.memoryMsgChan->channel.memoryMsgChan: topic.messagePump() channel.memoryMsgChan-->ClientConn: protocol.messagePump()

从client连接进来, 到message 投递给Client 的所有流程代码, 精简如下:

clientConn, err := listener.Accept()
// Client  ==>  Client 请求响应线程  ==> topic.memoryMsgChan
go handler.Handle(clientConn) {
	err = prot.IOLoop(clientConn){
		for {
			line, err = client.Reader.ReadSlice('\n') // <==Client
			params := bytes.Split(line, separatorBytes)
			response, err = p.Exec(client, params){
				switch {
				case bytes.Equal(params[0], []byte("PUB")):
					return p.PUB(client, params){
						topic.PutMessage(msg)	// ==>topic.memoryMsgChan
					}
			}
		}
	}
}

// topic.memoryMsgChan  ==>  Topic.messagePump 线程  ==> echa channel.memoryMsgChan
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	t.waitGroup.Wrap(func() {t.messagePump(){
		for {
			select {
			case msg = <-memoryMsgChan:    // <==topic.memoryMsgChan
			for i, channel := range chans {
				err := channel.PutMessage(chanMsg) // ==>channel.memoryMsgChan
			}
		}}
	})
}

// client.subChannel.memoryMsgChan ==>  Client的messagePump 线程  ==> Client
func (p *protocolV2) IOLoop(conn net.Conn) error {
	go p.messagePump(client, messagePumpStartedChan){
		for {
			select {
			case subChannel = <-subEventChan:
			case msg := <-subChannel.memoryMsgChan: // <==client.subChannel.memoryMsgChan
				err = p.SendMessage(client, msg, &buf) // ==>Client
			}
		}
	}
}

虽然 一个channel 会被多个Client 订阅, 从代码上来讲, 也就是 一个 channel.memoryMsgChan 会被多个 client 线程的 messagePump() 方法中进行读取

但因为chan 的特性, 一条缓存在 channel.memoryMsgChan 中的消息, 只会被一个 client 读取到

消息投递可靠性

如何保证至少成功投递一次? 在protocolV2的messagePump函数中,消息被通过投送到相应消费者。 投递时首先调用Channel的StartInFlightTimeout函数

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	for {
		select {
		case msg := <-memoryMsgChan:
			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg, &buf)
		}
	}
}

func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
	now := time.Now()
	msg.clientID = clientID
	msg.deliveryTS = now
	msg.pri = now.Add(timeout).UnixNano()
	err := c.pushInFlightMessage(msg)
	if err != nil {
		return err
	}
	c.addToInFlightPQ(msg)
	return nil
}

该函数填充消息的消费者ID、投送时间、优先级,然后调用pushInFlightMessage函数将消息放入inFlightMessages字典中。 最后调用addToInFlightPQ将消息放入inFlightPQ队列中。

至此,消息投递流程完成,接下来需要等待消费者对投送结果的反馈。消费者通过发送FIN、REQ、TOUCH来回复对消息的处理结果。

投递成功

消费者发送FIN,表明消息已经被接收并正确处理。

func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	msg, err := c.popInFlightMessage(clientID, id)
	if err != nil {
		return err
	}
	c.removeFromInFlightPQ(msg)
	if c.e2eProcessingLatencyStream != nil {
		c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
	}
	return nil
}

FinishMessage分别调用popInFlightMessage和removeFromInFlightPQ将消息从inFlightMessages和inFlightPQ中删除。 最后,统计该消息的投递情况。

投递失败

客户端发送REQ,表明消息投递失败,需要再次被投递。

func (c *Channel) RequeueMessage(clientID int64, id MessageID, timeout time.Duration) error {
	// remove from inflight first
	msg, err := c.popInFlightMessage(clientID, id)
	if err != nil {
		return err
	}
	c.removeFromInFlightPQ(msg)
	atomic.AddUint64(&c.requeueCount, 1)

	if timeout == 0 {
		c.exitMutex.RLock()
		if c.Exiting() {
			c.exitMutex.RUnlock()
			return errors.New("exiting")
		}
		err := c.put(msg)
		c.exitMutex.RUnlock()
		return err
	}

	// deferred requeue
	return c.StartDeferredTimeout(msg, timeout)
}

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
	absTs := time.Now().Add(timeout).UnixNano()
	item := &pqueue.Item{Value: msg, Priority: absTs}
	err := c.pushDeferredMessage(item)
	if err != nil {
		return err
	}
	c.addToDeferredPQ(item)
	return nil
}

Channel在RequeueMessage函数对消息投递失败进行处理。该函数将消息从inFlightMessages和inFlightPQ中删除, 随后进行重新投递。

发送REQ时有一个附加参数timeout,该值为0时表示立即重新投递,大于0时表示等待timeout时间之后投递。如果timeout大于0,则调用StartDeferredTimeout进行延迟投递。 首先计算延迟投递的时间点,然后调用pushDeferredMessage将消息加入deferredMessage字典, 最后将消息放入deferredPQ队列。延迟投递的消息会被专门的worker扫描并在延迟投递的时间点后进行投递。 需要注意的是,立即重新投递的消息不会进入deferredPQ队列

消息的超时和延迟投递处理

n.waitGroup.Wrap(func() { n.queueScanLoop() })

func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	n.logf("QUEUESCAN: closing")
	close(closeCh)
	workTicker.Stop()
	refreshTicker.Stop()
}

该函数使用若干个worker来扫描并处理当前在投递中以及等待重新投递的消息。worker的个数由配置和当前Channel数量共同决定。 首先,初始化3个gochannel:workCh、responseCh、closeCh,分别控制worker的输入、输出和销毁。

然后获取当前的Channel集合,并且调用resizePool函数来启动指定数量的worker。

最后进入扫描的循环。在循环中,等待两个定时器,workTicker和refreshTicker, 定时时间分别由由配置中的QueueScanInterval和QueueScanRefreshInterval决定。 这种由等待定时器触发的循环避免了函数持续的执行影响性能,而Golang的特性使得这种机制在写法上非常简洁。

  1. workTicker定时器触发扫描流程。 nsqd采用了Redis的probabilistic expiration算法来进行扫描。首先从所有Channel中随机选取部分Channel,然后遍历被选取的Channel,投到workerChan中,并且等待反馈结果, 结果有两种,dirty和非dirty,如果dirty的比例超过配置中设定的QueueScanDirtyPercent,那么不进入休眠, 继续扫描,如果比例较低,则重新等待定时器触发下一轮扫描。这种机制可以在保证处理延时较低的情况下减少对CPU资源的浪费。
  2. refreshTicker定时器触发更新Channel列表流程。 这个流程比较简单,先获取一次Channel列表, 再调用resizePool重新分配worker。

接下来再看看resizePool的实现。

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 	1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	idealPoolSize := int(float64(num) * 0.25)
	if idealPoolSize < 1 {
		idealPoolSize = 1
	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
	}
	for {
		if idealPoolSize == n.poolSize {
			break
		} else if idealPoolSize < n.poolSize {
			// contract
			closeCh <- 1
			n.poolSize--
		} else {
			// expand
			n.waitGroup.Wrap(func() {
				n.queueScanWorker(workCh, responseCh, closeCh)
			})
			n.poolSize++
		}
	}
}

当需要的worker数量超过之前分配的数量时,通过向closeCh投递消息使多余的worker销毁, 如果需要的数量比之前的多,则通过queueScanWorker创建新的worker。

// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case c := <-workCh:
			now := time.Now().UnixNano()
			dirty := false
			if c.processInFlightQueue(now) {
				dirty = true
			}
			if c.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		case <-closeCh:
			return
		}
	}
}

queueScanWorker接收workCh发来的消息,处理,并且通过responseCh反馈消息。收到closeCh时则关闭。 由于所有worker都监听相同的closeCh,所以当向closeCh发送消息时,随机关闭一个worker。 且由于workCh和closeCh的监听是串行的,所以不存在任务处理到一半时被关闭的可能。 这也是nsq中优雅关闭gochannel的的一个例子。

func (c *Channel) processInFlightQueue(t int64) bool {
    c.exitMutex.RLock()
    defer c.exitMutex.RUnlock()

    if c.Exiting() {
        return false
    }

    dirty := false
    for {
        c.inFlightMutex.Lock()
        msg, _ := c.inFlightPQ.PeekAndShift(t)
        c.inFlightMutex.Unlock()

        if msg == nil {
            goto exit
        }
        dirty = true

        _, err := c.popInFlightMessage(msg.clientID, msg.ID)
        if err != nil {
            goto exit
        }
        atomic.AddUint64(&c.timeoutCount, 1)
        c.RLock()
        client, ok := c.clients[msg.clientID]
        c.RUnlock()
        if ok {
            client.TimedOutMessage()
        }
        c.doRequeue(msg)
    }

exit:
    return dirty
}

processInFlightQueue取出inFlightPQ顶部的消息,如果当前消息已经超时,则将消息从队列中移除,并返回消息。 由于队列是优先级队列,所以如果processInFlightQueue取出的消息为空,则不需要再往后取了,直接返回false表示当前非dirty状态。 如果取到了消息,则说明该消息投递超时,需要把消息传入doRequeue立即重新投递。

其它优化

topic 遍历 channel时 做的缓存

topic 类中 messagePump 需要遍历 所有的channel, 也就是遍历 t.channelMap; 多线程中遍历类中一个成员变量的操作, 需要给 类加锁, 普通的写法像这样:

t.RLock()
for i, channel := range t.channelMaps {

}
t.RUnlock()

但是问题这里是在一个 for 循环中, 整个操作就相当于这样:

for {
	t.RLock()
	for i, channel := range t.channelMaps {

	}
	t.RUnlock()
}

这个锁范围太大了, 锁竞争很大, 所以nsq 做了一个缓存, 并且接收 t.channelUpdateChan 通知来 更新 缓存, 就像这样:

func (t *Topic) messagePump() {
	var chans []*Channel

	//避免 锁竞争, 所以缓存 已存在的 channel
	t.RLock()
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()

	for {
		select {
		case <-t.channelUpdateChan:
			//接到channle 变更的消息, 刷新缓存
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		}

		// 遍历 已经缓存的 chans
		for i, channel := range chans {
		}
	}
}

总结

Topic/Channel是发布/订阅模型的一种实现。Topic对应于发布,Channel对应于订阅。 消费者通过在Topic下生成不同的Channel来接收来自该Topic的消息。 通过生成相同的Channel来实现消费者负载均衡。

Channel本身在投递消息给消费者时维护两个队列,一个是inFlight队列,该队列存储正在投递,但还没被标记为投递成功的消息。 另一个是deferred队列,用来存储需要被延时投递的消息。

inFlight队列中消息可能因为投递超时而失败,deferred队列中的消息需要在到达指定时间后进行重新投递。 如果为两个队列中的每个消息都分别指定定时器,无疑是非常消耗资源的。因此nsq采用定时扫描队列的做法。 在扫描时采用多个worker分别处理。这种类似多线程的处理方式提高了处理效率。 nsq在扫描策略上使用了Redis的probabilistic expiration算法,同时动态调整worker的数量, 这些优化平衡了效率和资源占用。