生产者 Producer 重试(异步和 SendOneWay下配置无效)
消费端重试
PayController 类代码如下:
package net.xdclass.xdclassmq.controller; import net.xdclass.xdclassmq.jms.JmsConfig; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; @RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga" ,"6688" , ("hello xdclass rocketmq = "+text).getBytes() ); SendResult sendResult = payProducer.getProducer().send(message); System.out.println(sendResult); return new HashMap<>(); } }
JmsConfig 类代码如下:
package net.xdclass.xdclassmq.jms; public class JmsConfig { public static final String NAME_SERVER = "192.168.159.129:9876;192.168.159.130:9876"; public static final String TOPIC = "xdclass_pay_test_topic_666"; }
PayProducer 类代码如下:
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class PayProducer { private String producerGroup = "pay_producer_group"; private DefaultMQProducer producer; public PayProducer(){ producer = new DefaultMQProducer(producerGroup); //生产者投递消息重试次数 producer.setRetryTimesWhenSendFailed(3); //指定NameServer地址,多个地址以 ; 隔开 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } public DefaultMQProducer getProducer(){ return this.producer; } /** * 对象在使用之前必须要调用一次,只能初始化一次 */ public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown(){ this.producer.shutdown(); } }
PayConsumer 类代码如下:
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(JmsConfig.TOPIC, "*"); consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); int times = msg.getReconsumeTimes(); System.out.println("重试次数="+times); try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); // 模拟报错 if(keys.equalsIgnoreCase("6688")){ throw new Exception(); } System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消费异常"); //如果重试2次不成功,则记录,人工介入 if(times >= 2){ System.out.println("重试次数大于2,记录数据库,发短信通知开发人员或者运营人员"); //TODO 记录数据库,发短信通知开发人员或者运营人员 //告诉broker,消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
官方文档:https://rocketmq.apache.org/docs/simple-example/
@RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() ); payProducer.getProducer().send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString()); } @Override public void onException(Throwable e) { e.printStackTrace(); //补偿机制,根据业务情况进行使用,看是否进行重试 } }); return new HashMap<>(); }
注意:官方例子:如果异步发送消息,调用 producer.shutdown() 后会失败。异步发送:不会重试,发送总次数等于1。
@RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() ); payProducer.getProducer().sendOneway(message); return new HashMap<>(); }
汇总对比
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 无 | 可能丢失 |
什么是延迟消息?
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即延迟消息,目前支持固定延迟精度的消息。
固定精度为:"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
使用 message.setDelayTimeLevel(xxx); xxx是级别,1表示配置里面的第一个级别,2表示第二个级别。
@RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() ); message.setDelayTimeLevel(2);// 表示5s后进行投递 payProducer.getProducer().sendOneway(message); return new HashMap<>(); }
使用场景
RocketMQ 还有定时消息功能,目前开源版本还不支持,商业版本则有,两者使用场景类似。
应用场景:顺序消息,分摊负载。
默认 Topic 下的 queue 数量是4,可以配置。
import net.xdclass.xdclassmq.jms.JmsConfig; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.List; @RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() ); //同步发送 // SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { // @Override // public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // int queueNum = Integer.parseInt(arg.toString()); // return mqs.get(queueNum); // } // // }, 3); // System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString()); //异步发送到指定queue, SendCallback不能用lambda表达式,有两个函数需要被实现 payProducer.getProducer().send(message, (mqs, msg, arg) -> { int queueNum = Integer.parseInt(arg.toString()); return mqs.get(queueNum); }, 0, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString()); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); return new HashMap<>(); } }
支持同步,异步发送指定的 MessageQueue。
注意:选择的 queue 数量必须小于配置的,否则会出错。
生产端保证发送消息有序,且发送到同一个 Topic 的同个 queue 里面,RocketMQ 的确是能保证 FIFO 的。
例子:订单的顺序流程是:创建、付款、物流、完成,订单号相同的消息会被先后发送到同一个队列中,根据 MessageQueueSelector 里面自定义策略,根据同个业务 id 放置到同个 queue 里面,如订单号取模运算再放到 selector 中,同一个模的值都会投递到同一条 queue。
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //如果是订单号是字符串,则进行hash,得到一个hash值 Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int)index); }
消费端要在保证消费同个 Topic 里的同个队列,不应该用 MessageListenerConcurrently,应该使用 MessageListenerOrderly,自带单线程消费消息,不能再 Consumer 端再使用多线程去消费,消费端分配到的 queue 数量是固定的,集群消会锁住当前正在消费的队列集合的消息,所以会保证顺序消费。
下面代码来演示生产者投递消息和消费者消费消息,项目目录如下:
ProductOrder 类
package net.xdclass.xdclassmq.domain; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProductOrder implements Serializable { //订单id private long orderId; //操作类型 private String type; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getType() { return type; } public void setType(String type) { this.type = type; } public ProductOrder() { } public ProductOrder(long orderId, String type) { this.orderId = orderId; this.type = type; } public static List<ProductOrder> getOrderList() { List<ProductOrder> list = new ArrayList<>(); list.add(new ProductOrder(111L, "创建订单")); list.add(new ProductOrder(222L, "创建订单")); list.add(new ProductOrder(111L, "支付订单")); list.add(new ProductOrder(222L, "支付订单")); list.add(new ProductOrder(111L, "完成订单")); list.add(new ProductOrder(333L, "创建订单")); list.add(new ProductOrder(222L, "完成订单")); list.add(new ProductOrder(333L, "支付订单")); list.add(new ProductOrder(333L, "完成订单")); return list; } @Override public String toString() { return "ProductOrder{" + "orderId=" + orderId + ", type=‘" + type + ‘\‘‘ + ‘}‘; } }
PayController 类
package net.xdclass.xdclassmq.controller; import net.xdclass.xdclassmq.domain.ProductOrder; import net.xdclass.xdclassmq.jms.JmsConfig; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.List; @RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v2/pay_cb") public Object callback() throws Exception { List<ProductOrder> list = ProductOrder.getOrderList(); for (int i = 0; i < list.size(); i++) { ProductOrder order = list.get(i); Message message = new Message(JmsConfig.ORDERLY_TOPIC, "", order.getOrderId() + "", order.toString().getBytes()); SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int) index); } }, order.getOrderId()); System.out.printf("发送结果=%s, sendResult=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.toString(), order.getOrderId(), order.getType()); } return new HashMap<>(); } }
JmsConfig 类
package net.xdclass.xdclassmq.jms; public class JmsConfig { public static final String NAME_SERVER = "192.168.159.129:9876;192.168.159.130:9876"; public static final String TOPIC = "xdclass_pay_test_topic_888"; public static final String ORDERLY_TOPIC = "xdclass_pay_order_topic_orderly"; }
PayConsumer 类
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.util.List; @Component public class PayConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_consumer_group"; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(JmsConfig.TOPIC, "*"); consumer.registerMessageListener( new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); int times = msg.getReconsumeTimes(); System.out.println("重试次数="+times); try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { System.out.println("消费异常"); //如果重试2次不成功,则记录,人工介入 if(times >= 2){ System.out.println("重试次数大于2,记录数据库,发短信通知开发人员或者运营人员"); //TODO 记录数据库,发短信通知开发人员或者运营人员 //告诉broker,消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
PayOrderlyConsumer 类
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.springframework.stereotype.Component; import java.util.List; @Component public class PayOrderlyConsumer { private DefaultMQPushConsumer consumer; private String consumerGroup = "pay_orderly_consumer_group"; public PayOrderlyConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(JmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //默认是集群方式,可以更改为广播,但是广播方式不支持重试 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(JmsConfig.ORDERLY_TOPIC, "*"); consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { MessageExt msg = msgs.get(0); try { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); //做业务逻辑操作 TODO return ConsumeOrderlyStatus.SUCCESS; } catch (Exception e) { e.printStackTrace(); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } }); consumer.start(); System.out.println("consumer start ..."); } }
PayProducer 类
package net.xdclass.xdclassmq.jms; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class PayProducer { private String producerGroup = "pay_producer_group"; private DefaultMQProducer producer; public PayProducer(){ producer = new DefaultMQProducer(producerGroup); //生产者投递消息重试次数 producer.setRetryTimesWhenSendFailed(3); //指定NameServer地址,多个地址以 ; 隔开 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } public DefaultMQProducer getProducer(){ return this.producer; } /** * 对象在使用之前必须要调用一次,只能初始化一次 */ public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown(){ this.producer.shutdown(); } }
原文:https://www.cnblogs.com/jwen1994/p/12353909.html