使用JAVA获取ActiveMQ队列数据和状态

    技术2025-03-16  28

    1、向ActiveMQ中放入消息

    import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.BytesMessage; import javax.jms.BytesMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Putmsg { // tcp 地址, tcp://localhost:61616 private String url; private String user; private String pwd; //目标,队列或Topic名称 private String qName; Session session = null; MessageProducer producer = null; //目标,TOPIC相关 TopicSession tsession = null; TopicPublisher publisher = null; /** * * @param url * @param user * @param pwd * @param qName */ public Putmsg(String url, String user, String pwd, String qName){ this.url = url; this.user = user; this.pwd = pwd; this.qName = qName; } /** * <b>function:</b> 发送消息 * @param session * @param producer * @throws Exception */ public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 0; i < 5; i++) { String message = "发送消息第" + (i + 1) + "条"; BytesMessage text = session.createBytesMessage(); System.out.println(message); producer.send(text); } } /** * 将指定数据放入到AMQ中 * @param destPath 目录下所有文本,放入到AMQ中 * @throws Exception */ public void sendMsg4Path(String destPath){ try { File direct=new File(destPath); File[] tempList = direct.listFiles(); System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length); int count = 0; for (int i = 0; i < tempList.length; i++) { if (tempList[i].isFile()) { try { //遍历文件并生成对应的字节码文件到目录中 File file = new File(tempList[i].getAbsolutePath()); //可以换成工程目录下的其他文本文件 FileInputStream fis= new FileInputStream(file); 
//获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类; ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); // ByteArrayOutputStream 是OutputStream的一个实现类 int ch = 0; //byte[] msg = null; while (true) { //取得文本对应的16进制数据 ch = fis.read(); if(ch==-1) break; //将FileInputStream 的内容写到 ByteArrayOutputStream 中 bytestream.write(ch); } bytestream.close(); //关闭文件 fis.close(); byte imgdata[] = bytestream.toByteArray(); BytesMessage text = session.createBytesMessage(); text.writeBytes(imgdata); producer.send(text); //TODO setReadOnlyBody(true),输出其长度 text.reset(); System.out.println("len = " + text.getBodyLength()); count = i + 1; // System.out.println("Put the " + count +" file into the MQ! " + tempList[i]); } catch (Exception e) { e.printStackTrace(); } }//判断是否为文件 }//在指定目录下循环取文件 System.out.println("Put "+ count +" files all fininshed!"); } catch (Exception e) { e.printStackTrace(); } } /** * 将指定数据放入到AMQ中 * @param destPath 目录 * @param fileName 文件名,放入到AMQ中的内容 * @throws Exception */ public void sendMsg4File(String destPath, String fileName){ try { File direct=new File(destPath); File[] tempList = direct.listFiles(); System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length); int count = 0; for (int i = 0; i < tempList.length; i++) { if (tempList[i].isFile() && tempList[i].getName().contains(fileName)) { try { //遍历文件并生成对应的字节码文件到目录中 File file = new File(tempList[i].getAbsolutePath()); //可以换成工程目录下的其他文本文件 FileInputStream fis= new FileInputStream(file); //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类; ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); // ByteArrayOutputStream 是OutputStream的一个实现类 int ch = 0; //byte[] msg = null; while (true) { //取得文本对应的16进制数据 ch = fis.read(); if(ch==-1) break; //将FileInputStream 的内容写到 ByteArrayOutputStream 中 bytestream.write(ch); } byte imgdata[] = bytestream.toByteArray(); bytestream.close(); fis.close(); BytesMessage text = session.createBytesMessage(); text.writeBytes(imgdata); producer.send(text); 
count +=1; // System.out.println("Put the " + count +" file into the MQ! " + tempList[i]); } catch (Exception e) { e.printStackTrace(); } }//判断是否为文件 }//在指定目录下循环取文件 System.out.println("Put "+ count +" files all fininshed!"); } catch (Exception e) { e.printStackTrace(); } } /** * 将指定数据放入到AMQ中 * @param destPath 目录,或放入到AMQ中的内容 * 当传入的参数既不是目录也不是文件,就把该参数放入到AMQ中 * @throws Exception */ public void sendMsg4Str(String msg){ BytesMessage message; try { message = session.createBytesMessage(); byte[] bmsg = msg.getBytes(); message.writeBytes(bmsg); System.out.println(msg); producer.send(message); // TODO message.reset(); System.out.println("len = " + message.getBodyLength()); } catch (JMSException e) { e.printStackTrace(); } } public void putmsg2amq() throws Exception { Connection connection = null; try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory( user, pwd, url); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Destination destination = session.createQueue(qName); // 创建消息生产者 producer = session.createProducer(destination); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, producer); } catch (Exception e) { throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } /** * 数据同步,将具体报文内容发送到AMQ * @param msg * @throws Exception */ public void putmsg2amq(String msg) throws Exception { Connection connection = null; // Session session = null; try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory( user, pwd, url); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Destination destination = session.createQueue(qName); // 创建消息生产者 producer = session.createProducer(destination); // 设置持久化模式 
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMsg4Str(msg); } catch (Exception e) { throw e; } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } /** * 将指定目录下所有文件中的内容,发送到AMQ * @param msg * @throws Exception */ public void putmsg2amqPath(String path) { Connection connection = null; try{ try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory( user, pwd, url); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Destination destination = session.createQueue(qName); // 创建消息生产者 producer = session.createProducer(destination); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMsg4Path(path); } catch (Exception e) { System.out.println(e.getMessage()); } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } catch (Exception e) { System.out.println(e.getMessage()); } } /** * 将指定目录下,指定文件的内容发送到AMQ * @param msg * @throws Exception */ public void putmsg2amqFile(String path, String file) { Connection connection = null; try{ try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory( user, pwd, url); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Destination destination = session.createQueue(qName); // 创建消息制作者 producer = session.createProducer(destination); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMsg4File(path, file); } catch (Exception e) { System.out.println(e.getMessage()); } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } catch (Exception e) { System.out.println(e.getMessage()); } } /** * * @param msg * @throws Exception */ public void 
putmsg2amqPath(String destPath, String filename) { Connection connection = null; try{ try { // 创建链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory( user, pwd, url); // 通过工厂创建一个连接 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建一个session会话 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Destination destination = session.createQueue(qName); // 创建消息制作者 producer = session.createProducer(destination); // 设置持久化模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); try { File direct=new File(destPath); File[] tempList = direct.listFiles(); System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length); int count = 0; for (int i = 0; i < tempList.length; i++) { if (tempList[i].isFile()) { try { //遍历文件并生成对应的字节码文件到目录中 File file = new File(tempList[i].getAbsolutePath()); //可以换成工程目录下的其他文本文件 FileInputStream fis= new FileInputStream(file); //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类; ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); // ByteArrayOutputStream 是OutputStream的一个实现类 int ch = 0; while (true) { //取得文本对应的16进制数据 ch = fis.read(); if(ch==-1) break; //将FileInputStream 的内容写到 ByteArrayOutputStream 中 bytestream.write(ch); } bytestream.close(); //关闭文件 fis.close(); byte imgdata[] = bytestream.toByteArray(); BytesMessage text = session.createBytesMessage(); text.writeBytes(imgdata); producer.send(text); count = i + 1; // System.out.println("Put the " + count +" file into the MQ! 
" + tempList[i]); } catch (Exception e) { e.printStackTrace(); } }//判断是否为文件 }//在指定目录下循环取文件 System.out.println("Put "+ count +" files all fininshed!"); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { System.out.println(e.getMessage()); } finally { // 关闭释放资源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } catch (Exception e) { System.out.println(e.getMessage()); } } /** * 向TOPIC中放入消息 * @throws Exception */ public void putmsg2Topic(String path){ TopicConnection connection = null; try{ try { // 创建链接工厂 TopicConnectionFactory factory = new ActiveMQConnectionFactory( user, pwd, url); // 通过工厂创建一个连接 connection = factory.createTopicConnection(); // 启动连接 connection.start(); // 创建一个session会话 tsession = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Topic topic = tsession.createTopic(qName); // 创建消息发送者 publisher = tsession.createPublisher(topic); // 设置持久化模式 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(tsession, publisher); } catch (Exception e) { System.out.println(e.getMessage()); } finally { // 关闭释放资源 if (tsession != null) { tsession.close(); } if (connection != null) { connection.close(); } } }catch (Exception e) { System.out.println(e.getMessage()); } } /** * 将指定数据放入到AMQ的TOPIC中 * @param destPath 目录下所有文本,放入到AMQ TOPIC中 * @throws Exception */ public void sendMsgTopic4Path(String destPath){ try { File direct=new File(destPath); File[] tempList = direct.listFiles(); System.out.println("该目录下需要放入到MQ的文件个数:"+tempList.length); int count = 0; for (int i = 0; i < tempList.length; i++) { if (tempList[i].isFile()) { try { //遍历文件并生成对应的字节码文件到目录中 File file = new File(tempList[i].getAbsolutePath()); //可以换成工程目录下的其他文本文件 FileInputStream fis= new FileInputStream(file); //获得InputStream,因为FileInputStream 是InputStream的实现类;InputStream是个抽象类; ByteArrayOutputStream bytestream = new ByteArrayOutputStream(); // ByteArrayOutputStream 是OutputStream的一个实现类 int ch = 0; //byte[] msg = 
null; while (true) { //取得文本对应的16进制数据 ch = fis.read(); if(ch==-1) break; //将FileInputStream 的内容写到 ByteArrayOutputStream 中 bytestream.write(ch); } bytestream.close(); //关闭文件 fis.close(); byte imgdata[] = bytestream.toByteArray(); BytesMessage text = tsession.createBytesMessage(); text.writeBytes(imgdata); publisher.send(text); count = i + 1; // System.out.println("Put the " + count +" file into the MQ! " + tempList[i]); } catch (Exception e) { e.printStackTrace(); } }//判断是否为文件 }//在指定目录下循环取文件 System.out.println("Put "+ count +" files all fininshed!"); } catch (Exception e) { e.printStackTrace(); } } }

    2、从ActiveMQ中取出消息,并通过JMX获取队列状态

    import java.io.BufferedOutputStream;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.FileWriter;

    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnection;
    import javax.jms.TopicConnectionFactory;
    import javax.jms.TopicPublisher;
    import javax.jms.TopicSession;
    import javax.jms.TopicSubscriber;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;

    /**
     * Message receiver: takes a single bytes message off an ActiveMQ queue.
     */
    public class Getmsg {

        /** How long {@link #getmsg()} waits for a message, in milliseconds. */
        private static final long RECEIVE_TIMEOUT_MS = 100;

        private String url;
        private String user;
        private String pwd;
        // Destination: queue or topic name
        private String qName;

        Session session = null;
        MessageProducer producer = null;
        // Topic-related
        TopicSession tsession = null;
        TopicPublisher publisher = null;

        /**
         * @param url   broker address
         * @param user  broker user name
         * @param pwd   broker password
         * @param qName name of the queue to read from
         */
        public Getmsg(String url, String user, String pwd, String qName) {
            this.url = url;
            this.user = user;
            this.pwd = pwd;
            this.qName = qName;
        }

        /**
         * Connects to the queue, waits up to 100 ms for one message and disconnects.
         *
         * @return the received message, or {@code null} when nothing arrived in time,
         *         the message was not a {@link BytesMessage}, or an error occurred
         *         (errors are reported on stderr)
         */
        public BytesMessage getmsg() {
            BytesMessage text = null;
            Connection connection = null;
            try {
                ConnectionFactory factory = new ActiveMQConnectionFactory(user, pwd, url);
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(qName);
                MessageConsumer consumer = session.createConsumer(destination);
                Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
                if (message instanceof BytesMessage) {
                    text = (BytesMessage) message;
                } else if (message != null) {
                    // Report explicitly instead of failing with a hidden ClassCastException.
                    System.err.println("Ignoring message that is not a BytesMessage: "
                            + message.getClass().getName());
                }
            } catch (Exception e) {
                // The original called e.getStackTrace(), which discards the error silently.
                e.printStackTrace();
            } finally {
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
            return text;
        }

        /**
         * Reads one message from the queue and stores its body in a file.
         */
        public static void main(String[] args) throws Exception {
            Getmsg g = new Getmsg("tcp://localhost:61616", "amq", "123456", "testmq");
            BytesMessage bm = g.getmsg();
            if (bm == null) {
                // Nothing was received; avoid the NullPointerException the original hit here.
                System.out.println("No message received");
                return;
            }
            int msgLenth = (int) bm.getBodyLength();
            byte[] bmArr = new byte[msgLenth];
            bm.readBytes(bmArr);

            File file = new File("D:/test.txt");
            BufferedOutputStream bs = new BufferedOutputStream(new FileOutputStream(file));
            try {
                bs.write(bmArr);
            } finally {
                // Closing the buffered stream flushes it and closes the file stream.
                bs.close();
            }
        }
    }

    // ---------------------------------------------------------------------------
    // Jmx4Amq.java: queue status and purge through JMX
    // ---------------------------------------------------------------------------

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;

    import javax.management.MBeanServerConnection;
    import javax.management.MBeanServerInvocationHandler;
    import javax.management.MalformedObjectNameException;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.activemq.broker.jmx.BrokerViewMBean;
    import org.apache.activemq.broker.jmx.QueueViewMBean;

    /**
     * Reads queue statistics from an ActiveMQ broker over JMX and can purge a queue.
     */
    public class Jmx4Amq {

        /**
         * JMX object name of the broker MBean. The {@code brokerName} value must match
         * the broker name configured on the ActiveMQ side.
         */
        private static final String BROKER_OBJECT_NAME =
                "org.apache.activemq:type=Broker,brokerName=Broker_Name";

        private String uri;
        private String user;
        private String pwd;
        // Name of the queue that clearMsg() purges
        private String qName;

        private BrokerViewMBean mBean = null;
        private MBeanServerConnection connection = null;
        private JMXConnector connector = null;

        /**
         * Creates an instance without a target queue; {@link #clearMsg()} then does nothing.
         *
         * @param uri  JMX endpoint as {@code host:port}
         * @param user JMX user name
         * @param pwd  JMX password
         */
        public Jmx4Amq(String uri, String user, String pwd) {
            this(uri, user, pwd, null);
        }

        /**
         * @param uri   JMX endpoint as {@code host:port}
         * @param user  JMX user name
         * @param pwd   JMX password
         * @param qName name of the queue purged by {@link #clearMsg()}; may be null
         */
        public Jmx4Amq(String uri, String user, String pwd, String qName) {
            this.uri = uri;
            this.user = user;
            this.pwd = pwd;
            this.qName = qName;
        }

        /**
         * Connects over JMX if necessary and prints, for every queue of the broker, its
         * name, size, consumer count, enqueue count and dequeue count. Errors are reported
         * on stderr and not propagated.
         */
        public void getStatus() {
            try {
                connect();
                for (ObjectName queueName : mBean.getQueues()) {
                    QueueViewMBean queueMBean = queueProxy(queueName);
                    // Queue name
                    System.out.println(queueMBean.getName());
                    // Messages still waiting in the queue
                    System.out.println(queueMBean.getQueueSize());
                    // Number of consumers
                    System.out.println(queueMBean.getConsumerCount());
                    // Number of messages enqueued
                    System.out.println(queueMBean.getEnqueueCount());
                    // Number of messages dequeued
                    System.out.println(queueMBean.getDequeueCount());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Purges the queue whose name equals the {@code qName} given at construction.
         * Connects over JMX if necessary. Errors are reported on stderr and not propagated.
         */
        public void clearMsg() {
            if (qName == null) {
                System.err.println("No queue name configured; nothing to purge");
                return;
            }
            try {
                connect();
                for (ObjectName queueName : mBean.getQueues()) {
                    QueueViewMBean queueMBean = queueProxy(queueName);
                    // Purge only the matching queue, then stop scanning
                    if (qName.equals(queueMBean.getName())) {
                        queueMBean.purge();
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Closes the JMX connection if one is open. Safe to call more than once.
         */
        public void closeJmxConn() {
            if (connector == null) {
                return;
            }
            try {
                connector.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Forget the closed connection so a later call reconnects.
                connector = null;
                connection = null;
                mBean = null;
            }
        }

        /** Opens the JMX connection and the broker proxy unless they already exist. */
        private void connect() throws IOException, MalformedObjectNameException {
            if (connector != null) {
                return;
            }
            HashMap<String, Object> prop = new HashMap<String, Object>();
            // Credentials as expected by the JMX remote API: {user, password}
            String[] au = {user, pwd};
            prop.put(JMXConnector.CREDENTIALS, au);
            JMXServiceURL url =
                    new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + uri + "/jmxrmi");
            // JMXConnectorFactory.connect returns an already connected connector,
            // so no separate connect() call is needed.
            JMXConnector opened = JMXConnectorFactory.connect(url, prop);
            boolean ready = false;
            try {
                MBeanServerConnection server = opened.getMBeanServerConnection();
                ObjectName name = new ObjectName(BROKER_OBJECT_NAME);
                mBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(
                        server, name, BrokerViewMBean.class, true);
                connection = server;
                connector = opened;
                ready = true;
            } finally {
                if (!ready) {
                    opened.close();
                }
            }
        }

        /** Creates a proxy for the queue MBean registered under {@code queueName}. */
        private QueueViewMBean queueProxy(ObjectName queueName) {
            return (QueueViewMBean) MBeanServerInvocationHandler.newProxyInstance(
                    connection, queueName, QueueViewMBean.class, true);
        }

        /**
         * Prints the status of every queue of a local broker.
         */
        public static void main(String[] args) throws Exception {
            Jmx4Amq g = new Jmx4Amq("localhost:11099", "admin", "cacikf88");
            try {
                g.getStatus();
            } finally {
                g.closeJmxConn();
            }
        }
    }

     

    Processed: 0.009, SQL: 9