package cn.jack.rabbitmq.ps; import cn.jack.rabbitmq.connection.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME="test_exchange_fanout"; public static void main(String[] args) throws Exception { //获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); //从连接中创建通道 Channel channel = connection.createChannel(); //声明Exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //消息内容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "",null, message.getBytes()); System.out.println("[product] Send ‘"+ message +"‘"); //关闭通道和连接 channel.close(); connection.close(); } }
与之前不同的是,之前的生产者是声明(创建)队列:
//声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 而该生产者不再声明队列,即不再与队列直接连接,而是换为声明交换机: channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
我们这里先不编写消费者,直接运行一下生产者:
package cn.jack.rabbitmq.ps; import cn.jack.rabbitmq.connection.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv1 { private final static String QUEUE_NAME = "test_queue_ps_1";//队列名称 private final static String EXCHANGE_NAME="test_exchange_fanout";//交换机名称 public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [consumer1] Received ‘" + message + "‘"); //休眠10ms Thread.sleep(10); // 返回确认状态 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
这里我们为消费者声明了一个队列:
// 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
然后将该队列绑定到之前生产者绑定的交换机上:
//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
这样就可以获取生产者通过交换机推送的信息了。
private final static String QUEUE_NAME = "test_queue_ps_2";//队列名称
获取消息后的控制台打印记得更换名称为“consumer2”:System.out.println(" [consumer2] Received ‘" + message + "‘");
此时我们同时启动生产者,以及两个消费者,可以想看到两个消费者同时获取了生产者发送的全部信息:原文:https://www.cnblogs.com/laosunlaiye/p/11671459.html