<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.1</version> </dependency> </dependencies>
package com.gjh; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class SendInfo { public static void main(String[] args) { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); /** * 配置RabbitMQ的连接相关信息 * */ factory.setHost("localhost");//指定IP factory.setPort(5672);//指定端口 factory.setUsername("root");//指定账号 factory.setPassword("root");//指定密码 Connection connection = null;//定义连接 Channel channel = null;//定义通道 try { connection = factory.newConnection();//获取连接 channel = connection.createChannel();//获取通道 /** * 声明一个队列, * 参数 1 为 队列名取值任意 * 参数 2 为 是否为持久化的队列 * 参数 3 为是否排外 如果排外则这个队列只允许一个消费者监听 * 参数 4 是否自动删除对了,如果为true则表示当队列中没有消息,也没有消费者链接时就会自动删除这个队列 * 参数 5 为队列的一些属性设置通常为null即可 * 注意: * 1、声明队列时,这个队列名称如果已经存在则放弃声明,如果队列不存在则会声明一个新的队列 * 2、队列名可以取值任意,但是要与消息接收时完全一致 * 3、这行代码是可有可无的但是一定要在发送消息前确认队列名已经存在在RabbitMQ中,否则就会出现问题 */ channel.queueDeclare("myQueue",true,false,false,null); String message = "我的RabbitMQ的测试消息222233333333344444"; /** * 发送消息到MQ * 参数 1 为交换机名称 这里为空字符串表示不使用交换机 * 参数 2 为队列名或RoutingKey,当指定了交换机名称以后这个这个值就是RoutingKey * 参数 3 为消息属性信息 通常空即可 * 参数 4 为具体的消息数据的字节数组 * 注意:队列名必须要与接收时完全一致 */ channel.basicPublish("","myQueue",null,message.getBytes("utf-8")); System.out.println("消息发送成功:"+message); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if(channel!=null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if(connection!=null){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
package com.gjh; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveInfo { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");//指定IP factory.setPort(5672); factory.setUsername("root"); factory.setPassword("root"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("myQueue",true,false,false,null); /** * 接收消息 * 参数 1 为当前消费者需要监听的队列名 ,队列名必须要与发送时的队列名完全一致否则接收不到消息 * 参数 2 为消息是否自动确认,true表示自动确认接收完消息以后会自动将消息从队列中移除 * 参数 3 为消息接收者的标签,用于当多个消费者同时监听一个队列时用于确认不通消费者,通常为空字符串即可 * 参数 4 为消息接收的回调方法这个方法中具体完成对消息的处理代码 * 注意:使用了basicConsume方法以后,会启动一个线程在持续的监听队列,如果队列中有信息的数据进入则会自动接收消息 * 因此不能关闭连接和通道对象 */ channel.basicConsume("myQueue",true,"",new DefaultConsumer(channel){ //消息的具体接收和处理方法 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message=new String (body,"utf-8"); System.out.println("消费者-- "+message); } }); //不能关闭通道和链接,如果一旦关闭可能会造成接收时抛出异常,或无法接收到消息 // channel.close(); // connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }
发送3条消息到队列中 运行三次SendInfo
接收到三条消息
原文:https://www.cnblogs.com/PJG20/p/14696917.html