rabbitMq入门

    技术2022-07-15  53

    1.安装

    RabbitMq是基于Erlang开发的,在安装RabbitMQ之前,需要安装Erlang环境

    Erlang下载地址:http://www.erlang.org/downloads

    安装一路Next即可,注意:Erlang需要在Administrator环境下安装, 否则给RabbitMQ的注册key将不会显示

    安装完成后,需要在计算机上配置环境变量-系统变量

    ERLANG_HOME=C:\Program Files\erl10.2 PATH=%ERLANG_HOME%\bin;

    检测 配置 是否成功

    RabbitMQ下载地址:https://www.rabbitmq.com/install-windows.html#chocolatey

    下载安装,一路Next即可,完成之后配置RabbitMQ的系统变量

    PATH=(你的安装目录)\rabbitmq_server-3.7.12\sbin;

    检测 配置 是否成功

     

    2.rabbitmq基本命令

    # 服务命令启动 $ net start rabbitmq $ net stop rabbitmq ​ # rabbitmq 命令 $ rabbitmq-service start $ rabbitmqctl stop $ rabbitmqctl status

     

    3.开启管理页面【监控页面】

    rabbitmq-plugins enable rabbitmq_management

    提示以下信息内容,访问 http://localhost:15672 登录rabbitmq的控制页面

    配置监控

    默认端口 15672 默认帐号密码 guest/guest 如果想使用guest/guest通过远程机器访问, 需要在rabbitmq配置文件中(/etc/rabbitmq/rabbitmq.config)中设置loopback_users为[] 修改,以便在外网访问 [{rabbitmq_management, [{listener, [{port, 15672}, {ip, "0.0.0.0"} ]} ]} ]. 添加用户 rabbitmqctl add_user root root123 设置root用户为所有权限 rabbitmqctl set_permissions -p / root ".*" ".*" ".*" 设置root用户为管理员 rabbitmqctl set_user_tags root administrator

     

    概念

    Producer:生产者 Consumer:消费者 Broker:消息中间件 的 服务节点,对于RabbitMQ来说 RabbitMQ Broker相当于一台RabbitMQ服务器 Exchange: 交换器,由交换器将信息路由到一个或者多个队列中 RabbitMQ的交换器由四种; fanout\direct\topic\headers fanout:会把消息发送给所有与此交换机绑定的队列中 direct: direct类型的交换器会把消息路由到bindingkey和routingkey完全匹配的队列中 topic:topic与direct匹配规则类似,但支持表达式类型 topic模式下,BindingKey 和 RoutingKey 用【.】号分割,例如:com.sixgod.test BindingKey中存在两种特殊字符串【*】 和 【#】,用作 模糊匹配 *:用于匹配一个单词,如:*.*.test 可以 匹配到 com.sixgod.test #:用于匹配多个单词,如:#.test 可以 匹配到 com.sixgod.test headers:根据发送的消息内容中的Headers属性进行匹配【键值对形式】 headers类型的交换器性能很差、不实用,已经很少看到它的存在 RoutingKey:路由键,用于指定消息被路由中规则,和BindingKey绑定键联合使用才有效 Queue: 队列,用于存储消息 Binding:绑定,RabbitMQ中用于将 交换器 和 队列 关联 起来 BindingKey:BindingKey也属于路由键的一种,在绑定的时候使用的路由键 Connection:tcp连接,通道会复用tcp连接 Connection可以完成Channel的工作,但在多线程环境下,Connection(TCP)的创建销毁是非常昂贵的开销,故RabbitMQ采用类似NIO的做法,将TCP连接复用 Channel:AMQP信道,每个信道有唯一id,RabbitMQ处理每条AMQP指令都是通过信道完成的 Channel不能在多个线程中共享

    创建RabbitMQ连接的方式

    代码清单1: ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); featory.setVirtualHost(VIRTUALHOST); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection conn = factory.newConnection(); 代码清单2: ConnectionFactory factory = new ConnectionFactory(); factory.setUri("amqp://USERNAME:PASSWORD@IP_ADDRESS:PORT/VIRTUALHOST"); Connection conn = factory.newConnection();

    创建交换器

    Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); // 创建交换器 channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments); exchange: 交换器名称 type:创建的交换器类型 durable:是否持久化。持久化会将交换器存盘,在服务器重启的时候不会丢失相关数据 autoDelete:是否自动删除 internal:设置是否内置【内置交换器】,设置为true时无法直接发送信息到这个交换器中,只能通过交换器路 由到交换器这种方式 argument;结构化参数,比如alternate-exchange ***** 其他重载exchangeDeclare方法都是通过缺省传参上面这个方法来实现 *****

    创建队列

    ConnectionFactory factory = new ConnectionFactory() Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); // 创建队列只有两种方式 channel.queueDeclare(); channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments); queue:队列名称 durable:是否持久化 exclusive:是否排他 如果一个队列声明为 排他队列,仅对首次声明它的连接可见,并在连接断开时自动删除 autoDelete:是否自动删除 arguments:设置队列的其他参数,具体自行百度

    队列绑定

    ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); // 创建一个type='direct'、持久化、非自动删除的交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null); // 创建一个持久化、非排他、非自动删除的队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 队列绑定 channel.queueBind(String queueName,String exchangeName,String routingKey); channel.queueBind(String queueName,String exchangeName,String routingKey,Map<String,Object> arguments); channel.queueBindNoWait(String queueName,String exchangeName,String routingKey,Map<String,Object> arguments); // 队列解绑 ..... queueName:队列名称 exchangeName:交换器名称 routingKey:路由键 arguments:定义绑定的一些参数

    交换器与交换器绑定

    RabbitMQ消费模式

    推(Push) 采用Basic.Consume进行消费 - 持续订阅方式消费信息【接收大量信息情况下】 boolean autoAck = false; channel.basicConsume(queueName,autoAck,consumerTag, new DefaultCOnsumer(channel){ @Verride public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){ // do something channel.basicAck(envelope.getDeliveryTag(),false), } }) queueName:队列名称 autoAck: 设置是否自动确认,建议设置成false 技巧:上方显示ack操作,可以防止消息不必要的丢失 consumerTag:消费者标签 >>>>>有很多重载方法,需要自行了解下 拉(Pull) 采用Basic.Get进行消费 -单条获取信息 GetResponse response = channel.basicGet(queueName,false); String information = new String(response.getBody()); channel.basicAck(response.getEnvelope().getDeliveryTag(),false); >>>不能把Basic.Get放在循环中代替Basic.Consume,会严重影响RabbitMQ的性能

    关闭连接

    在应用程序使用完之后,需要关闭连接,释放资源 channel.close(); // => Channel.close(); conn.close(); // => Connection.close();

    基础demo示例

    生产者

    import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitProducer { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_demo"; // 路由的键名 private static final String ROUTING_KEY = "routing_demo"; // 队列名称 private static final String QUEUE_NAME = "queue_demo"; // ip private static final String IP_ADDRESS = "172.19.36.49"; // rabbit端口,默认5672 private static final int PORT = 5672; private static final String USERNAME = "root"; private static final String PASSWORD = "root123"; private static ConnectionFactory factory = null; static{ factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); } public static final void sayHello() throws IOException, TimeoutException { // 创建连接 Connection conn = factory.newConnection(); // 创建 信道/ 通道 Channel channel = conn.createChannel(); // 创建一个type='direct'、持久化、非自动删除的交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null); // 创建一个持久化、非排他、非自动删除的队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 将交换机 与 队列 绑定 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY); String message = "HelloWorld"; // 发送一条持久化的消息 // MessageProperties.PERSISTENT_TEXT_PLAIN 2 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 关闭资源 channel.close(); conn.close(); } }

    消费者

    import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class RabbitConsumer { // 交换机名称 private static final String EXCHANGE_NAME = "exchange_demo"; // 路由的键名 private static final String ROUTING_KEY = "routing_demo"; // 队列名称 private static final String QUEUE_NAME = "queue_demo"; // ip private static final String IP_ADDRESS = "172.19.36.49"; // rabbit端口,默认5672 private static final int PORT = 5672; private static final String USERNAME = "root"; private static final String PASSWORD = "root123"; private static ConnectionFactory factory = null; static{ factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); } public final static void get() throws IOException, TimeoutException, InterruptedException { // 创建连接 Connection conn = factory.newConnection(); // 创建 信道/ 通道 Channel channel = conn.createChannel(); // 设置客户端最多接收未被ask的消息个数 channel.basicQos(64); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){ // dosomething System.out.println("get context:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME,consumer); // 等待回调函数执行完毕之后关闭资源 TimeUnit.SECONDS.sleep(5); channel.close(); conn.close(); } }

     

    Processed: 0.012, SQL: 9