RocketMQ提供了Java客户端,用于发送和接收消息。
发送消息有三种方式:同步、异步和单向。
1、同步发送消息
//Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "Hello World! ".getBytes(RemotingHelper.DEFAULT_CHARSET)); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.println(sendResult); //Shut down once the producer instance is not longer in use. producer.shutdown();
2、异步发送消息
//Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); //Shut down once the producer instance is not longer in use. producer.shutdown();
3、单向发送消息
//Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "Hello World! ".getBytes(RemotingHelper.DEFAULT_CHARSET)); //Call send message to deliver message to one of brokers. producer.sendOneWay(msg); //Shut down once the producer instance is not longer in use. producer.shutdown();
接收消息
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
msgs.forEach(msg -> System.out.println(new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
原文:https://www.cnblogs.com/stronger-brother/p/12162617.html