学习RocketMQ,先写一个Demo演示一下看看效果。
因为只是简单的为了演示效果,服务端仅部署单Master模式 —— 一个Name Server节点,一个Broker节点。主要有一下过程。
unzip rocketmq-all-4.4.0-source-release.zip cd rocketmq-all-4.4.0/ mvn -Prelease-all -DskipTests clean install –U
cd distribution/target/apache-rocketmq
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
Name Server和Broker都成功启动,服务器就部署完成了。更详细的参考官方文档手册,里面还包含在服务器上运行Producer、Customer示例,这里主要是在项目中使用。
官网手册戳这里:Quick Start
客户端分为消息生产者和消息消费者,这里通过日志打印输出查看效果,为了看起来更清晰,我新建了两个模块分别作为消息生产者和消息消费者。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
# RocketMQ生产者 rocketmq: producer: # Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER producerGroup: ${spring.application.name} # namesrv地址 namesrvAddr: 192.168.101.213:9876 # 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB maxMessageSize: 4096 # 发送消息超时时间,单位毫秒。默认10000 sendMsgTimeout: 5000 # 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2 retryTimesWhenSendFailed: 2 # 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096 compressMsgBodyOverHowmuch: 4096 # 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 createTopicKey: XIAO_LIU
@Configuration public class ProducerConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerConfiguration.class); /** * Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。默认DEFAULT_PRODUCER */ @Value("${rocketmq.producer.producerGroup}") private String producerGroup; /** * namesrv地址 */ @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; /** * 客户端限制的消息大小,超过报错,同时服务端也会限制,需要跟服务端配合使用。默认4MB */ @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize; /** * 发送消息超时时间,单位毫秒。默认10000 */ @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; /** * 如果消息发送失败,最大重试次数,该参数只对同步发送模式起作用。默认2 */ @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; /** * 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节。默认4096 */ @Value("${rocketmq.producer.compressMsgBodyOverHowmuch}") private Integer compressMsgBodyOverHowmuch; /** * 在发送消息时,自动创建服务器不存在的topic,需要指定Key,该Key可用于配置发送消息所在topic的默认路由。 */ @Value("${rocketmq.producer.createTopicKey}") private String createTopicKey; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(this.producerGroup); producer.setNamesrvAddr(this.namesrvAddr); producer.setCreateTopicKey(this.createTopicKey); if (this.maxMessageSize != null) { producer.setMaxMessageSize(this.maxMessageSize); } if (this.sendMsgTimeout != null) { producer.setSendMsgTimeout(this.sendMsgTimeout); } if (this.retryTimesWhenSendFailed != null) { producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } if (this.compressMsgBodyOverHowmuch != null) { producer.setCompressMsgBodyOverHowmuch(this.compressMsgBodyOverHowmuch); } if (Strings.isNotBlank(this.createTopicKey)) { producer.setCreateTopicKey(this.createTopicKey); } try { producer.start(); LOGGER.info("Producer Started : producerGroup:[{}], namesrvAddr:[{}]" , this.producerGroup, this.namesrvAddr); } catch (MQClientException e) { LOGGER.error("Producer Start Failed : {}", e.getMessage(), e); } return producer; } }
@RunWith(SpringRunner.class) @SpringBootTest public class ProducerServiceApplicationTests { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerServiceApplicationTests.class); @Autowired private DefaultMQProducer defaultMQProducer; @Test public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException { for (int i = 0; i < 100; i++) { User user = new User(); user.setUsername("用户" + i); user.setPassword("密码" + i); user.setSex(i % 2); user.setBirthday(new Date()); Message message = new Message("user-topic", "user-tag", JSON.toJSONString(user).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = defaultMQProducer.send(message); LOGGER.info(sendResult.toString()); } } }
# RocketMQ消费者 rocketmq: consumer: # Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组。默认DEFAULT_CONSUMER consumerGroup: ${spring.application.name} # namesrv地址 namesrvAddr: 192.168.101.213:9876 # 消费线程池最大线程数。默认10 consumeThreadMin: 10 # 消费线程池最大线程数。默认20 consumeThreadMax: 20 # 批量消费,一次消费多少条消息。默认1 consumeMessageBatchMaxSize: 1 # 批量拉消息,一次最多拉多少条。默认32 pullBatchSize: 32 # 订阅的主题 topics: user-topic
@Configuration public class ConsumerConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerConfiguration.class); @Value("${rocketmq.consumer.consumerGroup}") private String consumerGroup; @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Value("${rocketmq.consumer.pullBatchSize}") private int pullBatchSize; @Value("${rocketmq.consumer.topics}") private String topics; private final ConsumeMsgListener consumeMsgListener; @Autowired public ConsumerConfiguration(final ConsumeMsgListener consumeMsgListener) { this.consumeMsgListener = consumeMsgListener; } @Bean public DefaultMQPushConsumer getRocketMQConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); consumer.setPullBatchSize(pullBatchSize); consumer.registerMessageListener(consumeMsgListener); try { /** * 设置消费者订阅的主题和tag。subExpression参数为*表示订阅该主题下所有tag, * 如果需要订阅该主题下的指定tag,subExpression设置为对应tag名称,多个tag以||分割,例如"tag1 || tag2 || tag3" */ consumer.subscribe(topics, "*"); consumer.start(); LOGGER.info("Consumer Started : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr); } catch (Exception e) { LOGGER.error("Consumer Start Failed : consumerGroup:{}, topics:{}, namesrvAddr:{}", consumerGroup, topics, namesrvAddr, e); e.printStackTrace(); } return consumer; } }
新增消息监听器,监听到新消息后,执行对应的业务逻辑。
@Component public class ConsumeMsgListener implements MessageListenerConcurrently { private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeMsgListener.class); @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { if (CollectionUtils.isEmpty(msgs)) { LOGGER.info("Msgs is Empty."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } for (MessageExt msg : msgs) { try { if ("user-topic".equals(msg.getTopic())) { LOGGER.info("{} Receive New Messages: {}", Thread.currentThread().getName(), new String(msg.getBody())); // do something } } catch (Exception e) { if (msg.getReconsumeTimes() == 3) { // 超过3次不再重试 LOGGER.error("Msg Consume Failed."); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { // 重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
Demo很简单,但是里面还有很多东西需要慢慢研究。
代码可以戳这里:spring-cloud-learn
原文:https://www.cnblogs.com/Mr-XiaoLiu/p/10620171.html