Storm's BaseBasicBolt: reading the source code behind the ack mechanism

When we studied the ack mechanism, we learned that Storm offers two bolt base classes, BaseBasicBolt and BaseRichBolt. With BaseBasicBolt, BasicOutputCollector automatically anchors every emitted tuple to the input tuple, and the input tuple is acknowledged automatically when the execute method returns. With BaseRichBolt, we have to name the source tuple explicitly when emitting, passing it as the anchor argument so that the tracking chain stays intact, that is, collector.emit(oldTuple, newTuple); after successful processing we must call OutputCollector.ack(tuple), and when handling a failure we must call OutputCollector.fail(tuple). Now let's check whether the BasicBolt source code really works this way, rather than doing it just because other people's posts say so. Seeing is believing.
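
To make the BaseRichBolt contract described above concrete, here is a minimal sketch of the "anchor, then ack or fail" pattern. It deliberately does not use Storm itself: RecordingCollector, execute and run are hypothetical stand-ins invented for illustration, and tuples are reduced to plain strings, so treat it as a model of the call order rather than as Storm's API.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types for illustration only; these are NOT Storm classes.
public class ManualAckSketch {

    // Hypothetical, simplified collector that records what the bolt asks for.
    static class RecordingCollector {
        final List<String> log = new ArrayList<>();

        void emit(String anchor, String value) {
            log.add("emit " + value + " anchored to " + anchor);
        }

        void ack(String input) {
            log.add("ack " + input);
        }

        void fail(String input) {
            log.add("fail " + input);
        }
    }

    // Rich-bolt style: the bolt anchors every emit and reports the outcome itself.
    static void execute(String input, RecordingCollector collector) {
        try {
            for (String word : input.split(" ")) {
                if (word.isEmpty()) {
                    throw new IllegalArgumentException("empty word");
                }
                collector.emit(input, word.toLowerCase()); // explicit anchor
            }
            collector.ack(input);  // explicit ack after successful processing
        } catch (RuntimeException e) {
            collector.fail(input); // explicit fail when processing breaks
        }
    }

    // Runs one input through the bolt logic and returns the recorded calls.
    static List<String> run(String input) {
        RecordingCollector collector = new RecordingCollector();
        execute(input, collector);
        return collector.log;
    }

    public static void main(String[] args) {
        run("Hello Storm").forEach(System.out::println);
    }
}
```

The point of the sketch is that all three calls (the anchored emit, ack and fail) appear in the bolt's own code. That is exactly the bookkeeping BaseBasicBolt is supposed to take off our hands.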

To make the source code easier to follow, here is our own subclass first:

public class SplitSentenceBolt extends BaseBasicBolt {
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
    }
    
    //5: Our own processing logic runs here and receives the input tuple and the collector as parameters.
  public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = (String)input.getValueByField("sentence");
        String[] words = sentence.split(" ");
        for (String word: words) {
            word = word.trim();
            word = word.toLowerCase();
            collector.emit(new Values(word, 1)); // emits through BasicOutputCollector, the wrapper around OutputCollector
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","num"));
    }
}

Stepping through with breakpoints, we find that the bolt task creates the class below, and that execution follows the numbered comments in order:

public class BasicBoltExecutor implements IRichBolt {
    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);    
    
    private IBasicBolt _bolt;
    private transient BasicOutputCollector _collector;
    //1: The executor is constructed, and the SplitSentenceBolt object we wrote is stored in a field typed as the IBasicBolt interface.
    public BasicBoltExecutor(IBasicBolt bolt) {
        _bolt = bolt;
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _bolt.declareOutputFields(declarer); // delegates to the declareOutputFields method of our SplitSentenceBolt
    }
    //2: Initialize the _collector field. BasicOutputCollector is a wrapper around OutputCollector.
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _bolt.prepare(stormConf, context);
        _collector = new BasicOutputCollector(collector);
    }
    //3: execute is then called with the incoming tuple; in the debugger the input looked like: source: spout1:4, stream: default, id: {}, [+-*%/]
    public void execute(Tuple input) {
        _collector.setContext(input); // stores the received tuple in the inputTuple field of BasicOutputCollector
        try {
            _bolt.execute(input, _collector); // calls the execute method of our implementation class SplitSentenceBolt
            _collector.getOutputter().ack(input); // acks the input once our execute has returned normally
        } catch(FailedException e) {
            if(e instanceof ReportedFailedException) {
                _collector.reportError(e);
            }
            _collector.getOutputter().fail(input); // fails the input when a FailedException was thrown
        }
    }
    public void cleanup() {
        _bolt.cleanup();
    }
    public Map<String, Object> getComponentConfiguration() {
        return _bolt.getComponentConfiguration();
    }
}
public class BasicOutputCollector implements IBasicOutputCollector {
    private OutputCollector out;
    private Tuple inputTuple;
    public BasicOutputCollector(OutputCollector out) {
        this.out = out;
    }
    //4: setContext assigns the received tuple to inputTuple. From this point on, every field of the BasicOutputCollector object has a value.

Don't worry for now about where the bolt gets started; I will cover that later. Here we only care about what happens after the BasicBoltExecutor object has been created. Looking at the execute method of BasicBoltExecutor, we can see that ack and fail are called for us: ack runs as soon as our own execute method returns normally, and fail runs when our code throws a FailedException (a ReportedFailedException is additionally passed to reportError first).
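
The flow traced above can be reproduced in a small, Storm-free model. This is only a sketch under stated assumptions: Outputter, BasicCollector, BasicBolt, Executor and the local FailedException are hypothetical stand-ins that mirror the shape of OutputCollector, BasicOutputCollector, IBasicBolt, BasicBoltExecutor and Storm's FailedException from the listings, and tuples are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types that mirror the shape of the listings above; these are NOT Storm classes.
public class AutoAckSketch {

    // Stand-in for Storm's FailedException.
    static class FailedException extends RuntimeException {
        FailedException(String message) {
            super(message);
        }
    }

    // Plays the role of OutputCollector: records anchored emits, acks and fails.
    static class Outputter {
        final List<String> log = new ArrayList<>();

        void emit(String anchor, String value) { log.add("emit " + value + " <- " + anchor); }
        void ack(String input) { log.add("ack " + input); }
        void fail(String input) { log.add("fail " + input); }
    }

    // Plays the role of BasicOutputCollector: remembers the current input and anchors to it.
    static class BasicCollector {
        private final Outputter out;
        private String inputTuple;

        BasicCollector(Outputter out) { this.out = out; }
        void setContext(String inputTuple) { this.inputTuple = inputTuple; }
        void emit(String value) { out.emit(inputTuple, value); } // anchor added automatically
        Outputter getOutputter() { return out; }
    }

    // Plays the role of IBasicBolt: user logic that never calls ack or fail.
    interface BasicBolt {
        void execute(String input, BasicCollector collector);
    }

    // Plays the role of BasicBoltExecutor.
    static class Executor {
        private final BasicBolt bolt;
        private final BasicCollector collector;

        Executor(BasicBolt bolt, Outputter out) {
            this.bolt = bolt;
            this.collector = new BasicCollector(out);
        }

        void execute(String input) {
            collector.setContext(input);
            try {
                bolt.execute(input, collector);
                collector.getOutputter().ack(input);  // automatic ack on normal return
            } catch (FailedException e) {
                collector.getOutputter().fail(input); // automatic fail on FailedException
            }
        }
    }

    // Feeds the inputs through a word-splitting bolt and returns the recorded calls.
    static List<String> run(String... inputs) {
        Outputter out = new Outputter();
        Executor executor = new Executor((input, collector) -> {
            for (String word : input.split(" ")) {
                if (word.equals("bad")) {
                    throw new FailedException("cannot process: " + word);
                }
                collector.emit(word.toLowerCase());
            }
        }, out);
        for (String input : inputs) {
            executor.execute(input);
        }
        return out.log;
    }

    public static void main(String[] args) {
        run("Hello Storm", "one bad").forEach(System.out::println);
    }
}
```

Notice that the bolt logic inside run never mentions an anchor, ack or fail; the wrapper supplies all three. Like the catch clause in the listing, the model only turns a FailedException into a fail call, so any other exception type would propagate out of execute.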