RabbitMQ 发布订阅模式实例

    技术2023-07-19  112

    订阅模式:一个生产者发送的消息会被多个消费者获取。

    发布订阅模式下创建RabbitMq实例

    发布订阅模式queueName必须为空,要传入交换机exChangeName的名称,routingkey为空

    rabbitmq服务封装

    package rabbitmq import ( "fmt" "github.com/pkg/errors" "github.com/streadway/amqp" ) /****************RabbitMQ初始化start********************/ // 除了simple 模式外、其他的模式都是由 队列 交换机 key 不同组合实现的 type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel QueueName string //队列 Exchange string //交换机 Key string //key MQUrl string //连接信息 } // 创建RabbitMQ 实例 func newRabbitMQ(queueName, exchange, key string) (*RabbitMQ, error) { rabbitMQ := &RabbitMQ{ QueueName: queueName, Exchange: exchange, Key: key, MQUrl: "amqp://user:password@127.0.0.1:5672/", //Virtual host用的哪个库 } var err error // dial mq rabbitMQ.conn, err = amqp.Dial(rabbitMQ.MQUrl) rabbitMQ.failOnErr(err, "创建连接错误") if err != nil { return nil, errors.Wrap(err, "create connect error") } // get channel rabbitMQ.channel, err = rabbitMQ.conn.Channel() if err != nil { return nil, errors.Wrap(err, "get channel err") } return rabbitMQ, nil } // 错误处理 func (r *RabbitMQ) failOnErr(err error, message string) { if err != nil { panic(fmt.Sprintf("%s:%s", err.Error(), message)) } } /**************RabbitMQ初始化end*********/ /*****************订阅模式start***************************/ //订阅模式创建RabbitMQ实例 func NewRabbitMQPubSub(exchangeName string) (*RabbitMQ, error) { //在simple模式下 exchange and key 都为空 rabbitMQ, err := newRabbitMQ("", exchangeName, "") if err != nil { return nil, err } return rabbitMQ, nil } //订阅模式生产 func (r *RabbitMQ) PublishPub(message string) error { // 2.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建 // 保证队列存在、 消息能发送到队列中 err := r.channel.ExchangeDeclare( r.Exchange, //是否持久化 "fanout", // 是否自动删除 true, // 是否具有排他性 false, //是否阻塞 false, // 额外属性 false, nil, ) if err != nil { return errors.Wrap(err, "QueueDeclare err") } // 2.2 发送消息到队列中 err = r.channel.Publish( r.Exchange, "", // 如果为true 根据exchange 类型 和 routkey规则、 如果无法找到符合条件的队列、那么会把发送完的消息返回给发送者 false, // 如果为true 当exchange发送消息 到队列后发现队列上没有绑定消费者, 则会把消息还给 发送者 false, amqp.Publishing{ // 消息持久化 DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(message), }, ) if err != nil { return errors.Wrap(err, "Publish err") } return nil } //订阅模式消费 // Step 3. func (r *RabbitMQ) PublishSub() { // 3.1 申请队列、 如果队列不存在则会自动创建、 如果存在则跳过创建 // 保证队列存在、 消息能发送到队列中 err := r.channel.ExchangeDeclare( r.Exchange, //是否持久化 "fanout", // 是否自动删除 true, // 是否具有排他性 false, //是否阻塞 false, // 额外属性 false, nil, ) if err != nil { fmt.Println("Consume err: ", err) } q, err := r.channel.QueueDeclare( "", // 用来区分多个消费者 false, // 是否自动应答 (消费完了后 是否自动告诉rabbitmq服务 消费完了 默认为true) true, // 是否具有排他性 (true 就是 创建了自己可见的队列) true, // 如果为true 表示 不能将同一个 connection 中发送的消息传递给这个connection中的消费者 (默认为false) false, // 是否阻塞 false为不阻塞 (阻塞就是 这个消费完 下个再进来、我们系统中是需要的) nil, // 其他参数 ) if err != nil { fmt.Println("Consume err: ", err) } err = r.channel.QueueBind( q.Name, "", r.Exchange, false, nil, ) message, err := r.channel.Consume( r.QueueName, // 用来区分多个消费者 "", // 是否自动应答 (消费完了后 是否自动告诉rabbitmq服务 消费完了 默认为true) true, // 是否具有排他性 (true 就是 创建了自己可见的队列) false, // 如果为true 表示 不能将同一个 connection 中发送的消息传递给这个connection中的消费者 (默认为false) false, // 是否阻塞 false为不阻塞 (阻塞就是 这个消费完 下个再进来、我们系统中是需要的) true, // 其他参数 nil, ) if err != nil { fmt.Println("Consume err: ", err) } //不让协程终止 forever := make(chan bool) // 启用 goroutine 处理消息 go func() { for d := range message { //接收消息进行逻辑处理 fmt.Printf("Received a message: %s \n", d.Body) } }() // 在没有消息处理后 进行阻塞 <-forever //不让协程终止 } /************订阅模式end********************************/

    消费者代码1

    package main import ( "MqTest/rabbitmq" "fmt" ) func main() { exchangeName := "exchangeName" rabbitMQ, err := rabbitmq.NewRabbitMQPubSub(exchangeName) if err != nil { fmt.Println(err) } if err != nil { fmt.Println(err) } rabbitMQ.PublishSub() }

    消费者代码2

    package main import ( "MqTest/rabbitmq" "fmt" ) func main() { exchangeName := "exchangeName" rabbitMQ, err := rabbitmq.NewRabbitMQPubSub(exchangeName) if err != nil { fmt.Println(err) } if err != nil { fmt.Println(err) } rabbitMQ.PublishSub() }

    生产者代码

    package main import ( "MqTest/rabbitmq" "fmt" "strconv" ) func main() { exchangeName := "exchangeName" rabbitMQ, err := rabbitmq.NewRabbitMQPubSub(exchangeName) if err != nil { fmt.Println(err) } for i:=0; i<10;i++ { data := "第"+strconv.Itoa(i)+"次学习mq" rabbitMQ.PublishPub(data) } fmt.Println("发送成功") }

     

    Processed: 0.008, SQL: 9