RocketMQ

Date: 2021-02-20 09:47:21

Official quick start: http://rocketmq.apache.org/docs/quick-start/

Setup and startup on Windows:

  1. Add environment variables

    ROCKETMQ_HOME="D:\rocketmq"
    NAMESRV_ADDR="localhost:9876" 

  2. Start the name server

    bin\mqnamesrv.cmd

  3. Start the broker

    bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
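
The NAMESRV_ADDR value used in the steps above is a host:port pair, and RocketMQ also accepts several name servers separated by semicolons. As a rough sketch of how an application could sanity-check that value before starting a client, here is a small parser. The class NameServerAddresses and its parse method are hypothetical helpers written for this post and are not part of RocketMQ.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of RocketMQ): parses a NAMESRV_ADDR-style string. */
final class NameServerAddresses {

    private NameServerAddresses() {
    }

    /** Splits "host:port;host:port" into unresolved socket addresses. */
    static List<InetSocketAddress> parse(String value) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : value.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup while parsing
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // Fall back to the value used in this post when the variable is not set.
        String value = System.getenv().getOrDefault("NAMESRV_ADDR", "localhost:9876");
        for (InetSocketAddress address : parse(value)) {
            System.out.println(address.getHostString() + " -> port " + address.getPort());
        }
    }
}
```

The parser only checks the format. It does not open a connection, so a well-formed address can still point at a name server that is not running.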

 

Web console: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

  Build with Maven: mvn clean package -Dmaven.test.skip=true

  Start: java -jar rocketmq-console-ng-2.0.0.jar

  Open: http://localhost:8080

Integrating RocketMQ with Spring Boot

https://github.com/apache/rocketmq-spring/wiki

Requirements:

  JDK 1.8 or later

  Maven 3.0 or later

  Spring Boot 2.0 or later

 

1. Add the dependency

        <!--add dependency in pom.xml-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>

2. Configure the name server address and the producer group

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

3. Send messages

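import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class MyProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public void producer() {
        // Send a message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World sync!");
        // Send a Spring message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        // Send a message asynchronously; the callback reports the outcome
        rocketMQTemplate.asyncSend("test-topic-2", "Hello, World async!", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("async onSuccess SendResult=%s %n", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.printf("async onException Throwable=%s %n", e);
            }

        });

        // Send an ordered message; the last argument is the hash key
        rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");

        // Destroy rocketMQTemplate. Note: once destroyed, it can no longer be used to send messages
        //rocketMQTemplate.destroy();
    }
}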
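
The last argument of syncSendOrderly is a hash key used to pick a queue, so messages that share a key (an order ID, for example) land on the same queue and keep their relative order. The idea can be sketched in plain Java as below. This only illustrates the routing idea and is not RocketMQ's own selector; the class HashKeyRouting, the method queueIndexFor, and the queue count of 4 are all assumptions made for the example.

```java
/** Hypothetical illustration of hash-key routing; not RocketMQ's own selector. */
final class HashKeyRouting {

    private HashKeyRouting() {
    }

    /** Maps a hash key to a queue index in the range [0, queueCount). */
    static int queueIndexFor(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues for the topic
        String[] orderIds = {"order-1", "order-2", "order-1", "order-3", "order-2"};
        for (String orderId : orderIds) {
            // The same orderId always prints the same queue index
            System.out.println(orderId + " -> queue " + queueIndexFor(orderId, queueCount));
        }
    }
}
```

Because the index depends only on the key and the queue count, ordering holds per key rather than across the whole topic, and changing the number of queues changes which queue a key maps to.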

 

4. Consume messages

  1) Push mode

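import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Consumes messages in push mode: onMessage is called for each delivered message.
 */
@Component
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MyConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("received message: " + message);
    }
}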

 

  2) Pull mode

  Starting with RocketMQ Spring 2.2.0, RocketMQ Spring supports pull-mode consumption.

  ①: Add the following to application.properties

rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test

 

  ②: Example of actively pulling messages in code

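import java.util.List;
import javax.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;

/**
 * Consumes messages in pull mode: the caller decides when to fetch.
 */
@Component
public class PullConsumer {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public void pullMessage() {
        // This is an example of a pull consumer using rocketMQTemplate.
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
    }
}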
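
The difference between the two modes is mostly about who drives delivery: in push mode the listener's onMessage is invoked when a message arrives, while in pull mode the application calls receive when it is ready. The toy in-memory model below contrasts the two control flows. It is a deliberately tiny sketch that says nothing about how RocketMQ implements either mode; the class InMemoryTopic and all of its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical in-memory topic used only to contrast push and pull consumption. */
final class InMemoryTopic {

    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final List<Consumer<String>> listeners = new ArrayList<>();

    /** Push style: register a callback that the topic invokes for each new message. */
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Delivers to listeners if any are registered, otherwise stores the message for a later pull. */
    void publish(String message) {
        if (listeners.isEmpty()) {
            pending.add(message);
        } else {
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    /** Pull style: the caller asks for up to maxMessages whenever it is ready. */
    List<String> receive(int maxMessages) {
        List<String> batch = new ArrayList<>();
        String next;
        while (batch.size() < maxMessages && (next = pending.poll()) != null) {
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) {
        InMemoryTopic pushTopic = new InMemoryTopic();
        pushTopic.subscribe(message -> System.out.println("pushed: " + message));
        pushTopic.publish("hello"); // the callback runs during publish

        InMemoryTopic pullTopic = new InMemoryTopic();
        pullTopic.publish("hello"); // nothing happens until someone pulls
        System.out.println("pulled: " + pullTopic.receive(10));
    }
}
```

In this model the pull consumer controls its own pace and batch size, while the push consumer reacts as soon as something is published.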

 

 

END.

Original post: https://www.cnblogs.com/yangyongjie/p/14410414.html
