首页 > 编程语言 > 详细

ActiveMQ--java编写代码实现通讯

时间:2021-05-26 23:21:11      阅读:19      评论:0      收藏:0      [点我收藏+]
  1. IDEA建maven工程  
  2. POM.xml文件
    • 技术分享图片 
        
  3. JMS编码总体构架
    • 技术分享图片
    • 技术分享图片

       

       

       

  4. 粗说目的地Destination,队列和主题
    1. 技术分享图片

       

       

        
  5. 在点对点的消息传递域中,目的地被称为队列(queue)
    1. 生产者编码实现:
      • 上手案例 :常见生产者执代码
      • package com.model;
        
        import org.apache.activemq.ActiveMQConnectionFactory;
        
        import javax.jms.*;
        
        /**
         * Hello world!
         *
         */
        public class JmsProducer {
            public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616";
            public static final String QUEUE_NAME="queue01";
            public static void main(String[] args) throws JMSException {
        
        //        1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码
                ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //        2.创建连接工厂,获得连接connection并启动访问
                Connection connection = activeMQConnectionFactory.createConnection();
                connection.start();
        //        3.创建会话
        //        两个参数,第一交事务/第二个叫签收
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //        4.创建目的地(具体是队列还是主题topic)
                Queue queue = session.createQueue(QUEUE_NAME);
        //        5.创建消息的生产者
                MessageProducer messageProducer = session.createProducer(queue);
        //        6.通过使用messageProducer生产3条消息发送给MQ的队列里面
                for (int i = 0; i < 3; i++) {
        //        7.创建消息,好比时学生按照要求给来老师写的问题
                    TextMessage textMessage = session.createTextMessage("message----" + i);
        //        8.通过messageProducer发送给mq
                    messageProducer.send(textMessage);
                }
        //        9.关闭资源
                messageProducer.close();
                session.close();
                connection.close();
                System.out.println("生产者发送消息******");
            }
        }
      • 技术分享图片
      • 技术分享图片

    2. 控制说明:
      • 技术分享图片
    3.  消费者编码实现 receive()方法:

      • 代码:
      • package com.model;
        
        import org.apache.activemq.ActiveMQConnectionFactory;
        
        import javax.jms.*;
        
        public class JmsConsumer {
                public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616";
                public static final String QUEUE_NAME="queue01";
                public static void main(String[] args) throws JMSException {
        
        //        1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码
                    ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //        2.创建连接工厂,获得连接connection并启动访问
                    Connection connection = activeMQConnectionFactory.createConnection();
                    connection.start();
        //        3.创建会话
        //        两个参数,第一交事务/第二个叫签收
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //        4.创建目的地(具体是队列还是主题topic)
                    Queue queue = session.createQueue(QUEUE_NAME);
        //        5.创建消息的消费者
                    MessageConsumer consumer = session.createConsumer(queue);
        //        6.消费者消费信息
                    while(true){
                        TextMessage message = (TextMessage) consumer.receive();
                        if (message!=null){
                            System.out.println("message==="+message.getText());
                        }else {
                            break;
                        }
                    }
        //        7.关闭资源
                    consumer.close();
                    session.close();
                    connection.close();
                }
        }
      • 技术分享图片技术分享图片  

      • 消费者的消费方法: consumer.receive(时间是参数); /consumer.receive(无时间是参数);有时间参数的等待一段时间后会停止,没时间参数的会一直等待消息
      • 技术分享图片
    4.  消费者实现的第二种方式(消息的监听器) :

      1. 监听模式,有Sysem.in.read()方法的时候不会关闭,没有的时候会关闭

        package com.model;
        
        import lombok.SneakyThrows;
        import org.apache.activemq.ActiveMQConnectionFactory;
        
        import javax.jms.*;
        import java.io.IOException;
        
        public class JmsListenerConsumer {
                public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616";
                public static final String QUEUE_NAME="queue01";
                public static void main(String[] args) throws JMSException, IOException {
        
        //        1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码
                    ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //        2.创建连接工厂,获得连接connection并启动访问
                    Connection connection = activeMQConnectionFactory.createConnection();
                    connection.start();
        //        3.创建会话
        //        两个参数,第一交事务/第二个叫签收
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //        4.创建目的地(具体是队列还是主题topic)
                    Queue queue = session.createQueue(QUEUE_NAME);
        //        5.创建消息的消费者
                    MessageConsumer consumer = session.createConsumer(queue);
        //        6.消费者消费信息
                   consumer.setMessageListener(new MessageListener() {
                       @Override
                       public void onMessage(Message message) {
                           if (null != message && message instanceof TextMessage) {
        
                               TextMessage textMessage = (TextMessage) message;
        
                               try {
                                   System.out.println("****消费者接受消息" + textMessage.getText());
                               } catch (JMSException e) {
                                   e.printStackTrace();
                               }
                           }
                       }
                   });
        //        7.关闭资源
                    System.in.read();
                    consumer.close();
                    session.close();
                    connection.close();
                }
        
        
        }
      2. 通过监听的方式来消息消息Mes sageConsumer messageConsumer = session. createConsumer(queue);
        异步非阻塞方式(监听器onMessage())
        订阅者或接收者通过MessageConsumer的setMessageL is tener(MessageListener listener)注册- . 个消息监听器,
        当消息到达之后,系统自动调用监听器MessageL is tener的onMessage (Message message) 方法。

      3. 技术分享图片
    5. 消费者三大消费情况

      1. 技术分享图片

    6. 小结JMS开发步骤

      1. 技术分享图片

    7. 两种消费方式的比较

      1. 技术分享图片

    8.  模型比较

      1. 技术分享图片 
  6. 在发布订阅消息传递域中,目的地被成为主题(topic)
    1. 发布主题生产者
    2. 订阅主题消费者
    3. 先启动订阅在启动生产,不然发送的消息是废消息
    4. 控制台
      1. 技术分享图片
    5.  消费者代码实现

      1. package com.model;
        
        import org.apache.activemq.ActiveMQConnectionFactory;
        
        import javax.jms.*;
        import java.io.IOException;
        
        public class JmsListenerConsumer_topic {
                public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616";
                public static final String TOPIC_NAME="topic01";
                public static void main(String[] args) throws JMSException, IOException {
        
        //        1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码
                    ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //        2.创建连接工厂,获得连接connection并启动访问
                    Connection connection = activeMQConnectionFactory.createConnection();
                    connection.start();
        //        3.创建会话
        //        两个参数,第一交事务/第二个叫签收
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //        4.创建目的地(具体是队列还是主题topic)
                    Topic topic = session.createTopic(TOPIC_NAME);
        //        5.创建消息的消费者
                    MessageConsumer consumer = session.createConsumer(topic);
        //        6.消费者消费信息
        //           consumer.setMessageListener(new MessageListener() {
        //               @Override
        //               public void onMessage(Message message) {
        //                   if (null != message && message instanceof TextMessage) {
        //
        //                       TextMessage textMessage = (TextMessage) message;
        //
        //                       try {
        //                           System.out.println("****topic**消费者接受消息" + textMessage.getText());
        //                       } catch (JMSException e) {
        //                           e.printStackTrace();
        //                       }
        //                   }
        //               }
        //           });
                    consumer.setMessageListener(message ->  {
        
                            if (null != message && message instanceof TextMessage) {
        
                                TextMessage textMessage = (TextMessage) message;
        
                                try {
                                    System.out.println("****topic**消费者接受消息" + textMessage.getText());
                                } catch (JMSException e) {
                                    e.printStackTrace();
                                }
                            }
                        });
        //        7.关闭资源
                    System.in.read();
                    consumer.close();
                    session.close();
                    connection.close();
                }
        
        
        }
    6.  

      topic生产者代码实现

      1. package com.model;
        
        import org.apache.activemq.ActiveMQConnectionFactory;
        
        import javax.jms.*;
        
        /**
         * Hello world!
         *
         */
        public class JmsProducer_topic {
            public static final String ACTIVEMQ_URL="tcp://192.168.56.130:61616";
            public static final String TOPIC_NAME="topic01";
            public static void main(String[] args) throws JMSException {
        
        //        1.创建连接工厂,按照给定的url地址,采用默认的用户名和密码
                ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //        2.创建连接工厂,获得连接connection并启动访问
                Connection connection = activeMQConnectionFactory.createConnection();
                connection.start();
        //        3.创建会话
        //        两个参数,第一交事务/第二个叫签收
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //        4.创建目的地(具体是队列还是主题topic)
                Topic topic = session.createTopic(TOPIC_NAME);
        //        5.创建消息的生产者
                MessageProducer messageProducer = session.createProducer(topic);
        //        6.通过使用messageProducer生产3条消息发送给MQ的队列里面
                for (int i = 0; i < 3; i++) {
        //        7.创建消息,好比时学生按照要求给来老师写的问题
                    TextMessage textMessage = session.createTextMessage("topic***message----" + i);
        //        8.通过messageProducer发送给mq
                    messageProducer.send(textMessage);
                }
        //        9.关闭资源
                messageProducer.close();
                session.close();
                connection.close();
                System.out.println("topic***生产者发送消息******");
            }
        }

         

              

         

          
  7. 小总结
    1. 技术分享图片

       

       

        

ActiveMQ--java编写代码实现通讯

原文:https://www.cnblogs.com/zzhAylm/p/14813967.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!