在介绍了rabbitmq的Publish/subscrige模式之后,这一节我们将阐述它的Routing模式的用法。
package net.wanho.rabbitmq.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] argv) throws Exception {
String queue_inform_email = "queue_inform_email";
String queue_inform_sms = "queue_inform_sms";
String exchange_routing_inform = "exchange_routing_inform";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
//创建一个连接
Connection connection = factory.newConnection();
//创建与交换机的通道,每个通道代表一个会话
Channel channel = connection.createChannel();
/**
声明交换机 String exchange, BuiltinExchangeType type
参数明细
1、交换机名称
2、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(exchange_routing_inform, BuiltinExchangeType.DIRECT);
channel.queueDeclare(queue_inform_email, true, false, false, null);
channel.queueDeclare(queue_inform_sms, true, false, false, null);
channel.queueBind(queue_inform_email, exchange_routing_inform, queue_inform_email);
channel.queueBind(queue_inform_sms, exchange_routing_inform, queue_inform_sms);
for (int i = 0; i < 10; i++) {
String message = "email inform to user" + i;
channel.basicPublish(exchange_routing_inform, queue_inform_email, null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
for (int i = 0; i < 10; i++) {
String message = "sms inform to user" + i;
channel.basicPublish(exchange_routing_inform, queue_inform_sms, null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
}
}
package net.wanho.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
String QUEUE_INFORM_EMAIL = "queue_inform_email";
String EXCHANGE_ROUTING_INFORM = "inform_exchange_routing";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
String message = new String(body, "utf-8");
System.out.println(message);
}
};
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
package net.wanho.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
String QUEUE_INFORM_SMS = "queue_inform_sms";
String EXCHANGE_ROUTING_INFORM = "inform_exchange_routing";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
String message = new String(body, "utf-8");
System.out.println(message);
}
};
channel.basicConsume(QUEUE_INFORM_SMS, true, defaultConsumer);
}
}
原文:https://www.cnblogs.com/alichengxuyuan/p/12513558.html