How does KafkaSpout ensure that memory does not overflow under the ack mechanism

The KafkaSpout class in the Storm framework extends BaseRichSpout and overrides its ack and fail methods, so our bolts must take part in the ack mechanism for message retransmission to work correctly. If a bolt never acks or fails its tuples, KafkaSpout gets no processing response for the message and will re-emit it after the timeout, which leads to the same message being sent repeatedly.
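For example, a bolt that cooperates with the ack mechanism explicitly acks or fails every tuple it receives. The following is only an illustrative sketch (the class name is made up, and the imports assume Storm 1.x package names; older releases use backtype.storm.* instead):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class AckingBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        try {
            String sentence = input.getStringByField("sentence");
            System.out.println("Processing: " + sentence);
            // ... real processing of the sentence would happen here ...
            collector.ack(input);   // tell the spout the tuple was handled successfully
        } catch (Exception e) {
            collector.fail(input);  // tell the spout to retransmit this tuple
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this example bolt emits nothing downstream
    }
}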

Recall also that when we write our own spout by extending BaseRichSpout and want it to support message retransmission, we typically define a map in the spout keyed by msgId, like this:

// Storm 1.x package names assumed; replace org.apache.storm with backtype.storm on older releases
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class MySpout extends BaseRichSpout {
    private static final long serialVersionUID = 5028304756439810609L;
    // key: messageId, value: the message payload waiting to be acked
    private HashMap<String, String> waitAck = new HashMap<String, String>();
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    public void nextTuple() {
        String sentence = "the cow jumped over the moon";
        String messageId = UUID.randomUUID().toString().replaceAll("-", "");
        waitAck.put(messageId, sentence);
        // Emitting with a messageId turns on the ack/fail mechanism for this tuple
        collector.emit(new Values(sentence), messageId);
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("Message processing succeeded: " + msgId);
        System.out.println("Deleting the cached data...");
        waitAck.remove(msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("Message processing failed: " + msgId);
        System.out.println("Resending the failed message...");
        // The cached entry is only removed on ack, so a failed message can be
        // looked up here and re-emitted with the same msgId
        collector.emit(new Values(waitAck.get(msgId)), msgId);
    }
}

So does KafkaSpout also cache every message it has emitted to a bolt but not yet received a response for? If so, when message processing keeps failing and messages keep being retransmitted, messages would keep accumulating on the KafkaSpout node: would that eventually cause a memory overflow on the KafkaSpout side?

In fact, it does not. Recall how Kafka works: Kafka keeps a small piece of metadata for each consumer group, namely the position of the current consumption, i.e. the offset. This offset is controlled by the consumer; under normal circumstances the consumer advances the offset linearly as it consumes messages, but it can also set the offset back to a smaller value and consume some messages again. In other words, when KafkaSpout consumes data from Kafka, after reading a message at a given offset and emitting it to the bolt, KafkaSpout only keeps the current offset value, not the message itself.
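As a side illustration of this offset control (using the modern kafka-clients consumer API, not the lower-level consumer that the old storm-kafka spout used), a consumer can rewind to an earlier offset and re-read messages from there; the broker address, topic, partition and offset below are assumptions for the example:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Generic illustration of offset control with the Kafka consumer API:
// the consumer can rewind to an earlier offset and re-read messages.
public class OffsetRewindExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        TopicPartition tp = new TopicPartition("sentences", 0);  // assumed topic and partition
        consumer.assign(Collections.singletonList(tp));

        // Jump back to offset 100 and consume again from there.
        consumer.seek(tp, 100L);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> r : records) {
            System.out.println(r.offset() + " -> " + r.value());
        }
        consumer.close();
    }
}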

When a tuple fails or succeeds, KafkaSpout looks up the offset from the msgId; on failure it goes back to Kafka and consumes the data at that offset again, which is how the message gets re-sent.
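The effect is roughly the following sketch. This is not the real KafkaSpout code: fetchFromKafka is a hypothetical helper standing in for a Kafka fetch, and the msgId is simplified to a bare offset.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.tuple.Values;

// Illustrative sketch only: nothing but the offset is kept per emitted tuple,
// so a failed message is re-read from Kafka before being re-emitted.
public class RefetchOnFailSketch {
    private SpoutOutputCollector collector;

    public void fail(Object msgId) {
        long offset = (Long) msgId;                  // simplified: the real msgId also carries the partition
        String payload = fetchFromKafka(offset);     // hypothetical helper: read one message at this offset
        collector.emit(new Values(payload), msgId);  // re-emit with the same msgId
    }

    private String fetchFromKafka(long offset) {
        // Stand-in for a real Kafka fetch at the given offset.
        throw new UnsupportedOperationException("illustrative only");
    }
}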

But even though each offset is small, could memory still overflow once the number of cached offsets grows large?

In fact, no: when KafkaSpout finds that its cached offsets exceed a limit, it cleans the data up from one end (the oldest offsets).
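The idea can be sketched like this (illustrative only, not the actual PartitionManager source; the maxOffsetBehind limit here mirrors the setting of the same name exposed by storm-kafka's SpoutConfig):

import java.util.SortedSet;
import java.util.TreeSet;

// Illustrative sketch: offsets that fall too far behind the latest acked offset
// are discarded, so the cached sets stay bounded even under constant failures.
public class BoundedOffsetCacheSketch {
    private final long maxOffsetBehind;
    private final SortedSet<Long> pending = new TreeSet<Long>();
    private final SortedSet<Long> failed = new TreeSet<Long>();

    public BoundedOffsetCacheSketch(long maxOffsetBehind) {
        this.maxOffsetBehind = maxOffsetBehind;
    }

    public void ack(long offset) {
        pending.remove(offset);
        // Drop every cached offset that is more than maxOffsetBehind older than this ack.
        pending.headSet(offset - maxOffsetBehind).clear();
        failed.headSet(offset - maxOffsetBehind).clear();
    }

    public void fail(long offset) {
        failed.add(offset);
    }
}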

This is how KafkaSpout emits data:

collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

You can see that the partition and the offset are wrapped into the msgId (a KafkaMessageId).

The spout does not buffer the payload of any message it has already emitted.
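The msgId itself is just a small holder for a partition reference and an offset, roughly of this shape (simplified sketch, with the partition reduced to an int):

// Simplified sketch of the msgId that KafkaSpout emits: only a partition
// reference and an offset, never the message payload.
class KafkaMessageId {
    final int partition;  // simplified; the real class holds storm-kafka's Partition object
    final long offset;

    KafkaMessageId(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}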

When it receives the bolt's response, it extracts the offset from the received msgId. The following is the key code extracted from the source:

// In KafkaSpout: an ack is routed to the PartitionManager of the partition
// recorded in the msgId, passing only the offset.
public void ack(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager m = _coordinator.getManager(id.partition);
    if (m != null) {
        m.ack(id.offset);
    }
}

// In PartitionManager: a successful offset is simply removed from the pending set.
public void ack(Long offset) {
    _pending.remove(offset); // processing succeeded: drop the offset
    numberAcked++;
}

// In KafkaSpout: a fail is routed the same way, again passing only the offset.
public void fail(Object msgId) {
    KafkaMessageId id = (KafkaMessageId) msgId;
    PartitionManager m = _coordinator.getManager(id.partition);
    if (m != null) {
        m.fail(id.offset);
    }
}

// In PartitionManager: a failed offset is recorded so the message can be
// fetched from Kafka again and re-emitted.
public void fail(Long offset) {
    failed.add(offset); // processing failed: remember the offset for retransmission
    numberFailed++;
}

// The per-partition state is just two sorted sets of offsets:
SortedSet<Long> _pending = new TreeSet<Long>();
SortedSet<Long> failed = new TreeSet<Long>();

The source analysis touches many Kafka concepts, so fully understanding the KafkaSpout source requires a reasonable understanding of Kafka itself. If you are not familiar with Kafka, then for the purpose of understanding Storm's ack mechanism it is enough to remember the two things KafkaSpout does: it caches only offsets (wrapped in the msgId) rather than message payloads, and it re-fetches failed messages from Kafka by offset, trimming its offset caches when they grow beyond the limit.