RabbitMQ常见场景的研究

    技术2022-07-10  140

    1.消息不丢失

    (1)生产者弄丢数据

    ①使用事务(性能差)

    可以选择用rabbitmq提供的事务功能,在生产者发送数据之前开启rabbitmq事务(channel.txSelect),然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,开始rabbitmq事务机制,基本上吞吐量会下来,因为太耗性能。

    ②发送回执确认(推荐)

    可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。   事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了。

    所以一般在生产者这块避免数据丢失,都是用confirm机制的。

    import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); //消息发送到交换器Exchange后触发回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("ConfirmCallback: "+"相关数据:"+correlationData); System.out.println("ConfirmCallback: "+"确认情况:"+ack); System.out.println("ConfirmCallback: "+"原因:"+cause); } }); //消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发) rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("ReturnCallback: "+"消息:"+message); System.out.println("ReturnCallback: "+"回应码:"+replyCode); System.out.println("ReturnCallback: "+"回应信息:"+replyText); System.out.println("ReturnCallback: "+"交换机:"+exchange); System.out.println("ReturnCallback: "+"路由键:"+routingKey); } }); return rabbitTemplate; } }

    (2)rabbitMQ弄丢数据(在spring rabbit 2.2.5中查看源码,Exchange、queue和Message默认都是持久化的)

    为了防止rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。

    设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。

    而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。

    若生产者那边的confirm机制未开启的情况下,哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。

    以上针对单节点,对于生产环境,是高可用集群模式,并不会将所有节点设置为disk模式,所以个人想法是保证2个disk节点,防止一个disk挂了,新消息将不会备份到磁盘

    Exchange、queue和Message都备份在disk上,所以全部节点宕机,需先启动disk节点,若宕机一部分,剩下的节点没有disk节点,则后面进来的Exchange、queue和Message,在全部节点宕机后将不会再留备份(高可用集群模式下)

    (3)消费端弄丢数据

    主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了比如重启了,那么就尴尬了,RabbitMQ认为你都消费了,这数据就丢了。或者消费者拿到数据之后挂了,这时候需要MQ重新指派另一个消费者去执行任务(一块肉,刚用筷子夹起来,发地震抖了一下,肉掉了)

    这个时候得用RabbitMQ提供的ack机制,也是一种处理完成发送回执确认的机制。如果MQ等待一段时间后你没有发送过来处理完成 那么RabbitMQ就认为你还没处理完,这个时候RabbitMQ会把这个消费分配给别的consumer去处理,消息是不会丢的。

    import com.xiaoll.rabbitmqconsumer.consumer.TraceReceiver; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTraceConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private TraceReceiver traceReceiver;//trace消息接收处理类 @Bean public Queue testTrace() { return new Queue("TestTrace"); } @Bean TopicExchange traceExchange() { TopicExchange topicExchange = new TopicExchange("amq.rabbitmq.trace"); topicExchange.setInternal(true); return topicExchange; } @Bean Binding bindingTrace() { return BindingBuilder.bind(testTrace()).to(traceExchange()).with("publish.#"); //监听所有收到的消息 } @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 container.setQueues(testTrace()); //设置监听日志的队列 container.setMessageListener(traceReceiver); //监听到日志收的处理方法 return container; } } import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.Map; import java.util.Optional; @Component public class TraceReceiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("TestTrace消费者收到消息 : " +message.toString()); System.out.println(message.getBody()); //字符串 String str2=new String(message.getBody()); System.out.println("\n打印2:"+str2); //Object类型 ,这里是map类型 String str4=this.bytesToObject(message.getBody()).toString(); Optional<Map> map = this.bytesToObject(message.getBody()); System.out.println("\n打印3:"+str4); System.out.println("\n打印4:"+map.get().get("messageData")); channel.basicAck(deliveryTag, true); //确认成功,队列会删除该消息 } catch (Exception e) { channel.basicReject(deliveryTag, true); //确认失败,消息放回队列 e.printStackTrace(); } } /** * 数组转对象 * @param bytes * @return */ public static<T> Optional<T> bytesToObject(byte[] bytes) { T t = null; ByteArrayInputStream in = new ByteArrayInputStream(bytes); ObjectInputStream sIn; try { sIn = new ObjectInputStream(new BufferedInputStream(in)); t = (T)sIn.readObject(); } catch (Exception e) { e.printStackTrace(); } return Optional.ofNullable(t); } }

    2.消息日志存储

    (1)启用官方日志监控(每个节点都要启动)

    ./rabbitmq-plugins enable rabbitmq_tracing

    (2)开启日志监控(每个节点都要启动)

    rabbitmqctl trace_on -p /

    (3)自定义消息日志监控

    自定义队列绑定到插件的队列上,代码处理消息即可

    绑定路由键

    所有消息:#

    所有发送的消息:publish.#

    所有消费的消息:deliver.#

    指定发送到交换器的消息:publish.{交换器名字}

    指定队列被消费的消息:deliver.{队列的名字}

    (4)写一个自定义队列连上这个topicexchage-amq.rabbitmq.trace

    import com.xiaoll.rabbitmqconsumer.consumer.TraceReceiver; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitTraceConfig { @Autowired private CachingConnectionFactory connectionFactory; @Autowired private TraceReceiver traceReceiver;//trace消息接收处理类 @Bean public Queue testTrace() { return new Queue("TestTrace"); } @Bean TopicExchange traceExchange() { TopicExchange topicExchange = new TopicExchange("amq.rabbitmq.trace"); topicExchange.setInternal(true); return topicExchange; } @Bean Binding bindingTrace() { return BindingBuilder.bind(testTrace()).to(traceExchange()).with("publish.#"); //监听所有收到的消息 } @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 container.setQueues(testTrace()); //设置监听日志的队列 container.setMessageListener(traceReceiver); //监听到日志收的处理方法 return container; } }

    (5)编写处理方法,将所有消息处理,可存库

    import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.util.Map; import java.util.Optional; @Component public class TraceReceiver implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("TestTrace消费者收到消息 : " +message.toString()); System.out.println(message.getBody()); //字符串 String str2=new String(message.getBody()); System.out.println("\n打印2:"+str2); //Object类型 ,这里是map类型 String str4=this.bytesToObject(message.getBody()).toString(); Optional<Map> map = this.bytesToObject(message.getBody()); System.out.println("\n打印3:"+str4); System.out.println("\n打印4:"+map.get().get("messageData")); channel.basicAck(deliveryTag, true); } catch (Exception e) { channel.basicReject(deliveryTag, true); e.printStackTrace(); } } /** * 数组转对象 * @param bytes * @return */ public static<T> Optional<T> bytesToObject(byte[] bytes) { T t = null; ByteArrayInputStream in = new ByteArrayInputStream(bytes); ObjectInputStream sIn; try { sIn = new ObjectInputStream(new BufferedInputStream(in)); t = (T)sIn.readObject(); } catch (Exception e) { e.printStackTrace(); } return Optional.ofNullable(t); } }

    (6)日志文件保存:配置日志监听,管理界面-admin-Tracing-Add a new trace(每个node都要配置),日志目录:/var/tmp/rabbitmq-tracing

    Name:自定义名字

    Format:text/json,Text格式的日志方便人类阅读,JSON的方便程序解析

    Max payload bytes:最大消息体大小

    Pattern:可配置内,#(生产消费消息同时监控), publish.#(生产的消息), deliver.#(消费的消息)

    保存完成可看到文件,打开即可查看

    (7)可在exchage中看到amq.rabbitmq.trace,Connections、Channels、Queue中看到三个(各自监听各自节点的内容)用于监听日志的连接、通道及队列,检查是否启动文件保存

    3.消息权限控制

    (1)用户角色

    ①超级管理员(administrator)

    可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

    ② 监控者(monitoring)

    可登陆管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

    ③策略制定者(policymaker)

    可登陆管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息。

    与administrator的对比,administrator能看到这些内容

    ④ 普通管理者(management)

    仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

    ⑤其他

    无法登陆管理控制台,通常就是普通的生产者和消费者。

    (2)虚拟机:virtual-host

    每个RabbitMQ 服务器都能创建若干个虚拟的消息服务器,我们称之为虚拟主机。

    每一个虚拟主机都拥有自己的交换机、队列和绑定等(命名空间的概念),每个虚拟主机之间是相互隔离的,这就意味着我们可以在不同虚拟主机之间使用相同的交换机、队列。可以理解为一个沙箱,每个沙箱相互独立互不影响。

    每个RabbitMQ 用户必须隶属于一个或多个虚拟主机,我们在连接RabbitMQ 的时候必须先指定一个虚拟主机。

    (3)资源权限控制:permissions(针对虚拟机资源做正则表达式过滤)

    1个用户对于1个虚拟机有且只有1个配置

    权限配置包括:配置(队列和交换机的创建和删除)、写(发布消息)、读(有关消息的任何操作,包括清除这个队列) conf:一个正则表达式match哪些配置资源能够被该用户访问。 write:一个正则表达式match哪些配置资源能够被该用户读。 read:一个正则表达式match哪些配置资源能够被该用户访问

    (4)主题权限控制:Topic permissions

    1个用户对于1个exchange有且仅有1个配置

    从3.7版本开始,RabbitMQ开始支持为topic交换器设置权限,在设置权限时,要结合发布到主题交换器的消息的routing key信息(例如,在RabbitMQ默认授权后端,routing key与正则表达式是否匹配,决定了消息是否可以路由到下游)。

    设置了topic授权后,向topic路由器发布消息,或者从topic路由器接收消息,都会依据消息的routing key进行权限检查,如果routing key与权限设置匹配,则成功,否则抛出异常。

    (5)制定方案

    ①.以单虚拟机为基础权限控制方案

    仅有1个虚拟机,本次采用fanout的形式推送消息,根据资源权限控制exchange、queue的读写

    例如有系统A、系统B,A发消息,B收消息

    A

    exchange格式:A.message1 A.message2

    queue格式:A.message1.A

    权限:

    Virtual:/ conifg:^A\..*$ write:^A\..*$ read:^.*\.A$

    B

    queue格式:A.message1.B(绑定A.message1)

    权限:

    Virtual:/ conifg:^B\..*$ write:^B\..*$ read:^.*\.A$

    方案优点:只需要一个虚拟机,因此只需要开启一个日志监控,便可将所有消息监听做记录(存库或者文件)

    方案缺点:通过资源的名称来控制权限,对名称要求稍严格

    ②.以多个虚拟机为基础权限控制方案

    1个系统1个虚拟机,本次采用fanout的形式推送消息,根据资源权限控制exchange、queue的读写

    例如有系统A、系统B,A发消息,B收消息

    A

    exchange格式:message1 message2

    queue格式:message1.A

    权限:

    Virtual:A conifg:.* write:.* read:.*

    B

    queue格式:message1.B(绑定A.message1)

    权限:

    Virtual:A conifg:^$ write:^$ read:^.*\.B$

    方案优点:整个虚拟机权限下放,任其定义exchange名称,各个系统可拥有相同的名称互不影响,queue名称因授权需要按规则定义

    方案缺点:每新增一个系统,需增加一个虚拟机,并需要增加一个日志监控其虚拟机,并监听该topic进行存库或存文件操作

    4.监控管理汉化

    RabbitMQ Management插件提供了 HTTP API,可对集群进行很多操作,通过HTTP API接口操作集群(根据功能需要自定义开发)

    postman测试,验证HTTP API

    1.选择对应请求类型(get、put、delete、post)

    2.Authorzation->Basic Auth->填写用户密码

    3.带数据请填写,Body->raw->填写json数据

    具体功能API列表如下,且验证如下(注意,“/”为URI特殊字符,默认虚拟机为“/”,需使用它时请将它编码为“/”)

    序号getputdeletepost接口描述1√/api/overview获取描述整个系统的各种概述信息2√√/api/cluster-nameRabbitMQ集群的名称,需修改请带入json内容类似:{“name”:“rabbit@rabbitDM”}3√/api/nodesRabbitMQ集群中的节点列表4√/api/nodes/(name)RabbitMQ集群中的单个节点信息5√/api/extensions管理插件的扩展列表6√√/api/definitions服务器定义-交换,队列,绑定,用户,虚拟主机,权限,主题权限和参数,除消息外的所有内容7√√/api/definitions/(vhost)给定虚拟主机的服务器定义-交换,队列,绑定和策略8√/api/connections所有打开的连接的列表9√/api/vhosts/(vhost)/connections特定虚拟主机中所有打开的连接的列表10√√/api/connections/(name)单个连接。删除它将关闭连接。可以选择在删除时设置“ X-Reason”标头以提供原因。11√/api/connections/(name)/channels给定连接的所有通道的列表12√/api/channels所有开放通道的列表。13√/api/vhosts/(vhost)/channels特定虚拟主机中所有打开的通道的列表。14√/api/channels/(channel)有关单个频道的详细信息。15√/api/consumers所有消费者的名单16√/api/consumers/(vhost)给定虚拟主机中所有消费者的列表17√/api/exchanges所有交换器的清单18√/api/exchanges/(vhost)给定虚拟主机中所有交换器的列表19√√√/api/exchanges/(vhost)/(name)单个交换器操作。增加交换,格式如下:{“ type”:“ direct”,“ auto_delete”:false,“ durable”:true,“internal”:false,“arguments”:{}}该type关键是强制性的; 其他键是可选的。删除交换时,可以添加查询字符串参数if-unused=true。如果交换绑定到队列或作为另一个交换的源,这将阻止删除成功。20√/api/exchanges/(vhost)/(name)/bindings/source以给定交换为源的所有绑定的列表。21√/api/exchanges/(vhost)/(name)/bindings/destination以给定交换为目的地的所有绑定的列表。22√/api/exchanges/(vhost)/(name)/publish将消息发布到给定的交易(不建议使用,效率低,使用tcp的长连接发即可)23√/api/queues所有队列的列表。24√/api/queues/(vhost)给定虚拟主机中所有队列的列表。25√√√/api/queues/(vhost)/(name)单个队列。增加队列,格式如下:{“ auto_delete”:false,“ durable”:true,“ arguments”:{},“ node”:“ rabbit @ smacmullen”}所有键都是可选的。删除队列时,可以添加查询字符串参数if-empty=true和/或if-unused=true。如果队列分别包含消息或具有使用者,则这会阻止删除成功。27√/api/queues/(vhost)/(name)/bindings给定队列上所有绑定的列表。28√/api/queues/(vhost)/(name)/contents队列的内容。删除以清除。请注意,您无法获得此。29√/api/queues/(vhost)/(name)/actions可以在队列上执行的操作。张贴以下内容:{“ action”:“ sync”}当前支持的操作是sync和cancel_sync。30√/api/queues/(vhost)/(name)/get从队列中获取消息。(这不是HTTP GET,因为它会更改队列的状态。)您应该发布一个类似于以下内容的正文:{“ count”:5,“ ackmode”:“ ack_requeue_true”,“ encoding”:“ auto”,“ truncate”:50000}count控制要获取的最大消息数。如果队列无法立即提供消息,则收到的消息可能少于此。ackmode确定是否从队列中删除消息。如果ackmode为ack_requeue_true或reject_requeue_true,则将重新排队-如果ackmode为ack_requeue_false或reject_requeue_false,则将其删除。encoding 必须是“自动”(在这种情况下,有效载荷将是有效的UTF-8,否则有效载荷将以字符串形式返回,否则将以base64编码)或“ base64”(在这种情况下,有效载荷将始终以base64编码)。如果truncate存在,则它将大于给定的大小(以字节为单位)截断消息有效负载。truncate是可选的;所有其他键都是必需的。请注意,HTTP API中的get路径用于诊断等-它不能实现可靠的传递,因此应将其视为sysadmin的工具,而不是用于消息传递的常规API。31√/api/bindings所有绑定的列表。32√/api/bindings/(vhost)给定虚拟主机中所有绑定的列表。33√√/api/bindings/(vhost)/e/(exchange)/q/(queue)交换和队列之间的所有绑定的列表。请记住,交换和队列可以绑定在一起很多次!要创建新的绑定,请发布到该URI。请求正文应该是一个JSON对象,可以选择包含两个字段routing_key(一个字符串)和arguments(一个可选参数的映射):{“ routing_key”:“ my_routing_key”,“ arguments”:{“ x-arg”:“ value”}}所有键都是可选的。响应将包含一个Location标头,告诉您新绑定的URI。34√√/api/bindings/(vhost)/e/(exchange)/q/(queue)/(props)交换和队列之间的个体绑定。URI 的props部分是绑定的“名称”,该名称由其路由键和其参数的哈希组成。props是绑定列表响应中名为“ properties_key”的字段。35√√/api/bindings/(vhost)/e/(source)/e/(destination)两个交换器之间的所有绑定的列表,类似于上面的交换机和队列之间的所有绑定的列表。要创建新的绑定,请发布到该URI。请求正文应该是一个JSON对象,可以选择包含两个字段routing_key(一个字符串)和arguments(一个可选参数的映射):{“ routing_key”:“ my_routing_key”,“ arguments”:{“ x-arg”:“ value”}}所有键都是可选的。响应将包含一个Location标头,告诉您新绑定的URI。36√√/api/bindings/(vhost)/e/(source)/e/(destination)/(props)两个交换器之间的个体绑定。类似于上面的交换和队列之间的单独绑定。37√√√/api/vhosts所有虚拟主机的列表。38√/api/vhosts/(name)单个虚拟主机。由于虚拟主机通常只有一个名称,因此在放置其中之一时不需要HTTP正文。要启用/禁用跟踪,请提供如下所示的主体:{“ tracing”:true}39√/api/vhosts/(name)/permissions给定虚拟主机的所有权限的列表。40√/api/vhosts/(name)/topic-permissions给定虚拟主机的所有主题权限的列表。41√/api/vhosts/(name)/start/(node)在节点node上启动虚拟主机名。42√/api/users所有用户的列表。43√/api/users/without-permissions无权访问任何虚拟主机的用户列表。44√/api/users/bulk-delete批量删除用户列表。请求正文必须包含列表:{“users” : [“user1”, “user2”, “user3”]}45√√√/api/users/(name)个人用户。要放置用户,您将需要一个如下所示的主体:{“password”:“secret”,“tags”:“administrator”}要么:{“ password_hash”:“ 2lmoth8l4H0DViLaK9Fxi6l9ds8 =”,“ tags”:“administrator”}该tags密钥是强制性的。无论是 password或password_hash 可以设置。如果两者均未设置,则用户将无法使用密码登录,但是可以使用其他机制,例如客户端证书。设置password_hash为,""将确保用户不能使用密码登录。tags是用户的逗号分隔标签列表。目前公认的标签administrator,monitoring和management。 password_hash必须使用此处描述的算法生成 。您还可以通过将hashing_algorithm 键添加到主体来指定要使用的哈希函数。目前公认的算法rabbit_password_hashing_sha256, rabbit_password_hashing_sha512和rabbit_password_hashing_md5。46√/api/users/(user)/permissions给定用户的所有权限的列表。47√/api/users/(user)/topic-permissions给定用户的所有主题权限的列表。48√/api/whoami当前已认证用户的详细信息。49√/api/permissions所有用户的所有权限列表。50√√√/api/permissions/(vhost)/(user)用户和虚拟主机的个人许可。要获得许可,您将需要一个如下所示的主体:{“ configure”:“。”,“ write”:“。”,“ read”:“。*”}所有键都是强制性的。51√/api/topic-permissions所有用户的所有主题权限的列表。52√√√/api/topic-permissions/(vhost)/(user)用户和虚拟主机的主题权限。要放置主题权限,您将需要一个如下所示的主体:{“ exchange”:“ amq.topic”,“ write”:“ ^ a”,“ read”:“。*”}所有键都是强制性的。53√/api/parameters所有虚拟主机作用域参数的列表。54√/api/parameters/(component)给定组件的所有虚拟主机作用域参数的列表。55√/api/parameters/(component)/(vhost)给定组件和虚拟主机的所有虚拟主机作用域参数的列表。56√√√/api/parameters/(component)/(vhost)/(name)单个虚拟主机作用域参数。要放置参数,您将需要一个如下所示的主体:{“ vhost”:“ /”,“ component”:“ federation”,“ name”:“ local_username”,“ value”:“ guest”}57√/api/global-parameters所有全局参数的列表。58√√√/api/global-parameters/(name)单个全局参数。要放置参数,您将需要一个如下所示的主体:{“ name”:“ user_vhost_mapping”,“ value”:{“ guest”:“ /”,“ rabbit”:“ warren”}}59√/api/policies所有策略的列表。60√/api/policies/(vhost)给定虚拟主机中所有策略的列表。61√√√/api/policies/(vhost)/(name)个人政策。要提出政策,您需要一个看起来像这样的主体:{“ pattern”:“ ^ amq。”,“ definition”:{“ federation-upstream-set”:“ all”},“ priority”:0,“ apply-to”:“ all”}pattern并且definition是必需的,priority并且apply-to是可选的。62√/api/operator-policies所有操作员策略替代的列表。63√/api/operator-policies/(vhost)给定虚拟主机中所有操作员策略替代的列表。64√√√/api/operator-policies/(vhost)/(name)个体运营商政策。要提出政策,您需要一个看起来像这样的主体:{“ pattern”:“ ^ amq。”,“ definition”:{“ expires”:100},“ priority”:0,“ apply-to”:“ queues”}pattern并且definition是必需的,priority并且apply-to是可选的。65√/api/aliveness-test/(vhost)声明一个测试队列,然后发布并使用一条消息。供监视工具使用。如果一切正常,将返回带有正文的HTTP状态200:{“status”:“ok”}注意:测试队列不会被删除(如果队列被反复ping,以防止队列搅动)。66√/api/healthchecks/node在当前节点中运行基本运行状况检查。检查Rabbit应用程序是否正在运行,是否可以成功列出通道和队列,并且没有警报有效。如果一切正常,将返回带有正文的HTTP状态200:{“status”:“ok”}如果失败,将返回HTTP状态200,其内容为{“status”:“failed”,“reason”:“string”}67√/api/healthchecks/node/(node)在给定的节点中运行基本的运行状况检查。检查Rabbit应用程序是否正在运行,是否返回list_channels和list_queues,并且没有引发警报。如果一切正常,将返回带有正文的HTTP状态200:{“status”:“ok”}如果失败,将返回HTTP状态200,其内容为{“status”:“failed”,“reason”:“string”}68√/api/vhost-limits列出所有虚拟主机的每个虚拟主机限制。69√/api/vhost-limits/(vhost)列出特定虚拟主机的每个虚拟主机限制。70√√/api/vhost-limits/(vhost)/(name)设置或删除的每个虚拟机限制vhost。该nameURL路径元素是指限制(的名称max-connections,max-queues)。限制是使用正文中的JSON文档设置的:{“值”:100}。请求示例:curl -4u’guest:guest’-H’content-type:application / json’-X PUT localhost:15672 / api / vhost-limits / my-vhost / max-connections -d’{“ value”:50}’
    Processed: 0.011, SQL: 9