简单队列的缺点:消费者(Consumer)的消费能力有高有低,当生产者(producer)生产能力远远大于消费者能力,那么消息队列消息就会堆积,最终使消息队列达到上限。
引入工作队列,工作队列包括:轮询和公平两种方式。
轮询就是增加消费者,每个消费者依次读取消息。
package com.ckfuture.work.rr.send; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; /** * 工作队列-轮询-消息生产者 */ public class Send { //定义队列名称 private final static String QUEUE_NAME = "work_rr"; public static void main(String[] argv) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //连接工厂的地址 factory.setHost("localhost"); try ( //连接工厂创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 20; i++) { String message = "Hello World!"+i; //发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent ‘" + message + "‘"+i); } } } }
package com.ckfuture.work.rr.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * 工作队列-轮询-消息消费者 */ public class Recv01 { private final static String QUEUE_NAME = "work_rr"; public static void main(String[] argv) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //连接工厂创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费耗时 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received ‘" + message + "‘"); /** * 手动确认 * multiple:是否确认多条 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
package com.ckfuture.work.rr.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * 工作队列-轮询-消息消费者 */ public class Recv02 { private final static String QUEUE_NAME = "work_rr"; public static void main(String[] argv) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //连接工厂创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费耗时 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received ‘" + message + "‘"); /** * 手动确认 * multiple:是否确认多条 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
解决消费者之间消费能力差别的问题,按消费能力分配,能者多劳!!即:利用限流控制只有消费完成才允许获取。
限流代码:
//限制消费者每次只能接受一条消息,处理完才能接受下一条消息 int prefetchCount=1; channel.basicQos(prefetchCount);
package com.ckfuture.work.fair.send; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; /** * 工作队列-公平-消息生产者 */ public class Send { //定义队列名称 private final static String QUEUE_NAME = "work_fair"; public static void main(String[] argv) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //连接工厂的地址 factory.setHost("localhost"); try ( //连接工厂创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel()) { //绑定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 20; i++) { String message = "Hello World!"+i; //发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent ‘" + message + "‘"+i); } } } }
package com.ckfuture.work.fair.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * 工作队列-公平-消息消费者 */ public class Recv01 { private final static String QUEUE_NAME = "work_fair"; public static void main(String[] argv) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //连接工厂创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //限制消费者每次只能接受一条消息,处理完才能接受下一条消息 int prefetchCount=1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费耗时 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received ‘" + message + "‘"); /** * 手动确认 * multiple:是否确认多条 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
package com.ckfuture.work.fair.recv; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; /** * 工作队列-公平-消息消费者 */ public class Recv02 { private final static String QUEUE_NAME = "work_fair"; public static void main(String[] argv) throws Exception { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //连接工厂创建连接 Connection connection = factory.newConnection(); //创建信道 Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //限制消费者每次只能接受一条消息,处理完才能接受下一条消息 int prefetchCount=1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模拟消费耗时 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received ‘" + message + "‘"); /** * 手动确认 * multiple:是否确认多条 */ channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
Recv01消费能力强消费的多
Recv02消费能力弱
原文:https://www.cnblogs.com/ckfuture/p/14407168.html