mq不支持任意的时间京都,如果要支持,不可避免的需要在Broker层做消息排序,加上持久化方面的考量,将不可避免地带来巨大的性能消耗,所以rocketMQ只支持特定级别的延迟消息。
在Broker短通过messageDelayLevel配置。实现类:org.apache.rocketmq.store.schedule.ScheduleMessageService
public class ScheduleMessageService extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";//定时消息统一主题 private static final long FIRST_DELAY_TIME = 1000L;//第一次调度时延迟的时间,默认1s private static final long DELAY_FOR_A_WHILE = 100L;//每一延时级别调度一次后延迟该时间间隔后再放入调度池。 private static final long DELAY_FOR_A_PERIOD = 10000L;//发送一场后延迟该时间后再继续参与调度 private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);//延迟级别与时间 private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);//延迟级别消息消费进度 private final DefaultMessageStore defaultMessageStore;//默认消息存储器 private final AtomicBoolean started = new AtomicBoolean(false); private Timer timer; private MessageStore writeMessageStore; private int maxDelayLevel;//MessageStoreConfig#messageDelayLevel中最大消息延迟级别
org.apache.rocketmq.store.DefaultMessageStore#load:
延迟消息消费队列消息进度加载+delayLevelTable数据构造。延迟队列消息消费进度默认存储路径为${ROCKET_HOME}/store/config/delayoffset
ScheduleMessageService#start:
原文:https://www.cnblogs.com/lccsblog/p/12287977.html