public interface ISpout extends Serializable {
void open( Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}collector.emit( new Values("value1" , "value2") , msgId );collector.emit( tuple, new Values( word));
collector.emit( new Values( word));
this .collector.ack(tuple); this .collector.fail(tuple);
public class ReliableSentenceSpout extends BaseRichSpout {
private static final long serialVersionUID = 1L;
private ConcurrentHashMap<UUID, Values> pending;
private SpoutOutputCollector collector;
private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" };
private int index = 0;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare( new Fields( "sentence"));
}
public void open( Map config, TopologyContext context, SpoutOutputCollector collector) {
this. collector = collector;
this. pending = new ConcurrentHashMap<UUID, Values>();
}
public void nextTuple() {
Values values = new Values( sentences[ index]);
UUID msgId = UUID. randomUUID();
this. pending.put(msgId, values);
this. collector.emit(values, msgId);
index++;
if ( index >= sentences. length) {
index = 0;
}
//Utils.waitForMillis(1);
}
public void ack(Object msgId) {
this. pending.remove(msgId);
}
public void fail(Object msgId) {
this. collector.emit( this. pending.get(msgId), msgId);
}
}一个实现可靠性的bolt:public class ReliableSplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
public void prepare( Map config, TopologyContext context, OutputCollector collector) {
this. collector = collector;
}
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence" );
String[] words = sentence.split( " ");
for (String word : words) {
this. collector.emit(tuple, new Values(word));
}
this. collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare( new Fields( "word"));
}
}原文:http://blog.csdn.net/suifeng3051/article/details/41682441