package cn.jack.rabbitmq.ps;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME="test_exchange_fanout";
public static void main(String[] args) throws Exception {
//获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();
//声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "",null, message.getBytes());
System.out.println("[product] Send ‘"+ message +"‘");
//关闭通道和连接
channel.close();
connection.close();
}
}
与之前不同的是,之前的生产者是声明(创建)队列:
//声明(创建)队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 而该生产者不再声明队列,即不再与队列直接连接,而是换为声明交换机: channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
我们这里先不编写消费者,直接运行一下生产者:
package cn.jack.rabbitmq.ps;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_ps_1";//队列名称
private final static String EXCHANGE_NAME="test_exchange_fanout";//交换机名称
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [consumer1] Received ‘" + message + "‘");
//休眠10ms
Thread.sleep(10);
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
这里我们为消费者声明了一个队列:
// 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
然后将该队列绑定到之前生产者绑定的交换机上:
//绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
这样就可以获取生产者通过交换机推送的信息了。
private final static String QUEUE_NAME = "test_queue_ps_2";//队列名称
获取消息后的控制台打印记得更换名称为“consumer2”:System.out.println(" [consumer2] Received ‘" + message + "‘");
此时我们同时启动生产者,以及两个消费者,可以想看到两个消费者同时获取了生产者发送的全部信息:原文:https://www.cnblogs.com/laosunlaiye/p/11671459.html