Routing(路由)之订阅模型-Direct(直连)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。
但是,在某种场景下,我们希望不同的消息被不同的队列消费。
这时就要用到Direct类型的Exchange。
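在Direct模型下:
- 队列与交换机绑定时,必须指定一个RoutingKey(路由key);
- 生产者向Exchange发送消息时,也必须指定消息的RoutingKey;
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列绑定的RoutingKey与消息的RoutingKey完全一致时,该队列才会收到消息。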
图解:
生产者:
public class Provider { public static void main(String[] args) throws IOException { Connection connection = rabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); String exchange="logs_direct"; //通过通道声明交换机 参数1交换机名称 参数2交换机类型direct路由模式 channel.exchangeDeclare(exchange,"direct"); //发送消息 String routingkey="info"; channel.basicPublish(exchange,routingkey,null,("这是direct模型发布的基于routing key:["+routingkey+"]发送的消息").getBytes()); rabbitMQUtils.connectionAndchannelClose(connection,channel); } }
消费者1:
public class Customer1 { public static void main(String[] args) throws IOException { Connection connection = rabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); String exchange="logs_direct"; //通道声明交换机以及交换机的类型 channel.exchangeDeclare(exchange,"direct"); //创建一个临时队列 String queue = channel.queueDeclare().getQueue(); //基于routing key绑定队列和交换机 channel.queueBind(queue,exchange,"error"); //获取消费信息 channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("消费者-1"+new String(body)); } }); } }
消费者2:
/**
 * Consumer 2 for the direct (routing) exchange example.
 *
 * <p>Binds a temporary queue to the {@code logs_direct} exchange with the
 * routing keys {@code info}, {@code error} and {@code warning}, and prints
 * every message delivered to it.
 */
public class Customer2 {

    /**
     * Sets up the exchange, queue and bindings, then registers the consumer.
     * The connection and channel are left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = rabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        String exchange = "logs_direct";
        channel.exchangeDeclare(exchange, "direct");

        // Queue declared with no arguments; its name comes from the declare result.
        String queue = channel.queueDeclare().getQueue();

        // One binding per routing key this consumer subscribes to.
        channel.queueBind(queue, exchange, "info");
        channel.queueBind(queue, exchange, "error");
        channel.queueBind(queue, exchange, "warning");

        // The second argument (true) is passed as in the original
        // (auto-acknowledge in the RabbitMQ client API).
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // NOTE(review): new String(body) decodes with the platform default
                // charset, mirroring the producer's getBytes().
                System.out.println("消费者-2" + new String(body));
            }
        });
    }
}
原文:https://www.cnblogs.com/yz-bky/p/13056643.html