首页 > 其他 > 详细

RabbitMQ-2 常见的交换器

时间:2020-07-12 22:46:25      阅读:64      评论:0      收藏:0      [点我收藏+]

1、简介

前面第一章我们已经初步了解了什么是Exchange,简单来说它主要目的是为了接收消息,并根据路由键转发到所绑定的队列Queue,下面我用一张图来解释

技术分享图片

1、首先Send Massage 作为生产者 投递消息至Exchange;
2、Exchange 根据黄色区域 RoutingKey 对应将消息路由到Queue;
3、Receive Message 作为消费者,它会和Queue建立一个监听,然后接收消息

 

2 构建Exchange

我们通过channel 的 exchangeDeclare 方法进行,参数如下

技术分享图片View Code

各个参数详细说明

exchange 交换器的名称
type 交换器的类型,常见的有fanout、direct、topic、headers这四种
durable 设置是否持久 durab 设置为 true 表示持久化, 反之是非持久,设置为true则将Exchange存盘,即使服务器重启数据也不会丢失
autoDelete 设置是否自动删除,当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange,简单来说也就是如果该Exchange没有和任何队列Queue绑定则删除
internal 设置是否是RabbitMQ内部使用,默认false。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument 扩展参数,用于扩展AMQP协议自制定化使用

 

3 Exchange的各种TYPE (各种交换器)

1 Direct Exchange 

技术分享图片

该类型的交换器路由规则也很简单,它会把消息路由到那些BindingKey 和 RoutingKey 完全匹配的队列中。

生产者

技术分享图片
public class DirectExchangeProducer {
    
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.28");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("toher");
        connectionFactory.setPassword("toher888");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_direct_exchange";
        //可以修改routingKey的值使其和消费端的bingingKey不一致来测试 直连的方式
        String routingKey = "test.direct";
        //5 发送
        String msg = "Test Direct Exchange Message";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
    
}
View Code

消费者

技术分享图片
public class DirectExchangeConsumer {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.1.28");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("toher");
        connectionFactory.setPassword("toher888");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        //定义为直连
        String exchangeType = "direct";
        //定义队列名
        String queueName = "test_direct_queue";
        //设置routingKey 也可以说是 bindingKey 很多时候我们可以理解成同一个东西
        String routingKey = "test.direct";
        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //5 创建消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费端:" + msg);
            }
        };
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
    }
    
}
View Code

 

 

2   Fanous Exchange 

技术分享图片

 它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,忽略routing key  类似ActiveMQ的Topic广播模式

生产者

技术分享图片
public class FanoutExchangeProducer {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.28");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("toher");
        connectionFactory.setPassword("toher888");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明交换器的名字
        String exchangeName = "test_fanout_exchange";
        //5 发送
        for(int i = 0; i < 10; i ++) {
            String msg = "Test Fanout Exchange Message";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());             
        }
        channel.close();  
        connection.close();  
    }
    
}
View Code

消费者

技术分享图片
public class FanoutExchangeConsumer {

    public static void main(String[] args) throws Exception {
        //1 创建一个ConnectionFactory, 并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.1.28");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("toher");
        connectionFactory.setPassword("toher888");
        //2 通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 定义
        String exchangeName = "test_fanout_exchange";
        //5 指定类型为fanout
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        //不设置路由键,没有路由情况下也能收到证明并不处理任何的路由键
        String routingKey = "";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //5 创建消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费端:" + msg);
            }
        };
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true,
View Code

上述代码 channel.queueBind(queueName, exchangeName, routingKey) 中routingKey为空但是依然可以把消息路由到队列,说明Fanout类型并不处理路由键;

运行效果:
技术分享图片

 

3 Topic 

技术分享图片

         前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。topic类型的交换器在匹配规则上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队 列中,但这里的匹配规则有些不同,它约定:

BindingKey 中可以存在两种特殊的字符串 "*" 和 "#" , 用于模糊匹配,其中

"#"用于匹配一个单词, "*"用于匹配多个单个(可以是零个)。

com.itcast.client ,   com.itheima.exam   

如 com.#.exam  就匹配了  com.itheima.exam   

如 com.* 就匹配了 com.itcast.client 和 com.itheima.exam   

 

生产者

技术分享图片
public class TopicExchangeProducer {
    
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.1.28");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("toher");
        connectionFactory.setPassword("toher888");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();  
        //4 声明
        String exchangeName = "test_topic_exchange";
        //定义三个routingKey 其中第三个我们再消费者中故意匹配不成功
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.one";
        //5 发送
        String msg = "Test Topic Exchange Message";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());     
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes()); 
        channel.close();  
        connection.close();  
    }
    
}
View Code

消费者

技术分享图片
public class TopicExchangeConsumer {

    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.1.28");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("toher");
        connectionFactory.setPassword("toher888");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        //指定类型为topic
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        //因为*号代表匹配一个单词,生产者中routingKey3将匹配不到
        String routingKey = "user.*";
        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //5 创建消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费端:" + msg);
            }
        };
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
    }
}
View Code

 

技术分享图片

 

 

 

4  headers :

    该类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中 的 headers 属性进行匹配。headers 不是很常用这里就不多介绍了

 

RabbitMQ-2 常见的交换器

原文:https://www.cnblogs.com/hup666/p/13290329.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!