RabbitMQ 工作模式实例

    技术2023-07-17  64

    一个生产者,多个消费者,每个消费者获取到的消息唯一。 

    rabbitmq服务封装

    package rabbitmq import ( "fmt" "github.com/pkg/errors" "github.com/streadway/amqp" ) /*************************RabbitMQ初始化start**********************/ // 除了simple 模式外、其他的模式都是由 队列 交换机 key 不同组合实现的 type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel QueueName string //队列 Exchange string //交换机 Key string //key MQUrl string //连接信息 } // 创建RabbitMQ 实例 func newRabbitMQ(queueName, exchange, key string) (*RabbitMQ, error) { rabbitMQ := &RabbitMQ{ QueueName: queueName, Exchange: exchange, Key: key, MQUrl: "amqp://user:password@127.0.0.1:5672/", //Virtual host用的哪个库 } var err error // dial mq rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl) rabbitMQ.failOnErr(err, "创建连接错误") if err != nil { return nil, errors.Wrap(err, "create connect error") } // get channel rabbitMQ.channel, err = rabbitMQ.conn.Channel() if err != nil { return nil, errors.Wrap(err, "get channel err") } return rabbitMQ, nil } // 错误处理 func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { panic(fmt.Sprintf("%s:%s", err.Error(), message)) } } /****************RabbitMQ初始化end**********************************/ /************Simple和work 模式 start******************/ // Step 1. Simple 创建实例 func NewRabbitMQSimple(queueName string) (*RabbitMQ, error) { //在simple模式下 exchange and key 都为空 rabbitMQ, err := newRabbitMQ(queueName, "", "") if err != nil { return nil, err } return rabbitMQ, nil } // Step 2. Simple producer code func (r *RabbitMQ) PublishSimple(message string) error { // 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建 // 保证队列存在、 消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, //是否持久化 true, // 是否自动删除 false, // 是否具有排他性 false, //是否阻塞 false, // 额外属性 nil, ) if err != nil { return errors.Wrap(err, "QueueDeclare err") } // 2.2 发送消息到队列中 err = r.channel.Publish( r.Exchange, r.QueueName, // 如果为true 根据exchange 类型 和 routkey规则、 如果无法找到符合条件的队列、那么会把发送完的消息返回给发送者 false, // 如果为true 当exchange发送消息 到队列后发现队列上没有绑定消费者, 则会把消息还给 发送者 false, amqp.Publishing{ // 消息持久化 DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(message), }, ) if err != nil { return errors.Wrap(err, "Publish err") } return nil } // Step 3. Simple consumer code func (r *RabbitMQ) ConsumeSimple() { // 3.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建 // 保证队列存在、 消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, //是否持久化 true, // 是否自动删除 false, // 是否具有排他性 false, //是否阻塞 false, // 额外属性 nil, ) if err != nil { fmt.Println("QueueDeclare err: ", err) } message, err := r.channel.Consume( r.QueueName, // 用来区分多个消费者 "", // 是否自动应答 (消费完了后 是否自动告诉rabbitmq服务 消费完了 默认为true) true, // 是否具有排他性 (true 就是 创建了自己可见的队列) false, // 如果为true 表示 不能将同一个 connection 中发送的消息传递给这个connection中的消费者 (默认为false) false, // 是否阻塞 false为不阻塞 (阻塞就是 这个消费完 下个再进来、我们系统中是需要的) true, // 其他参数 nil, ) if err != nil { fmt.Println("Consume err: ", err) } //不让协程终止 forever := make(chan bool) // 启用 goroutine 处理消息 go func() { for d := range message { //接收消息进行逻辑处理 fmt.Printf("Received a message: %s \n", d.Body) } }() // 在没有消息处理后 进行阻塞 <-forever //不让协程终止 }

    消费者代码1

    package main import ( "MqTest/rabbitmq" "fmt" ) func main() { queueName := "Simple" rabbitMQ, err := rabbitmq.NewRabbitMQSimple(queueName) if err != nil { fmt.Println(err) } rabbitMQ.ConsumeSimple() }

    消费者代码2

    package main import ( "MqTest/rabbitmq" "fmt" ) func main() { queueName := "Simple" rabbitMQ, err := rabbitmq.NewRabbitMQSimple(queueName) if err != nil { fmt.Println(err) } rabbitMQ.ConsumeSimple() }

    生产者代码

    package main import ( "MqTest/rabbitmq" "fmt" "strconv" ) func main() { exchangeName := "exchangeName" rabbitMQ, err := rabbitmq.NewRabbitMQSimple(exchangeName) if err != nil { fmt.Println(err) } for i:=0; i<10;i++ { data := "第"+strconv.Itoa(i)+"次学习mq" rabbitMQ.PublishSimple(data) } fmt.Println("发送成功") }

     

    Processed: 0.009, SQL: 9