本文使用 RocketMQ 4.3.0。下面的事务消息写法依赖 4.3.0 才提供的 TransactionListener 接口,因此在 4.2.0 及更早版本上无法使用。
如果你之前安装过其他版本的 RocketMQ,需要把系统环境变量(例如 ROCKETMQ_HOME)改为指向 4.3.0 的安装目录。
Maven 工程需要在 pom.xml 中添加的依赖
<dependencies>
    <!-- RocketMQ: aggregator POM plus the client library used by the producer and consumer -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-all</artifactId>
        <version>4.3.0</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>

    <!-- Logback: logging backend -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>
生产者代码
package cn.ebiz.rocketmq.transaction;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Demo producer that sends two transactional messages to the
 * {@code TopicTransaction} topic through a {@link TransactionMQProducer}.
 *
 * <p>The messages are tagged {@code Transaction0} and {@code Transaction1}. The
 * registered listener returns {@code ROLLBACK_MESSAGE} for the
 * {@code Transaction1} tag and {@code COMMIT_MESSAGE} for everything else.
 */
public class Producer {

    /**
     * Configures the transactional producer, sends the two demo messages and
     * shuts the producer down.
     *
     * @param args unused
     * @throws MQClientException    if the producer fails to start
     * @throws InterruptedException if the back-off sleep after a failed send is interrupted
     */
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 01 Create a transaction-capable producer.
        TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");

        // 02 Point it at the name server.
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 03 Register the transaction listener BEFORE starting the producer so
        //    that there is never a started producer without a listener.
        producer.setTransactionListener(new TransactionListener() {

            /** Runs the local transaction for a half message and reports its outcome. */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // Constant-first comparison: a message without a tag must not cause an NPE.
                if ("Transaction1".equals(msg.getTags())) {
                    System.out.println("这里处理业务逻辑,比如操作数据库,失败情况下进行回滚");
                    // Simulated failure of the local transaction: roll the message back.
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            /** Answers a transaction-state check; this demo always reports commit. */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // Decode with the same charset the body was encoded with.
                System.out.println("state -- " + new String(msg.getBody(), StandardCharsets.UTF_8));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        // 04 Start the producer.
        producer.start();
        try {
            for (int i = 0; i < 2; i++) {
                try {
                    // 05 Build the message: topic, tag, body.
                    Message msg = new Message(
                            "TopicTransaction",
                            "Transaction" + i,
                            ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));

                    // 06 Send with the transaction-specific API rather than producer.send(msg).
                    SendResult sendResult = producer.sendMessageInTransaction(msg, null);

                    // Print the send result; printing msg.getBody() would only show
                    // the byte[] identity (e.g. "[B@1b6d3586").
                    System.out.println(sendResult);
                } catch (Exception e) {
                    // Best-effort demo: report the failure, back off, try the next message.
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
        } finally {
            // 07 Always release the client, even if main exits through an exception.
            producer.shutdown();
        }
    }
}
消费者代码
package cn.ebiz.rocketmq.transaction;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Demo push consumer that subscribes to every tag of the
 * {@code TopicTransaction} topic and prints each message it receives.
 */
public class Consumer0 {

    /**
     * Configures the push consumer, registers the printing listener and starts it.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        // 01 Create the default push consumer.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ConsumerGroup");

        // 02 Point it at the name server.
        pushConsumer.setNamesrvAddr("127.0.0.1:9876");

        // 03 Choose where consumption starts from.
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 04 Subscribe to all tags of the topic.
        pushConsumer.subscribe("TopicTransaction", "*");

        // 05 Register the listener.
        pushConsumer.registerMessageListener(new PrintingListener());

        // 07 Start consuming.
        pushConsumer.start();
        System.out.println("Consumer Started.");
    }

    /** Listener that prints topic, tags and body of every message in a batch. */
    private static final class PrintingListener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            ConsumeConcurrentlyStatus outcome = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            try {
                // 06 Receive the messages and print them.
                for (MessageExt received : msgs) {
                    StringBuilder line = new StringBuilder("收到消息: topic:");
                    line.append(received.getTopic());
                    String body = new String(received.getBody(), "utf-8");
                    line.append(" ,tags:").append(received.getTags());
                    line.append(" ,msg: ").append(body);
                    System.out.println(line.toString());
                }
            } catch (Exception failure) {
                failure.printStackTrace();
                // Ask for redelivery; the original author's note lists the retry
                // delays as "1s 2s 5s ... 2h".
                outcome = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return outcome;
        }
    }
}
//namesrv启动
start mqnamesrv.cmd
//broker启动
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
---------------------
转自:https://blog.csdn.net/weixin_38537747/article/details/82112584
原文:https://www.cnblogs.com/itplay/p/10647619.html