首页 > 编程语言 > 详细

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

时间:2020-05-29 22:54:23      阅读:193      评论:0      收藏:0      [点我收藏+]

前篇 消息队列客户端开发向导(基于 Spring 的 amqp 实现)

 

一、应答模式与重试机制

NONE 自动应答 即 autoAck 等于 true,rabbitmq 会自动把发送出去的消息置为确认,然后从内存/磁盘中删除,而不管消费者是否真正消费了消息。

MANUAL 需要手动 ACK/NACK

AUTO 根据是否抛出异常来决定 ACK/NACK

 

注:如果 应答模式为 AUTO,并且 retry.enabled 为 false,如果发生异常则会一直重试。

 

配置参考

spring:
  rabbitmq:
    host: 122.51.195.163
    username: rabbitmq
    password: rabbitmq
    listener:
      type: simple
      simple:
        retry:
          enabled: true
          max-attempts: 1
        max-concurrency: 100
        prefetch: 26
        batch-size: 1000 # 需要配合 receive-timeout 来使用
        acknowledge-mode: auto
    template:
      receive-timeout: 50S # 50秒 超时

 

 

二、批处理

批量发送

配置批处理策略,当批完成时才发送消息给 rabbitmq

@Bean
public BatchingRabbitTemplate batchingRabbitTemplate(ConnectionFactory connectionFactory) {
    BatchingStrategy strategy = new SimpleBatchingStrategy(1000, 25_000, 3_000);
    TaskScheduler scheduler = new ConcurrentTaskScheduler();
    BatchingRabbitTemplate template = new BatchingRabbitTemplate(strategy, scheduler);
    template.setConnectionFactory(connectionFactory);
    return template;
}

批处理发送消息

private final BatchingRabbitTemplate batchingRabbitTemplate;

public void sendTask(TempMsgCbTask task) {
    batchingRabbitTemplate.convertAndSend(TEMP_MSG_CB_EXCHANGE, task.getLevel(), task);
}

 

批量消费

默认批数据会自动 debatch 成单个。为了一次消费多个,需要设置 batchListener 为 true

@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
        matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setBatchListener(true);
    // if consumerBatchEnabled is true , deBatchingEnabled will be true. refer https://docs.spring.io/spring-amqp/docs/2.2.7.RELEASE/reference/html/#receiving-batch
    // factory.setConsumerBatchEnabled(true);
    configurer.configure(factory, connectionFactory);
    return factory;
}

批量消费消息

@RabbitListener(queues = TEMP_MSG_CB_LOG_QUEUE_1, priority = "1")
public void logQueue1(List<TempMsgCbTask> in) {
    log.info("Message read from logQueue1 msgId");
    List<TemplateSendCollect> sendCollects = tempPushStore.updateCbDetailAndLog(in);
    templateSendLogService.doUpdate(sendCollects);
}

当数据迟迟没有满足批处理大小 batch-size 时,如果达到超时时间 receive-timeout 将会提前消费。

 

233

消息队列客户端开发向导二(基于 Spring 的 amqp 实现)

原文:https://www.cnblogs.com/lemos/p/12989900.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!