
RocketMQ with Java

Date: 2020-01-07 17:36:29

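RocketMQ provides a Java client for sending and receiving messages.

There are three ways to send a message: synchronous, asynchronous, and one-way. A synchronous send blocks until the broker responds, an asynchronous send returns immediately and reports the result through a callback, and a one-way send returns without waiting for any response.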
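To make the difference between the three modes concrete, here is a minimal sketch that uses only java.util.concurrent. The class SendModesSketch and its deliver method are hypothetical stand-ins for a broker round trip and are not part of the RocketMQ client; the sketch models only how long the caller waits in each mode.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a producer; NOT part of the RocketMQ API.
class SendModesSketch {
    // A single background thread plays the role of the network and the broker.
    private final ExecutorService network = Executors.newSingleThreadExecutor();

    // Simulates the round trip and returns a fake acknowledgement.
    private CompletableFuture<String> deliver(String body) {
        return CompletableFuture.supplyAsync(() -> "ACK:" + body, network);
    }

    // Synchronous: block the caller until the acknowledgement arrives (or time out).
    String sendSync(String body) throws Exception {
        return deliver(body).get(3, TimeUnit.SECONDS);
    }

    // Asynchronous: return immediately; the callback later receives the result or the error.
    CompletableFuture<String> sendAsync(String body, BiConsumer<String, Throwable> callback) {
        return deliver(body).whenComplete(callback);
    }

    // One-way: hand the message off and never look at the outcome.
    void sendOneway(String body) {
        deliver(body);
    }

    // Stops accepting new work; tasks that are already queued still run.
    void shutdown() {
        network.shutdown();
    }

    public static void main(String[] args) throws Exception {
        SendModesSketch producer = new SendModesSketch();
        System.out.println(producer.sendSync("hello"));
        // Waiting on the returned future here is only so the demo prints before exiting.
        producer.sendAsync("world", (ack, err) -> System.out.println(ack)).get();
        producer.sendOneway("fire-and-forget");
        producer.shutdown();
    }
}
```

The trade-off is the usual one: synchronous sends are the simplest to reason about, asynchronous sends avoid blocking the calling thread, and one-way sends are the cheapest but give no delivery feedback at all.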

 

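1. Synchronous send

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        // Create a message instance, specifying topic and message body.
        Message msg = new Message("TopicTest", "Hello World! ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Send the message to one of the brokers and block until it responds.
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult);
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}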
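A note on the message body: the producer examples call getBytes(RemotingHelper.DEFAULT_CHARSET), where the constant is the charset name "UTF-8" passed as a String, so the call declares the checked UnsupportedEncodingException. The sketch below shows an equivalent encoding step with java.nio.charset.StandardCharsets, plus the matching decode for the receiving side. BodyEncodingSketch is a hypothetical helper written for this illustration and uses no RocketMQ classes.

```java
import java.nio.charset.StandardCharsets;

// Stand-alone sketch of the payload encoding step; it uses no RocketMQ classes.
class BodyEncodingSketch {
    // Encode text as UTF-8 bytes without a checked exception.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode with the same charset instead of relying on the platform default.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u4f60\u597d" is a two-character Chinese greeting, used here as a non-ASCII test case.
        byte[] body = encode("\u4f60\u597d, RocketMQ");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

Using the same explicit charset on both sides matters once the body contains non-ASCII text, because new String(bytes) without a charset depends on the JVM's default encoding.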

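2. Asynchronous send

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        // Do not retry internally when an asynchronous send fails.
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // Create a message instance, specifying topic and message body.
        Message msg = new Message("TopicTest", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // The latch keeps the producer alive until the callback has fired.
        final CountDownLatch latch = new CountDownLatch(1);
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
                latch.countDown();
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                latch.countDown();
            }
        });
        // Wait for the callback; shutting down immediately could discard the pending result.
        latch.await(5, TimeUnit.SECONDS);
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}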

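3. One-way send

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        // Create a message instance, specifying topic and message body.
        Message msg = new Message("TopicTest", "Hello World! ".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Send without waiting for a broker response; note the method name is sendOneway.
        producer.sendOneway(msg);
        // Give the client a moment to flush the request before shutting down.
        Thread.sleep(5000);
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}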

 

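Receiving messages

import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        // Instantiate with the specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe to one or more topics; "*" accepts every tag.
        consumer.subscribe("TopicTest", "*");
        // Register a callback that runs when messages fetched from brokers arrive.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                // Decode with an explicit charset rather than the platform default.
                msgs.forEach(msg -> System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8)));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // Launch the consumer instance.
        consumer.start();
    }
}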
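The second argument to subscribe is a tag sub-expression: "*" accepts every tag, and several tags can be combined with "||", for example "TagA || TagB". The sketch below models that matching rule in plain Java to show what such an expression selects. TagFilterSketch and its matches method are hypothetical names for this illustration; this is not the broker's actual filtering code, and treating a null expression as "match all" is an assumption of the sketch.

```java
import java.util.Arrays;

// Illustrative model of tag sub-expressions; NOT the broker's actual filtering code.
class TagFilterSketch {
    // Returns true when a message tag satisfies an expression such as "*" or "TagA || TagB".
    static boolean matches(String subExpression, String tag) {
        // Assumption of this sketch: a null expression behaves like "*".
        if (subExpression == null || subExpression.trim().equals("*")) {
            return true;
        }
        if (tag == null) {
            return false; // an untagged message cannot match a specific tag
        }
        // Split on the "||" separator and compare each alternative exactly.
        return Arrays.stream(subExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));
        System.out.println(matches("TagA || TagB", "TagB"));
        System.out.println(matches("TagA || TagB", "TagC"));
    }
}
```

In the consumer above, replacing "*" with an expression like "TagA || TagB" would narrow delivery to messages that producers sent with one of those tags.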


Source: https://www.cnblogs.com/stronger-brother/p/12162617.html
