代码仓库
官方文档
sarama提供了纯Go编写的kafka客户端,最新发行版本支持Go和Kafka的最新两个版本
sarama提供异步生产者和同步生产者
异步生产者用管道接受信息,并且在背后异步地生产消息。异步生产者会尽可能的提高效率,是sarama推荐的选择
同步生产者提供一个阻塞的方法,直到消息被确认成功生产。显然效率上肯定不如异步生产者,且在某些配置中,确认的消息仍会丢失
使用AsyncProducer
package main import ( "github.com/Shopify/sarama" "log" "os" "os/signal" "sync" ) func main() { config := sarama.NewConfig() // 启用发送成功配置 config.Producer.Return.Successes = true // 启动一个异步生产者 producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } log.SetOutput(os.Stdout) //接收终端的中断命令(ctrl C)来正常退出 signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) var ( wg sync.WaitGroup enqueued, successes, errors int ) // 开启Goroutine读取成功管道内的消息 wg.Add(1) go func() { defer wg.Done() for range producer.Successes() { successes++ } }() // 开启Goroutine读取错误管道内的消息 wg.Add(1) go func() { defer wg.Done() for err := range producer.Errors() { log.Println(err) errors++ } }() ProducerLoop: for { // 消息指针 message := &sarama.ProducerMessage{Topic: "topic1", Value: sarama.StringEncoder("hello sarama kafka")} select { // 发送消息 case producer.Input() <- message: enqueued++ // 正常退出 case <-signals: producer.AsyncClose() break ProducerLoop } } //等待两个管道内的消息处理完 wg.Wait() log.Printf("Successfully produced: %d; errors: %d; produced: %d\n", successes, errors, enqueued) }使用Consumer对象
package main import ( "github.com/Shopify/sarama" "log" "os" "os/signal" ) func main() { // 创建消费者对象 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { panic(err) } // 程序运行结束时,调用Close关闭消费者对象 defer func() { if err := consumer.Close(); err != nil { log.Fatalln(err) } }() // 创建消费者对象管理的分区消费者对象 partitionConsumer, err := consumer.ConsumePartition("topic1", 0, sarama.OffsetNewest) if err != nil { panic(err) } // 程序运行结束时,调用Close关闭分区消费者对象 defer func() { if err := partitionConsumer.Close(); err != nil { log.Println(err) } }() //用系统中断信号作为结束程序的信号 signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) consumed := 0 ConsumerLoop: for { select { // 消费者从分区中拿出数据消费 case msg := <-partitionConsumer.Messages(): log.Printf("Consumed message offset %d\n", msg.Offset) consumed++ // 收到中断信号结束程序 case <-signals: break ConsumerLoop } } }