topic订阅,默认只能订阅到订阅服务启动后的发布消息;如果需要订阅订阅服务启动前发布的消息,需要进行持久化订阅(需要之前启动过订阅服务,已经告诉broker需要持久化订阅消息),broker会保留订阅消息,直到消息被订阅接收或者过期
@Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); String brokerURL = "failover:(tcp://" + "localhost" + ":" + "61616" + ")"; ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin", "admin", brokerURL); // 唯一clientID,如果出现重复的clientID,会报连接错误 activeMQConnectionFactory.setClientID("唯一clientId"); bean.setConnectionFactory(activeMQConnectionFactory); // 设置订阅发布域为true,对应topic bean.setPubSubDomain(true); // 设置持久化订阅为true bean.setSubscriptionDurable(true); return bean; }通过修改activemq配置文件,可以将未送达的消息和过期消息放入死信队列中(ActiveMQ.DLQ),可以订阅该队列,获取已被丢弃的消息
<!-- 死信策略 --> <deadLetterStrategy> <!-- 默认所有未送达的消息都会发送到指定queue(ActiveMQ.DLQ),可以通过“deadLetterQueue"属性自定义queue,"processExpired"表示是否将过期消息放入死信队列,默认为true;”processNonPersistent“表示是否将非持久化消息放入死信队列,默认为false --> <sharedDeadLetterStrategy expiration="86400000" processExpired="false"/> </deadLetterStrategy>activemq提供有中间件监控运行过程的方法(Advisory Message),初始安装activemq运行后,默认存在有以下几个topic(本次是以topic演示,queue类似),通过监听以下几个topic消息可以详细了解运行过程:
ActiveMQ.Advisory.Connection // 建立连接、发布事件、接收事件、销毁连接触发 ActiveMQ.Advisory.Producer.Topic.#{指定topic}// 发布消息、停止发布消息触发 ActiveMQ.Advisory.Consumer.Topic.#{指定topic}// 订阅消息、停止订阅触发 ActiveMQ.Advisory.Topic// topic创建或销毁触发 ActiveMQ.Advisory.MasterBroker// 实际只有订阅通知消息才会触发参考内容: https://blog.csdn.net/KimmKing/article/details/8443679 https://blog.csdn.net/chuxin_mm/article/details/79566384