首页 > 编程语言 > 详细

RabbitMQ--Java API

时间:2019-05-24 11:26:47      阅读:112      评论:0      收藏:0      [点我收藏+]

 

基于java使用RabbitMQ

框架:SpringBoot1.5.14.RELEASE

maven依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 

本文只是操作原生RabbitMQ,并没有和SpringBoot进行整合,后面介绍整合,基于注解使用

一、quick start

1.1、Consumer

技术分享图片
public static void main(String[] args) throws Exception{
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);
        //2 通过连接工厂创建连接
        Connection connection = factory.newConnection();
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();

        //4 声明(创建)一个队列
        channel.queueDeclare("test002", true, false, false, null);

        //5 创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        //6 设置Channel
        channel.basicConsume("test002", true, queueingConsumer);

        while(true){
            //7 获取消息
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            log.info(msg);
        }
    }
}
Consumer

参数解释:

  durable:是否持久化,Durable:是,即使服务器重启,这个队列也不会消失,Transient:否

  exclusive:这个queue只能由一个exchange监听restricted to this connection,使用场景:顺序消费

  autoDelete:当最后一个Binding到Exchange的Queue删除之后,自动删除该Exchange

  arguments:参数

  autoACK:是否自动签收,对应着手动签收

1.2、Producer

技术分享图片
public class Producer {

    public static void main(String[] args) throws Exception{
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("139.196.75.238");
        factory.setPort(5672);
        //2 通过连接工厂创建连接
        Connection connection = factory.newConnection();
        //3 通过connection创建一个Channel
        Channel channel = connection.createChannel();
        //4 通过Channel发送数据
        for(int i=0; i < 5; i++){
            String msg = "Hello RabbitMQ!";
            //1 exchange   2 routingKey
            channel.basicPublish("", "test002", null, msg.getBytes());
        }
        //5 关闭相关的连接
        channel.close();
        factory.clone();
    }
}
Producer

参数解释:

  exchange name:

  routingKey:路由规则

  BasicProperties:

  body:message中的body

 

结果:

17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!
17:43:49.351 [main] INFO com.it.quickstart.Consumer - Hello RabbitMQ!

 

  我们使用RabbitMQ,需要首先在可视化界面确定queue,exchange是否创建,对应关系是否正常,这是一个大前提

技术分享图片

 

1.3、自定义消费者

  之前接收message,通过while(true),感觉太low了,RabbitMQ支持实现自定义消费者,只需要集成DefaultConsumer,重写handlerDelivery,

构造器

技术分享图片
public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}
MyConsumer

 

而consumer只需要修改

//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);            //去掉这一步
//6 设置Channel channel.basicConsume("test002", true, new MyConsumer(channel));

 

结果:

-----------consume message----------
consumerTag: amq.ctag-YK7CnvWxTpm6hmuyUyqSkQ
envelope: Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=test002)
properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body: Hello RabbitMQ By MyConsumer!

 

 

二、Exchange

  Exchange有四种方式FanOut、Direct、Topic、Headers,而我们上面的例子,并没有定义Exchange,RabbitMQ默认使用AMQP default,

要求routing key和queue name相同

技术分享图片

 

RabbitMQ--Java API

原文:https://www.cnblogs.com/huigelaile/p/10907523.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!