Maven配置
引一下jar包,这里还是用3.2.6这一比较经典的版本
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
生产者
写一个简单的Producer类,来发送消息:
/** * Producer,发送消息 */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("group_name"); producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { Message msg = new Message("TopicTest", // topic "TagA", // tag ("HelloWorld - RocketMQ" + i).getBytes() // body ); SendResult sendResult = producer.send(msg); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
消费者
写一个简单的Consumer类,来接收消息:
/** * Consumer,订阅消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg: msgs) { try { String topic = msg.getTopic(); String tags = msg.getTags(); String msgBody = new String(msg.getBody(),"utf-8"); System.out.println("收到消息--" + " topic:" + topic + " ,tags:" + tags + " ,msg:" +msgBody); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }
仔细看看生产者结果输出,就会发现,有的消息发往broker-a,有的在broker-b上,自动实现了消息的负载均衡!
这里消费消息是没有什么顺序的,以后我们在来谈消息的顺序性。
会发现消息分布在2个broker上。
2) 运行效果–先启动消费者,再启动生产者
3) 运行效果–先启动生产者(这样消息会有挤压),再启动消费者
我这儿整理了比较全面的JAVA相关的面试资料,
需要领取面试资料的同学,请加群:473984645
原文:https://www.cnblogs.com/1013wang/p/12198067.html