添加jar包依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
/**
 * Opens a new connection to the RabbitMQ broker used by these examples.
 *
 * <p>The broker address (127.0.0.1:5672), virtual host ({@code /rdf}) and
 * credentials are hard-coded for demonstration purposes only.
 *
 * @return a newly opened connection; never {@code null}
 * @throws IllegalStateException if the connection cannot be established because
 *         of an I/O failure or a timeout; the original exception is kept as the cause
 */
public static Connection newConnection() {
    ConnectionFactory cFactory = new ConnectionFactory();
    cFactory.setHost("127.0.0.1");
    cFactory.setVirtualHost("/rdf");
    cFactory.setPort(5672);
    cFactory.setUsername("ruidongfei");
    cFactory.setPassword("ruidongfei");
    try {
        return cFactory.newConnection();
    } catch (IOException | TimeoutException e) {
        // Fail fast with the real cause instead of returning null, which would
        // only surface later as an unrelated NullPointerException in the caller.
        throw new IllegalStateException(
                "Failed to open RabbitMQ connection to 127.0.0.1:5672 (vhost /rdf)", e);
    }
}
生产者发送消息
/**
 * Producer example: publishes test messages to the durable direct exchange
 * {@code Rexchange} using the routing key {@code testqueue}.
 *
 * <p>This method does not declare or bind a queue itself, so a queue must already
 * be bound to the exchange with the same routing key for the messages to be routed.
 * Messages published with {@code mandatory = true} that cannot be routed are handed
 * back to the return listener registered below.
 *
 * @param args unused
 * @throws IOException if declaring the exchange, publishing, or closing fails
 * @throws TimeoutException if closing the channel times out
 * @throws InterruptedException not thrown by the current body; kept so the
 *         signature stays unchanged
 */
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    final Connection connection = ConnectionUtils.newConnection();
    if (connection == null) {
        throw new IllegalStateException("Could not open a RabbitMQ connection");
    }

    // try-with-resources closes the channel first, then the connection, even when
    // a publish fails half-way through.
    try (Connection openConnection = connection;
         Channel channel = openConnection.createChannel()) {

        // Register the return listener BEFORE publishing; a listener added after
        // the publishes would miss messages the broker has already returned.
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println(exchange + " " + routingKey);
                System.out.println(new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        });

        // Declare the exchange (durable, type "direct") before publishing to it.
        String exchangeName = "Rexchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        // Routing key under which the messages are published.
        String routingKey = "testqueue";
        BasicProperties bp = new BasicProperties();
        // mandatory: an unroutable message is returned to the producer
        // (see the return listener above) instead of being dropped.
        boolean mandatory = true;
        // NOTE(review): RabbitMQ 3.0+ reportedly no longer supports immediate = true;
        // keep this flag false.
        boolean immediate = false;

        for (int i = 0; i < 50; i++) {
            byte[] payload = ("provider send{" + i + "} testing")
                    .getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish(exchangeName, routingKey, bp, payload);
        }
        channel.basicPublish(exchangeName, routingKey, mandatory,
                MessageProperties.PERSISTENT_BASIC,
                "provider send{2} testing".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        channel.basicPublish(exchangeName, routingKey, mandatory, immediate, bp,
                "provider send{3} testing".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
/**
 * Consumer example: declares the durable direct exchange {@code Rexchange} and the
 * durable queue {@code testqueue}, binds them with the routing key
 * {@code testqueue}, and prints every message delivered to the queue.
 *
 * <p>The connection and channel are intentionally left open so that deliveries keep
 * arriving; the consumer callback is invoked by the client library, not by this
 * method.
 *
 * @param args unused
 * @throws IOException if a broker operation fails
 * @throws TimeoutException declared for signature compatibility
 */
public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtils.newConnection();
    if (connection == null) {
        throw new IllegalStateException("Could not open a RabbitMQ connection");
    }
    final Channel channel = connection.createChannel();

    // Declare the exchange (durable, type "direct").
    String exchangeName = "Rexchange";
    channel.exchangeDeclare(exchangeName, "direct", true);

    // Declare a durable, non-exclusive, non-auto-delete queue.
    String queue = channel.queueDeclare("testqueue", true, false, false, null).getQueue();

    // Bind the queue to the exchange using the routing key.
    String routingKey = "testqueue";
    channel.queueBind(queue, exchangeName, routingKey);

    // Manual acknowledgements: a message is acked only after it has been handled.
    boolean autoAck = false;
    // Empty consumer tag, as in the original call.
    String consumerTag = "";

    // Register the consumer exactly once. Calling basicConsume in an endless loop
    // would keep adding new consumers to the channel and spin the CPU.
    channel.basicConsume(queue, autoAck, consumerTag, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            String bodyStr = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("消费的消息体内容:");
            System.out.println(bodyStr);
            // Acknowledge after processing so a failure above does not lose the message.
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}
rabbitmq 简单应用
原文:https://www.cnblogs.com/CoderRdf/p/rabbitmq.html