package com.model; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Hello world! * */ public class JmsProducer { public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616"; public static final String QUEUE_NAME="queue01"; public static void main(String[] args) throws JMSException { // 1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2.创建连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3.创建会话 // 两个参数,第一交事务/第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建目的地(具体是队列还是主题topic) Queue queue = session.createQueue(QUEUE_NAME); // 5.创建消息的生产者 MessageProducer messageProducer = session.createProducer(queue); // 6.通过使用messageProducer生产3条消息发送给MQ的队列里面 for (int i = 0; i < 3; i++) { // 7.创建消息,好比时学生按照要求给来老师写的问题 TextMessage textMessage = session.createTextMessage("message----" + i); // 8.通过messageProducer发送给mq messageProducer.send(textMessage); } // 9.关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("生产者发送消息******"); } }
消费者编码实现 receive()方法:
package com.model; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsConsumer { public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616"; public static final String QUEUE_NAME="queue01"; public static void main(String[] args) throws JMSException { // 1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2.创建连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3.创建会话 // 两个参数,第一交事务/第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建目的地(具体是队列还是主题topic) Queue queue = session.createQueue(QUEUE_NAME); // 5.创建消息的消费者 MessageConsumer consumer = session.createConsumer(queue); // 6.消费者消费信息 while(true){ TextMessage message = (TextMessage) consumer.receive(); if (message!=null){ System.out.println("message==="+message.getText()); }else { break; } } // 7.关闭资源 consumer.close(); session.close(); connection.close(); } }
消费者实现的第二种方式(消息的监听器) :
监听模式,有Sysem.in.read()方法的时候不会关闭,没有的时候会关闭
package com.model; import lombok.SneakyThrows; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class JmsListenerConsumer { public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616"; public static final String QUEUE_NAME="queue01"; public static void main(String[] args) throws JMSException, IOException { // 1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2.创建连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3.创建会话 // 两个参数,第一交事务/第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建目的地(具体是队列还是主题topic) Queue queue = session.createQueue(QUEUE_NAME); // 5.创建消息的消费者 MessageConsumer consumer = session.createConsumer(queue); // 6.消费者消费信息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("****消费者接受消息" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); // 7.关闭资源 System.in.read(); consumer.close(); session.close(); connection.close(); } }
通过监听的方式来消息消息Mes sageConsumer messageConsumer = session. createConsumer(queue);
异步非阻塞方式(监听器onMessage())
订阅者或接收者通过MessageConsumer的setMessageL is tener(MessageListener listener)注册- . 个消息监听器,
当消息到达之后,系统自动调用监听器MessageL is tener的onMessage (Message message) 方法。
消费者三大消费情况
小结JMS开发步骤
两种消费方式的比较
模型比较
消费者代码实现
package com.model; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class JmsListenerConsumer_topic { public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616"; public static final String TOPIC_NAME="topic01"; public static void main(String[] args) throws JMSException, IOException { // 1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2.创建连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3.创建会话 // 两个参数,第一交事务/第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建目的地(具体是队列还是主题topic) Topic topic = session.createTopic(TOPIC_NAME); // 5.创建消息的消费者 MessageConsumer consumer = session.createConsumer(topic); // 6.消费者消费信息 // consumer.setMessageListener(new MessageListener() { // @Override // public void onMessage(Message message) { // if (null != message && message instanceof TextMessage) { // // TextMessage textMessage = (TextMessage) message; // // try { // System.out.println("****topic**消费者接受消息" + textMessage.getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // } // }); consumer.setMessageListener(message -> { if (null != message && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("****topic**消费者接受消息" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 7.关闭资源 System.in.read(); consumer.close(); session.close(); connection.close(); } }
topic生产者代码实现
package com.model; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Hello world! * */ public class JmsProducer_topic { public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616"; public static final String TOPIC_NAME="topic01"; public static void main(String[] args) throws JMSException { // 1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 2.创建连接工厂,获得连接connection并启动访问 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); // 3.创建会话 // 两个参数,第一交事务/第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 4.创建目的地(具体是队列还是主题topic) Topic topic = session.createTopic(TOPIC_NAME); // 5.创建消息的生产者 MessageProducer messageProducer = session.createProducer(topic); // 6.通过使用messageProducer生产3条消息发送给MQ的队列里面 for (int i = 0; i < 3; i++) { // 7.创建消息,好比时学生按照要求给来老师写的问题 TextMessage textMessage = session.createTextMessage("topic***message----" + i); // 8.通过messageProducer发送给mq messageProducer.send(textMessage); } // 9.关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("topic***生产者发送消息******"); } }
原文:https://www.cnblogs.com/zzhAylm/p/14813967.html