package com.dwz.rabbitmq.util; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtils { /** * 获取MQ的连接 * @return * @throws TimeoutException * @throws IOException */ public static Connection getConnection() throws IOException, TimeoutException { //定义一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置服务地址 factory.setHost("127.0.0.1"); //设置AMQP端口号 factory.setPort(5672); //vhost factory.setVirtualHost("/vhost_dwz"); factory.setUsername("root_dwz"); factory.setPassword("123456"); //设置网络断开时的自动重连 factory.setAutomaticRecoveryEnabled(true); //每3秒重连一次 factory.setNetworkRecoveryInterval(3000); return factory.newConnection(); } }
package com.dwz.rabbitmq.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取一个连接 Connection connection = ConnectionUtils.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello rabbitmq simple message!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg--:" + msg); channel.close(); connection.close(); } }
注:此时未指定exchange,所以该队列绑定的是rabbitmq提供的默认的exchange
即:
此时的exchange类型是direct(直连)方式
package com.dwz.rabbitmq.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Receive { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取一个连接 Connection connection = ConnectionUtils.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //自动接收消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("consumer receive--:" + msg); } }; //监听队列 channel.basicConsume(QUEUE_NAME, consumer); } }
success!
原文:https://www.cnblogs.com/zheaven/p/11799385.html