首页 > 编程语言 > 详细

ActiveMQ_Java实现生产者和消费者操作队列

时间:2020-09-20 20:34:11      阅读:117      评论:0      收藏:0      [点我收藏+]

一、JMS编码总体规范

技术分享图片 

 

二、创建Maven工程和引入Maven依赖

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.5</version>
</dependency>
<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>4.10</version>
</dependency>

  

三、队列-生产者生产消息实例代码

public class JmsProducer {
    private static final String BROKER_URL = "tcp://192.168.229.129:61616";
    private static final String QUEUE_NAME = "queue01";
    private static final String TEXT_MESSAGE = "message";

    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        // 2、通过连接工厂获取连接对象
        Connection connection = connectionFactory.createConnection();
        // 3、启动JMS连接
        connection.start();
        // 4、通过连接对象创建JMS Session对象,第一个参数为是否开启事务,第二个参数是消息的签收方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建Destination (Destination是 Queue/Topic的父接口)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息的生产者,参数是Destination,也就是消息的生产者会将消息推送到哪里.
        MessageProducer producer = session.createProducer(queue);
        // 7、创建消息,创建三个消息发送至队列
        for (int i = 1; i < 4; i++) {
            TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE + "0" + i);
            producer.send(textMessage);
        }
        // 8、释放资源,先开后关,后开先关
        producer.close();
        session.close();
        connection.close();
        System.out.println("消息成功发送至队列");
    }
}

  运行代码之后,登录ActiveMQ客户端,可以看到消息已经推送至队列queue01中了,由于我们发送了3个消息,所以这里的待处理消息就是3,我们还没有创建消费者,所以消费者数目为0,发送了3个消息,并且都进入了队列,所以入队的消息数目为3,没有消费者消费消息,那么出队列的消息数目为0.

技术分享图片

  队列相关名词解释:

名称 简介

Number Of Pending Messages

待处理的消息数目:入队的总数-出队的总数

Number Of Consumers

消费者数目:消费端的消费者数目

Messages Enqueued

已经入队的消息数目,进入队列的消息的数目,这个数目只增不减,即使出队了也不会减少

Messages Dequeued

已经出队的消息数目,也就是消费者消费掉的消息数目

 

四、队列-消费者消息实例代码

public class JmsConsumer {
    private static final String BROKER_URL = "tcp://192.168.229.129:61616";
    private static final String QUEUE_NAME = "queue01";
    private static final String TEXT_MESSAGE = "message";

    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂对象
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        // 2、通过连接工厂获取连接对象
        Connection connection = connectionFactory.createConnection();
        // 3、启动JMS连接
        connection.start();
        // 4、通过connection对象获取JMS Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建Destination对象(Queue/Topic的父接口是Destination)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消费者对象
        MessageConsumer consumer = session.createConsumer(queue);
        // 7、消费者消费消息
        while (true) {
            // 时间为毫秒值,如果过了10s之后,消费者没有接收到新的消息,那么消费者自动和队列断开
            TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
            /*TextMessage textMessage = (TextMessage) consumer.receive();*/
            if (textMessage != null) {
                String text = textMessage.getText();
                if (text != null) {
                    System.out.println("接收到的消息是:" + text);
                }
            } else {
                break;
            }
        }
        // 8、释放资源
        consumer.close();
        session.close();
        connection.close();
    }
}

 

五、消费者几个细节

  1、注意:consumer.receive()方法有两个

// 一直接收消息,消费者和队列不会断开连接
TextMessage textMessage = (TextMessage) consumer.receive();
// 超过了时间(毫秒值),消费者将和队列自动断开连接
TextMessage textMessage = (TextMessage) consumer.receive(10*1000);

  01、先看 consumer.receive(),当没有新消息的时候,消费者一直处于等待的状态,消费者数目为1

技术分享图片  

  02、consumer.receive(10*1000):10秒钟后,消费者将会和队列断开连接,消费者数目为0

技术分享图片

  2、消费者通过 consumer.setMessageListener(MessageListerer messageListener)的方式消费消息

// 消费者通过消息监听的方式消费消息
consumer.setMessageListener(new MessageListener() {
	@Override
	public void onMessage(Message message) {
		if (message != null && message instanceof TextMessage) {
			TextMessage textMessage = (TextMessage) message;
			try {
				System.out.println("接收到的消息是:" + textMessage.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
});
// 保证在连接到MQ之前,控制台不灭,也就是消费到了消息之后才去释放资源,Dubbo中也有
System.in.read();

  3、先启动生产者/消费者对消息的影响

生产者 消费者 结果
先启动生产者生产消息 然后只启动一号消费者 消费者可以正常消费消息
先启动生产者生产消息 然后启动一号消费者,接着启动二号消费者 一号消费者消费完所有的消息,二号消费者无法消费消息
然后启动生产者生产消息 先启动一号消费者,接着启动二号消费者 一号消费者,二号消费者都能消费消息,类似于负载均衡

 

 

 

Number Of Pending Messages

ActiveMQ_Java实现生产者和消费者操作队列

原文:https://www.cnblogs.com/xiaomaomao/p/13701064.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!