1.基本使用
// producer is the package-level NSQ producer. InitProducer assigns it and
// Publish reads it.
var producer *nsq.Producer

// main creates a producer for a local nsqd address and publishes every line
// read from stdin to the "test" topic, printing the result of each publish.
// The line "stop" is still published and then ends the loop; end of input or
// a read error ends the loop as well.
func main() {
	strIP1 := "127.0.0.1:4150"
	InitProducer(strIP1)
	// Shut the producer down when main returns.
	defer producer.Stop()

	// Scan returns false at EOF, so a closed stdin terminates the loop
	// instead of repeatedly yielding empty reads.
	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		command := scanner.Text()

		err := Publish("test", command)
		fmt.Println(err)

		if command == "stop" {
			break
		}
	}
	if err := scanner.Err(); err != nil {
		fmt.Fprintln(os.Stderr, "reading stdin:", err)
	}
}

// InitProducer creates the package-level producer for the nsqd address str
// using a fresh nsq.NewConfig(). It panics if the producer cannot be created.
func InitProducer(str string) {
	fmt.Println("address: ", str)

	var err error
	producer, err = nsq.NewProducer(str, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
}

// Publish sends message to topic through the package-level producer.
//
// It returns an error if InitProducer has not been called. An empty message
// is silently skipped and nil is returned: per the original author's note,
// publishing an empty string results in an error.
func Publish(topic string, message string) error {
	if producer == nil {
		return fmt.Errorf("producer is nil")
	}
	if message == "" {
		return nil
	}
	return producer.Publish(topic, []byte(message))
}

// 2. Initializing the config: nsq.NewConfig()
func NewConfig() *Config { //nsq 配置 c := &Config{ //structTagsConfig 和 tlsConfig 用来处理Config中的字段 configHandlers: []configHandler{&structTagsConfig{}, &tlsConfig{}}, initialized: true, } //分别调用structTagsConfig和tlsConfig 对config进行字段的验证,字段的默认值设置 if err := c.setDefaults(); err != nil { panic(err.Error()) } return c }3.生产者初始化-nsq.NewProducer
func NewProducer(addr string, config *Config) (*Producer, error) { //判断config是否初始化过 config.assertInitialized() //对config内的字段进行验证 //分别调用 structTagsConfig 和 tlsConfig 的Validate方法进行验证 err := config.Validate() if err != nil { return nil, err } //初始化结构体 p := &Producer{ id: atomic.AddInt64(&instCount, 1), addr: addr, config: *config, logger: log.New(os.Stderr, "", log.Flags()), logLvl: LogLevelInfo, transactionChan: make(chan *ProducerTransaction), exitChan: make(chan int), responseChan: make(chan []byte), errorChan: make(chan []byte), } return p, nil }4. 消息发布- producer.Publish(topic, []byte(message))
func (w *Producer) Publish(topic string, body []byte) error { //Publish 返回Command结构体 return w.sendCommand(Publish(topic, body)) }5. 发送命令-sendCommand
func (w *Producer) sendCommand(cmd *Command) error { doneChan := make(chan *ProducerTransaction) //发送命令 err := w.sendCommandAsync(cmd, doneChan, nil) if err != nil { close(doneChan) return err } //阻塞管道 执行发送结束 t := <-doneChan return t.Error }6. 同步发送命令-sendCommandAsync
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction, args []interface{}) error { //生产者数量加1 atomic.AddInt32(&w.concurrentProducers, 1) defer atomic.AddInt32(&w.concurrentProducers, -1) //判断状态 如果没有连接 则进行连接 if atomic.LoadInt32(&w.state) != StateConnected { err := w.connect() if err != nil { return err } } t := &ProducerTransaction{ cmd: cmd, doneChan: doneChan, Args: args, } select { //通过管道将命令发送给nsq case w.transactionChan <- t: case <-w.exitChan: return ErrStopped } return nil }7.连接-connect
func (w *Producer) connect() error { //加锁 w.guard.Lock() defer w.guard.Unlock() //判断标志 if atomic.LoadInt32(&w.stopFlag) == 1 { return ErrStopped } //再次判断状态 switch state := atomic.LoadInt32(&w.state); state { case StateInit: case StateConnected: return nil default: return ErrNotConnected } w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr) logger, logLvl := w.getLogger() //生成连接 直接返回了 Conn结构体 //producerConnDelegate 各种钩子的具体实现 w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w}) w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id)) //进行连接 _, err := w.conn.Connect() if err != nil { w.conn.Close() w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err) return err } atomic.StoreInt32(&w.state, StateConnected) w.closeChan = make(chan int) w.wg.Add(1) //处理各个管道接收到的数据 go w.router() return nil }8.进行连接-Connect
func (c *Conn) Connect() (*IdentifyResponse, error) { dialer := &net.Dialer{ LocalAddr: c.config.LocalAddr, Timeout: c.config.DialTimeout, } //tcp连接 conn, err := dialer.Dial("tcp", c.addr) if err != nil { return nil, err } c.conn = conn.(*net.TCPConn) c.r = conn c.w = conn //V2 写入 _, err = c.Write(MagicV2) if err != nil { c.Close() return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err) } //进行鉴定 resp, err := c.identify() if err != nil { return nil, err } if resp != nil && resp.AuthRequired { if c.config.AuthSecret == "" { c.log(LogLevelError, "Auth Required") return nil, errors.New("Auth Required") } err := c.auth(c.config.AuthSecret) if err != nil { c.log(LogLevelError, "Auth Failed %s", err) return nil, err } } c.wg.Add(2) atomic.StoreInt32(&c.readLoopRunning, 1) //负责 读相关工作 如一些心跳检测的响应 接收到不同消息 会触发钩子函数 go c.readLoop() //负责写 将cmdChan管道内的命令写入nsq go c.writeLoop() return resp, nil }9. 命令发送-router
func (w *Producer) router() { for { select { case t := <-w.transactionChan: w.transactions = append(w.transactions, t) //读取管道信息写入nsq err := w.conn.WriteCommand(t.cmd) if err != nil { w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err) w.close() } case data := <-w.responseChan: w.popTransaction(FrameTypeResponse, data) case data := <-w.errorChan: w.popTransaction(FrameTypeError, data) case <-w.closeChan: goto exit case <-w.exitChan: goto exit } } exit: w.transactionCleanup() w.wg.Done() w.log(LogLevelInfo, "exiting router") }10. 真正写入命令-WriteCommand
func (c *Conn) WriteCommand(cmd *Command) error { c.mtx.Lock() //调用Command的WriteTo方法 进行写 _, err := cmd.WriteTo(c) if err != nil { goto exit } //有缓冲 err = c.Flush() exit: c.mtx.Unlock() if err != nil { c.log(LogLevelError, "IO error - %s", err) c.delegate.OnIOError(c, err) } return err }11.写-WriteTo
func (c *Command) WriteTo(w io.Writer) (int64, error) { var total int64 var buf [4]byte //链接写 //写入命令的类型 eg:PUB n, err := w.Write(c.Name) total += int64(n) if err != nil { return total, err } //遍历topic for _, param := range c.Params { //先写入空格 n, err := w.Write(byteSpace) total += int64(n) if err != nil { return total, err } //写入topic n, err = w.Write(param) total += int64(n) if err != nil { return total, err } } //写入 \n n, err = w.Write(byteNewLine) total += int64(n) if err != nil { return total, err } //写入具体内容 if c.Body != nil { bufs := buf[:] binary.BigEndian.PutUint32(bufs, uint32(len(c.Body))) //吸入长度 n, err := w.Write(bufs) total += int64(n) if err != nil { return total, err } //写入内容 n, err = w.Write(c.Body) total += int64(n) if err != nil { return total, err } } //返回总长度 return total, nil }