首页 > 其他 > 详细

ActiveMQ消息队列技术Demo

时间:2019-11-27 12:21:06      阅读:68      评论:0      收藏:0      [点我收藏+]

 

 

 

一、点对点模式:

有提供者,和接收方

QueueProductor
public static void main(String[] args) throws JMSException {
        //1、创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2、获取链接
        Connection connection = connectionFactory.createConnection();
        //3、启动连接
        connection.start();
        //4、获取session  参数1是否启动事务,参数2,消息确认模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、创建队列对象
        Queue queue = session.createQueue("test-queque");
        //6、创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        //7、创建消息
        TextMessage textMessage = session.createTextMessage("欢迎光临XX学院");
        //8、发送消息
        producer.send(textMessage);
        //9、关闭资源
        producer.close();
        session.close();
        connection.close();
    }
QueueConsumer
public static void main(String[] args) throws JMSException, IOException {
            //1、创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2、获取链接
            Connection connection = connectionFactory.createConnection();
            //3、启动连接
            connection.start();
            //4、获取session  参数1是否启动事务,参数2,消息确认模式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5、创建队列对象
            Queue queue = session.createQueue("test-queque");
            //6、创建消息的生产者
            MessageConsumer consumer = session.createConsumer(queue);
            //7、监听消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println("次接收到消息:"+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                }
            });
            //8、等待键盘输入
            System.in.read();
            //9、关闭资源
            consumer.close();
            session.close();
            connection.close();
        }

二、发布与订阅

一个提供者,多个消费者

TopicProducer
public static void main(String[] args) throws JMSException {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("test-topic");
        MessageProducer producer = session.createProducer(topic);
        TextMessage textMessage = session.createTextMessage("消息队列广播");
        producer.send(textMessage);

        producer.close();
        session.close();
        connection.close();

    }
TopicConsumer
public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("test-topic");

        MessageConsumer consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("2接收到消息"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
TopicConsumer2
public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("test-topic");


        MessageConsumer consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("1接收到消息"+textMessage.getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }

 

 

注意:

  点对点时,提供者和消费者谁先开启无所谓,但是在消息发布与订阅中,消费者必须要在提供者启动之前,启动完成,并监听。得到消息队列中的数据

 

 

 

  

ActiveMQ消息队列技术Demo

原文:https://www.cnblogs.com/guanyuehao0107/p/11940693.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!