声明交换机、队列:
import com.rabbitmq.client.*; import com.rabbitmq.client.impl.nio.NioParams; import java.io.IOException; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class Rmq { private static final String EXCHANGE_NAME = "ex1"; private static final String QUEUE_NAME = "queue1"; private static final String ROUTING_KEY = "key1"; public static void main(String[] args) throws IOException,TimeoutException,InterruptedException { getChannel(); } public static void getChannel() throws IOException,TimeoutException,InterruptedException { ConnectionFactory factory = new ConnectionFactory(); //初始化 factory.setHost("120.77.37.94"); factory.setNioParams(new NioParams().setNbIoThreads(2)); factory.useNio(); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("123456"); //Connection Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); /** * 直连交换机(direct exchange): (empty string) and amq.direct/名字绑定到同名的队列 * 默认交换机(direct exchange): empty string/名字为""的直连交换机 * 扇型交换机(fanout exchange): amq.fanout/交换机会将消息的拷贝分别发送给这所有的N个队列 * 主题交换机(topic exchange): amq.topic/路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。 * 头交换机(headers exchange): amq.match/headers头 **/ //声明交换机 /** * String exchange, //交换机名字 * BuiltinExchangeType type, //交换机类型枚举:DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers"); * boolean durable, //是否持久化,重启可恢复(保存到Erlang自带的Mnesia数据库中) * boolean autoDelete, //当已经没有消费者时,服务器是否可以删除该Exchange * boolean internal, //是否内置,如果设置 为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器的方式 * Map<String, Object> arguments //Map<String, Object> 类型参数 */ channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT,false,false,false,null); //声明一个队列 /** * String queue, //队列名字 * boolean durable, //是否持久化,重启可用(保存到Erlang自带的Mnesia数据库中) * boolean exclusive, //连接关闭自动删除,只能一个队列访问(当前队列加锁),多个队列强制访问报reply-code=405异常 * boolean autoDelete, //无消费者时是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management, * 查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 * 队列中的消息什么时候会自动被删除? * *Map<String, Object> arguments //队列声明后无法修改此参数 * *Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, * 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”); * channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”)); * Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp * Max Length(x-max-length): 限定队列的消息的最大值长度n,保留最新的n条,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim * Max Length Bytes(x-max-length-bytes)(1024): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B * Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX * Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK * Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费, * Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中 * Master locator(x-queue-master-locator) */ Map<String, Object> queueParams = new HashMap<String, Object>(); //queueParams.put("x-max-length",10); //最大长度 //queueParams.put("x-max-priority",10); //优先级队列 //queueParams.put("x-dead-letter-exchange","ex2"); //queueParams.put("x-dead-letter-routing-key","key2"); //channel.queueDeclare(QUEUE_NAME, false, false, false, queueParams); //channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY,null); //channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#",null); //主题模式 //channel.exchangeBind("ex2","ex1","key1",null);//交换机和交换机绑定 AMQP.Queue.DeclareOk response =channel.queueDeclarePassive(QUEUE_NAME); System.out.println("队列名字:" + response.getQueue()); System.out.println("消费者数量:" + response.getConsumerCount()); System.out.println("消息数量:" + response.getMessageCount()); System.out.println("方法名字:" + response.protocolMethodName()); /** https://blog.csdn.net/vbirdbest/article/details/78670550 * String exchange, //交换机名字 * String routingKey, * 消息的路由,路由到交换机所绑定的队列上,根据队列绑定路由key将消息推送到此队列 * 广播模式无routingKey此概念 * #匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用。 * boolean mandatory(强制性的), * true:如果exchange根据自身类型和消息routeKey无法匹配符合条件的queue,那么会调用basic.return方法将消息返还给生产者。 * false:出现上述情形broker会直接将消息扔掉 * boolean immediate(立刻的), * true:routingKey对应队列无消费者时,消息会通过basic.return方法返还给生产者。 * BasicProperties props, //单条消息的策略 * byte[] body //消息byte数组 */ //*注: 消息持久化时必须将消息持久化、队列持久化、交换机持久化 //BasicProperties 可读 AMQP.BasicProperties properties1 = MessageProperties.MINIMAL_BASIC;//(1)持久化 //Builder 可写 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder(); /*//确认机制 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { //成功处理 public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(deliveryTag+" ack "+multiple); } //rabbitmq服务器内部异常未成功处理 public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println(deliveryTag+" nack "+multiple); } });*/ //channel.txSelect();//开启事务 for (int i=1;i<=5;i++) { properties .deliveryMode(1) //(2)设置消息是否持久化,1:非持久化 2:持久化 .priority(i) //(优先级)指定消息的优先级p,在队列优先级1-10范围内p越大越优先 //.headers(new HashMap<String, Object>(){{put("aaa","aaa");}}) .messageId("aa"+i) //.expiration(1000*5+"") //单条消息失效时间,如果队列设置了TTL,消息也设置了,那么会取小的 ; if (i == 4) { throw new RuntimeException("测试异常4"); } /** * 延时队列(订单到期触发业务) * 1、队列queue设置主题TTL失效时间 * 2、单个消息msg设置expiration失效时间 * 选择.如果列队数多个可能导致消息延时时间不一样,则选择单个消息设置 * 设置:x-dead-letter-exchange、x-dead-letter-routing-key 将过期或者队列达到长度队首溢出的消息路由到死信队列 * */ channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,false,false,properties.build(),("你SSSSDA好"+i).getBytes("utf-8")); } //channel.txCommit();//提交事务,出现异常自动回滚 //channel.txRollback();//回滚事务 //channel.queuePurge(QUEUE_NAME);//清空队列 //channel.exchangeDelete(EXCHANGE_NAME,false);//删除交换机 //channel.queueDelete(QUEUE_NAME,true,false);//删除队列 channel.close(); connection.close(); } }
消费者消费:
import com.rabbitmq.client.*; import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeoutException; public class Customer { private static final String EXCHANGE_NAME = "ex1"; private static final String QUEUE_NAME = "queue1"; private static final String ROUTING_KEY = "key1"; public static void main(String[] args) throws IOException,TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //初始化 factory.setHost("120.77.37.94"); factory.setPort(5672); factory.setUsername("root"); factory.setPassword("123456"); //Connection Connection connection = factory.newConnection(); //创建一个通道 final Channel channel = connection.createChannel(); //channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT,false,false,false,null); //channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY,null); //声明一个队列 //channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //properties 获取单条消息的策略设置值 //body 消息数组 Map<String,Object> headers = properties.getHeaders(); System.out.println("Customer consumerTag:" + consumerTag);//消费者唯一标识 System.out.println("Customer envelope:" + envelope.toString());//deliveryTag=10(此消费者每次接收消息+1), redeliver=false, exchange=ex1, routingKey=key1 //System.out.println("Customer properties:" + properties.getHeaders());//消息头自定义参数可以记录重试次数 System.out.println("Customer msg:" + new String(body,"utf-8")); //channel.basicReject(envelope.getDeliveryTag(),false);//拒绝处理,true 重新放入队列 false抛弃消息(进入后续队列) //channel.basicRecover(true);//消息重入队列,requeue=true,发送给新的consumer,false发送给相同的consumer //channel.basicNack(envelope.getDeliveryTag(),false,false);//(类似reject) requeue: true重新放入队列 false抛弃消息(进入后续队列) channel.basicAck(envelope.getDeliveryTag(),false); //消息应答确认已被消费,若无返回重新把消息加入队列 } };//channel.basicGet() channel.basicConsume(QUEUE_NAME, false, consumer);//是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答 } }
原文:https://www.cnblogs.com/qinggg/p/12924911.html