作者对这个例子的说明:这里。
RollingTopWords实现了每过N分钟更新M分钟内最热门话题,如每1分钟刷新一次过去5分钟的热门话题。
- spouts负责推送所有话题(words),同一话题推送给同一个RollingCountBolt task(使用Storm的fieldsGrouping实现)
- RollingCountBolt接收推送的话题,并保存话题的出现次数。每过1分钟,RollingCountBolt将每个话题的出现次数推送给中间处理节点IntermediateRankingsBolt(同样使用Storm的fieldsGrouping实现)
- IntermediateRankingsBolt内部保存一个TOP N的List,接收到推送过来的消息后刷新TOP N(根据话题发表次数排序word)。每过两秒钟将结果推送给最终节点TotalRankingsBolt。
- TotalRankingsBolt将2秒内收到的各个TOP N进行汇总后,选出最终的TOP N。
- Strom提供了类似Timer的机制,每过指定的时间系统自动发一个Tuple给Bolt,可通过如下代码在Bolt中设置
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;
}
//判断是否系统自动发送的Tuple
public static boolean isTick(Tuple tuple) {
return tuple != null
&& Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}
- 欲实现“每过1分钟刷新一次5分钟内热门话题”的要求,需要将5分钟划分为5段,每一分钟为一段。在该实现钟将5分钟称为一个窗口(window),每一段称做一个slot。系统中需要保存每一段的话题次数。每过1分钟对前面5分钟求和,然后扔掉第一段的数据。以上过程由SlidingWindowCounter类和SlotBasedCounter类实现,在RollingCountBolt中调用。
- 疑惑
- 本例在Bolt调用Rankings类中方法时做了线程同步,是否表明 Storm对每个Task的调用不是线程安全的?
Storm Starter例子RollingTopWords代码学习
原文:http://blog.csdn.net/fytain/article/details/45291911