Pitfalls of Storm's ack mechanism in real projects

To everyone who is learning Storm: I'm back with another lesson. Do you think you already know how to use ack? Okay, let me prove you wrong.

Let me talk about the ACK mechanism first:

  To ensure that data is processed correctly, Storm tracks every tuple generated by the spout.

  This is where ack/fail handling comes in. A tuple counts as successfully processed only when the tuple itself and all the tuples derived from it have been processed successfully; in that case the spout's ack method is called.

  A tuple counts as failed when the tuple itself, or any one of the tuples derived from it, fails to be processed; in that case the spout's fail method is called.

  Every bolt that processes a tuple tells Storm through the OutputCollector whether its own processing succeeded.

  Also note that when the spout's fail method is triggered, the failed tuple is not resent automatically. We have to look up the failed data in the spout ourselves and resend it manually.

Ack principle

Storm has special tasks called ackers. They are responsible for tracking the tuple tree of every tuple emitted by a spout (a tuple emitted by the spout is processed by a bolt, which in turn generates and emits new tuples, and so on). When an acker (a task started by the framework) finds that a tuple tree has been completely processed, it sends a message to the spout task that generated the root tuple. The acker's tracking algorithm is one of Storm's main breakthroughs: for an arbitrarily large tuple tree, it needs only a constant amount of memory, about 20 bytes, to track it.

The principle of the tracking algorithm: the acker keeps a check value, ack-val, for each spout tuple. Its initial value is 0. Every time a tuple is emitted or acked, the tuple's id is XORed with ack-val, and the result becomes the new ack-val. If every emitted tuple is also acked, each id has been XORed in exactly twice and cancels itself out, so the final ack-val must be 0. The acker therefore decides whether a tuple tree is completely processed by checking whether ack-val is 0.
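The XOR bookkeeping described above can be sketched in a few lines of plain Java. This is only a simulation under my own assumptions, not Storm's acker code: the class name AckValDemo and its methods are made up for illustration, and the ids used in the usage note are fixed values, whereas Storm itself uses random 64-bit ids.

```java
// Simulation of the ack-val check value for ONE spout tuple (hypothetical class, not a Storm API).
class AckValDemo {
    private long ackVal = 0L; // initial check value is 0

    // Called once when a tuple id is emitted and once more when that tuple is acked.
    void update(long tupleId) {
        ackVal ^= tupleId;
    }

    // The tuple tree is considered completely processed when ack-val is back to 0.
    boolean fullyProcessed() {
        return ackVal == 0L;
    }
}
```

For example, if the spout emits a root tuple with id 0x1111, a bolt emits two children 0x2222 and 0x4444 and then acks the root, the calls update(0x1111), update(0x2222), update(0x4444), update(0x1111) leave ack-val at 0x6666, so fullyProcessed() is false. Only after the two children are acked as well (update(0x2222), update(0x4444)) does ack-val return to 0.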

To implement the ack mechanism:

1. Specify the messageId when the spout emits a tuple.

2. The spout must override the fail and ack methods of BaseRichSpout.

3. The spout must cache every tuple it emits. Otherwise, when the spout's fail method receives the messageId sent by the acker, it has no way to get the failed data back and resend it. Look at the interface the framework provides: the only parameter is msgId. This design seems unreasonable to me. The system holds the whole message in its own cache, yet only a messageId is handed to the user. So how does the user get the original message? Apparently by keeping a cache of their own and looking the message up by msgId, which is a real trap.

4. The spout deletes acked tuples from the cache by messageId, and can choose to resend failed tuples.

5. Set the number of ackers to a value greater than 0: Config.setNumAckers(conf, ackerParal);

Storm has two kinds of bolt, BasicBolt and RichBolt. In a BasicBolt, data emitted through BasicOutputCollector is automatically anchored to the input tuple, and the input tuple is automatically acked when the execute method returns. With a RichBolt you have to specify the source tuple explicitly when you emit data: pass the anchor tuple as the first argument so that the tracking link is maintained, that is, collector.emit(oldTuple, newTuple). After execute has succeeded you must also call OutputCollector.ack(tuple); when processing fails, call OutputCollector.fail(tuple).

Linking a newly generated tuple to the tuple it was derived from is called anchoring. The anchoring is done at the moment you emit the new tuple.

  The ack mechanism works like this: for each message the spout emits, if the spout receives the acker's ack response within the specified time, the tuple is considered successfully processed by the downstream bolts. If no ack response arrives from the acker within the specified time (the default is 30 seconds), the fail action is triggered and the tuple is considered failed. The timeout can be set with Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS. If the spout receives a fail response from the acker, the tuple is also considered failed and the fail action is triggered.
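To make the timeout rule concrete, here is a minimal plain-Java sketch of a tracker that treats a message as failed when no ack arrives in time. It is an illustration only and not how Storm implements its timeout internally; the class TimeoutTracker and its method names are hypothetical, and the clock is passed in as a parameter so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: remembers when each messageId was emitted and expires old ones.
class TimeoutTracker {
    private final Map<String, Long> emittedAt = new LinkedHashMap<>();
    private final long timeoutMillis;

    TimeoutTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onEmit(String messageId, long nowMillis) {
        emittedAt.put(messageId, nowMillis);
    }

    // An ack within the timeout removes the message from tracking.
    void onAck(String messageId) {
        emittedAt.remove(messageId);
    }

    // Returns the ids whose ack did not arrive within the timeout; they count as failed.
    List<String> expire(long nowMillis) {
        List<String> failed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = emittedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                failed.add(entry.getKey());
                it.remove();
            }
        }
        return failed;
    }
}
```

With a 30 000 ms timeout, a message emitted at time 0 and never acked is still pending at 29 999 ms and is reported as failed by expire(30000).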

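  Note: I initially assumed that if a bolt extending BaseBasicBolt threw an exception, the spout would simply resend the tuple. I was wrong: the program terminated abnormally. (As far as I can tell, a BasicBolt only fails the input tuple when execute throws FailedException; other exceptions are not turned into a fail.)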

  Here I take wordcount, the introductory example of distributed programming, as an example. Please look at the figure below. Yes, I am the Liu Yang whose name you keep hearing when you walk down the street.

  Here the spout1-1 task sends the sentence "i am liu yang" to the bolt2-2 task for processing. That task splits the sentence into words and distributes them to the next bolt by field. bolt2-2, bolt4-4 and bolt5-5 append the suffix 1 to each word and send it to the next bolt, which stores it in the database. Now suppose the bolt7-7 task fails while storing data in the database and sends a fail response to the spout. When the spout receives that response, it sends the data again.

  Okay, now think about a question: how does the spout make sure that the data it resends is the data that failed before? The spout instance absolutely has to define a map that caches every piece of data it emits, with the messageId as the key. Once the spout instance has received the responses of all bolts and the result is ack, the ack method we overrode is called; in that method we delete the key-value pair by messageId. If the result is fail, the fail method we overrode is called; there we look up the data by messageId and emit it again.

The spout code is as follows

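import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Package names are for Storm 1.0 and later; older releases use backtype.storm.* instead.
public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;

    // Cache of emitted sentences that are still waiting for an ack, keyed by messageId
    private final HashMap<String, String> waitAck = new HashMap<String, String>();

    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String sentence = "i am liu yang";
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        waitAck.put(messageId, sentence);
        // Specifying the messageId turns on the ack/fail mechanism for this tuple
        collector.emit(new Values(sentence), messageId);
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("Message processing succeeded:" + msgId);
        System.out.println("Delete the data in the cache...");
        waitAck.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("Message processing failed:" + msgId);
        System.out.println("Resend failed information...");
        // The entry stays in the cache until it is acked, so it can be re-emitted with the same messageId
        String sentence = waitAck.get(msgId);
        if (sentence != null) {
            collector.emit(new Values(sentence), msgId);
        }
    }
}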

  In real Storm projects the spout's data usually comes from Kafka, and we use the KafkaSpout class that Storm provides. In fact, that class also maintains a collection of <messageId, Tuple> pairs internally.

How does Storm handle duplicate tuples?

  Because Storm has to guarantee reliable processing of tuples, the spout's fail method is triggered and the tuple is resent whenever processing fails or times out, so the same tuple may be calculated twice. This problem is hard to solve, and Storm does not provide a mechanism to solve it for you. Some feasible strategies:

(1) Do nothing. This also counts as a strategy, because real-time calculations usually do not require high accuracy and subsequent batch calculations will correct their errors.

(2) Filter with a third-party centralized store, for example use MySQL, memcached or Redis to remove duplicates based on a logical primary key.

(3) Filter with a Bloom filter, which is simple and efficient.
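As a rough illustration of strategy (3), the following plain-Java sketch implements a tiny Bloom filter keyed by a logical primary key. Everything here is an assumption made for the example: the class name TupleDeduplicator, the double-hashing scheme and the sizes are mine, not something Storm provides. Keep in mind that a Bloom filter can produce false positives, so a key that was never seen may occasionally be reported as a duplicate, while a key that was seen is never reported as new.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Hypothetical Bloom filter used to drop replayed tuples by their logical key.
class TupleDeduplicator {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    TupleDeduplicator(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    // Returns true if the key was definitely not seen before, false if it was probably seen.
    boolean firstTime(String key) {
        boolean isNew = false;
        for (int i = 0; i < hashCount; i++) {
            int index = indexFor(key, i);
            if (!bits.get(index)) {
                isNew = true;
                bits.set(index);
            }
        }
        return isNew;
    }

    // Double hashing: derive hashCount bit positions from two base hashes.
    private int indexFor(String key, int i) {
        int h1 = key.hashCode();
        int h2 = fnv1a(key);
        return Math.floorMod(h1 + i * h2, size);
    }

    // 32-bit FNV-1a hash of the UTF-8 bytes of the key.
    private static int fnv1a(String key) {
        int hash = 0x811c9dc5;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 16777619;
        }
        return hash;
    }
}
```

A bolt could call firstTime(logicalKey) before writing a result and skip the write when it returns false. Whether the occasional false positive is acceptable depends on the business.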

Question 1: Have you ever wondered what happens if a tuple processed by a certain task keeps failing and the message keeps being resent?

  We all know that the spout, as the source of the message, cannot delete a cached tuple until it has received the responses of all the bolts that process it. If messages keep failing, the spout stores more and more tuple data and eventually runs out of memory.
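One way to keep the cache from growing without bound is to give every message a retry budget. The sketch below is a plain-Java illustration under my own assumptions; the class RetryLimitedCache and its methods are hypothetical helpers that a spout's nextTuple, ack and fail methods could delegate to, not part of Storm.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pending cache that gives up on a message after maxRetries failed attempts.
class RetryLimitedCache {
    private final Map<String, String> pending = new HashMap<>();
    private final Map<String, Integer> failures = new HashMap<>();
    private final int maxRetries;

    RetryLimitedCache(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    void onEmit(String messageId, String payload) {
        pending.put(messageId, payload);
    }

    void onAck(String messageId) {
        pending.remove(messageId);
        failures.remove(messageId);
    }

    // Returns the payload to re-emit, or null once the retry budget is used up.
    String onFail(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count > maxRetries) {
            // Drop the message so it no longer occupies memory; the caller could log it instead.
            pending.remove(messageId);
            failures.remove(messageId);
            return null;
        }
        return pending.get(messageId);
    }

    int size() {
        return pending.size();
    }
}
```

With maxRetries set to 2, the first two calls to onFail for a message return its payload for resending, and the third call returns null and removes the message from the cache.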

Question 2: Have you ever thought about what happens when one of the many child tuples of a tuple fails while the other child tuples carry on? If the child tuples all perform data storage operations, then even though the message as a whole fails, the child tuples that succeeded have already written their data and are not rolled back.

  Storm's native API cannot support this kind of transactional operation. We can use Trident, the higher-level API that Storm provides (I have not studied how it works yet; internally it is presumably based on a distributed protocol such as two-phase commit). We can also guarantee transactional behaviour in this kind of business ourselves, according to our own needs. For the storage operation here, for example, we can record a state that says whether a message has already been stored, and query that state before storing to decide whether to execute the write.
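The "record a state, then check it before storing" idea can be sketched as follows. This is only an in-memory illustration under my own assumptions: the class IdempotentStore is hypothetical, the HashSet stands in for state that a real system would persist together with the data, and the key is built from the messageId plus the position of the word in the sentence.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical idempotent writer: a replayed child tuple does not change the stored counts.
class IdempotentStore {
    private final Set<String> storedKeys = new HashSet<>();      // stand-in for the persisted state
    private final Map<String, Integer> counts = new HashMap<>(); // stand-in for the database table

    // Returns true if the write was applied, false if this child tuple was already stored.
    boolean store(String messageId, int wordIndex, String word) {
        String key = messageId + ":" + wordIndex;
        if (!storedKeys.add(key)) {
            return false; // replay of a tuple that was stored before, skip the write
        }
        counts.merge(word, 1, Integer::sum);
        return true;
    }

    int count(String word) {
        return counts.getOrDefault(word, 0);
    }
}
```

This only works if the resent message keeps the same messageId, which is the case when the spout re-emits from its cache with the original id.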

Question 3: Tuple tracking does not have to run from the spout all the way to the last bolt. It always starts at the spout, but a bolt at any level can end the tracking and send the response.

Acker tasks: the number of ackers in a topology is configurable, and the default value is one. If your topology processes a large number of tuples, set the number of ackers somewhat higher and efficiency will improve.

Adjusting reliability

Acker tasks are very lightweight, so a topology does not need many of them. You can track their performance through the Storm UI (id: -1). If their throughput looks abnormal, add a few more ackers.

If reliability is not that important to you, that is, you don't care about losing some data in some failure situations, you can get better performance by not tracking tuple trees. Not tracking messages halves the number of messages in the system, because normally an ack message is sent for every tuple. Fewer ids also have to be kept in each downstream tuple, which reduces bandwidth usage. There are three ways to remove reliability.

The first is to set Config.TOPOLOGY_ACKERS to 0. In this case Storm calls the spout's ack method immediately after the spout emits a tuple; in other words, the tuple tree is not tracked. The second method removes reliability at the level of a single tuple: omit the messageId when emitting the tuple, and that particular spout tuple will not be tracked. The last method applies when you do not care much about the success or failure of a certain part of a tuple tree: emit those tuples unanchored. They are then not part of the tuple tree and will not be tracked.

Reliability configuration

There are three ways to remove the reliability of the message:

(1) Set the parameter Config.TOPOLOGY_ACKERS to 0. With this setting, the spout's ack method is called immediately after the spout emits a message.

(2) Do not specify a messageId when the spout emits a message. Use this method when you need to turn off reliability for a specific message.

(3) Finally, if you don't care about the reliability of the descendant messages derived from a certain message, do not anchor them when they are emitted, that is, do not pass the input message to the emit method. Because these descendant messages are not anchored in any tuple tree, their failure will not cause any spout to resend a message.

How to turn off the Ack mechanism

There are two ways:

(1) The spout emits data without a msgId.

(2) Set the number of ackers to 0.

It is worth noting that the task on which Storm calls ack or fail is always the task that generated the tuple. So if a spout runs as many tasks, the success or failure of a message is always reported to the task that originally emitted it.

As a Storm user, there are two things you need to do to make good use of Storm's reliability features. First, you need to notify Storm whenever you generate a tuple. Second, you must notify Storm when a tuple has been completely processed. That way Storm can detect whether the entire tuple tree has finished processing and notify the source spout of the result.

Storm handles the following failure cases:

1. A tuple is not acked because the task processing it died: Storm's timeout mechanism marks the tuple as failed after the timeout, so that it can be processed again.

2. The acker task dies: in this case, all spout tuples tracked by this acker time out and are processed again.

3. The spout task dies: in this case, the message source that feeds the spout is responsible for resending the messages.

These three basic mechanisms ensure that Storm is fully distributed, scalable and highly fault-tolerant.

In addition, the ack mechanism is often used for rate limiting. To avoid the spout emitting data faster than the bolts can process it, a maximum pending number is often set. When the number of tuples that the spout has emitted and that have not yet received an ack or fail response reaches this number, the call to nextTuple is skipped, which limits the data the spout sends.

Set the spout's pending number with conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);
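The effect of the pending limit can be simulated in plain Java. The sketch below is my own illustration of the rule, not Storm's internal code; the class PendingThrottle and its method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the max-spout-pending rule: no new tuples while the limit is reached.
class PendingThrottle {
    private final Set<String> pending = new HashSet<>();
    private final int maxPending;

    PendingThrottle(int maxPending) {
        this.maxPending = maxPending;
    }

    // Mirrors the check made before nextTuple would be called.
    boolean canEmit() {
        return pending.size() < maxPending;
    }

    // Records the tuple as pending if there is room; returns false when emission is skipped.
    boolean tryEmit(String messageId) {
        if (!canEmit()) {
            return false;
        }
        pending.add(messageId);
        return true;
    }

    // Both an ack and a fail response free a slot.
    void onAckOrFail(String messageId) {
        pending.remove(messageId);
    }
}
```

With a limit of 2, the third tryEmit is refused until one of the first two messages has been acked or failed. Note that the limit only applies to tuples emitted with a messageId, since tuples without one are never pending.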