Spring Boot + RabbitMQ Ultimate Edition

  • RabbitMQ flow chart

  • Main concepts of RabbitMQ

    1. Virtual host: A virtual host holds a set of exchanges, queues, and bindings. Why are multiple virtual hosts needed? In RabbitMQ, user permissions are granted per virtual host. So to keep group A from accessing the exchanges, queues, and bindings of group B, create a separate virtual host for each group. Every RabbitMQ server has a default virtual host, "/".
    2. Exchange: An exchange forwards messages but does not store them. If no queue is bound to the exchange, messages sent to it by the producer are discarded. An important related concept is the routing key: when a message arrives at the exchange, the exchange forwards it to the matching queue, and which queue that is depends on the routing key.
    3. Binding: An exchange must be bound to a queue before it can deliver to it. As shown in the figure above, exchanges and queues have a many-to-many relationship.
  • Common types of exchange:

    1. direct: The direct type works as "match first, then deliver". A routing key is set when the queue is bound, and a message is delivered to that queue only when its routing key matches the binding's routing key exactly.

    2. topic: forwards messages according to pattern rules (the most flexible type).

      Topic exchange rules (the words in a key are separated by periods):

      * matches exactly one word

      # matches zero or more words

      Queues are bound with wildcard binding keys. Suppose the binding key of queue Q1 is *.TT.* and the binding key of queue Q2 is TT.#. A message with the routing key A.TT.B is received by Q1; a message with the routing key TT.AA.BB is received by Q2.

    3. fanout: broadcast mode. Whatever the routing key, the message is sent to every queue bound to the exchange. A routing key configured on the binding is ignored.

    4. headers: also matches according to rules. Unlike direct and topic, which always use the routing key, headers uses custom matching rules. When a queue is bound to the exchange, a set of key-value pairs is specified; each message also carries a set of key-value pairs (its headers attribute). The message is delivered to the queue when any one pair matches or when all pairs match, depending on the binding's x-match argument (any or all).
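The four matching rules above can be sketched in plain Java. This is only an illustration of the routing semantics, not RabbitMQ's implementation: the class ExchangeMatching, its method names, and the matchAll flag (standing in for x-match=all versus x-match=any) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ExchangeMatching {

    // direct: the routing key must equal the binding key exactly.
    public static boolean direct(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    // fanout: the routing key is ignored, every bound queue receives the message.
    public static boolean fanout(String bindingKey, String routingKey) {
        return true;
    }

    // topic: '*' matches exactly one word, '#' matches zero or more words.
    public static boolean topic(String bindingKey, String routingKey) {
        List<String> pattern = Arrays.asList(bindingKey.split("\\."));
        List<String> words = routingKey.isEmpty()
                ? List.of()
                : Arrays.asList(routingKey.split("\\."));
        return match(pattern, 0, words, 0);
    }

    private static boolean match(List<String> p, int i, List<String> w, int j) {
        if (i == p.size()) {
            return j == w.size();
        }
        String token = p.get(i);
        if (token.equals("#")) {
            // '#' either consumes nothing, or consumes one word and stays active.
            return match(p, i + 1, w, j) || (j < w.size() && match(p, i, w, j + 1));
        }
        if (j == w.size()) {
            return false;
        }
        return (token.equals("*") || token.equals(w.get(j))) && match(p, i + 1, w, j + 1);
    }

    // headers: matchAll=true needs every binding pair in the message headers,
    // matchAll=false needs at least one.
    public static boolean headers(Map<String, Object> binding,
                                  Map<String, Object> messageHeaders,
                                  boolean matchAll) {
        long hits = binding.entrySet().stream()
                .filter(e -> e.getValue().equals(messageHeaders.get(e.getKey())))
                .count();
        return matchAll ? hits == binding.size() : hits > 0;
    }

    public static void main(String[] args) {
        System.out.println(topic("*.TT.*", "A.TT.B"));   // true  (Q1 receives A.TT.B)
        System.out.println(topic("TT.#", "A.TT.B"));     // false (Q2 does not)
        System.out.println(topic("TT.#", "TT.AA.BB"));   // true  (Q2 receives TT.AA.BB)
        System.out.println(topic("*.TT.*", "TT.AA.BB")); // false (Q1 does not)
    }
}
```

The recursive match keeps the two wildcard rules separate, which makes it easy to check a binding key by hand before declaring it on a real exchange.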

  • Installing RabbitMQ

    1. Check that the image exists, and choose the version that includes the management interface

      docker search rabbitmq:management
    2. Pull the image

      docker pull rabbitmq:management
    3. Run the container

      docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

      Parameter description

      -d runs the container in the background;

      --name specifies the name of the container;

      -p maps a host port to a container port (5672: application access port; 15672: web console port);

      -v maps directories or files;

      --hostname sets the host name (an important point about RabbitMQ is that it stores data under its "node name", which defaults to the host name);

      -e sets environment variables (RABBITMQ_DEFAULT_VHOST: default virtual host name; RABBITMQ_DEFAULT_USER: default user name; RABBITMQ_DEFAULT_PASS: password of the default user).

    4. Check that the container has started

      docker ps
    5. Open the management interface at http://Server-IP:15672 and log in with the default username and password

  • Introduction to the main files of the project

    1. Configuration file

      spring:
        rabbitmq:
          host: 81.68.178.65
          port: 5672
          username: admin
          password: admin
          listener:
            simple:
              # manual acknowledgement
              acknowledge-mode: manual
              # enable retry
              retry:
                enabled: true
          # before Spring Boot 2.2.0 this was configured as
          # publisher-confirms=true
          # from Spring Boot 2.2.0 on:
          # Phase 1: Producer --> Broker/Exchange, ConfirmCallback confirmation phase
          publisher-confirm-type: correlated
          # Phase 2: Exchange --> Queue, ReturnCallback confirmation phase
          publisher-returns: true
          # The official documentation says this item must be set to true when returns are enabled.
          # Its actual effect: when a message fails to reach a queue, the unroutable message is
          # handed asynchronously to the ReturnCallback we registered. By default such a message
          # is discarded without any notification.
          template:
            mandatory: true

      publisher-confirm-type values

      • none: the default value; the ConfirmCallback mechanism is disabled.
      • correlated: enables the ConfirmCallback. When publishing a message you can pass a CorrelationData object. Once the broker has handled the message, the ConfirmCallback set by the producer is triggered and receives the same CorrelationData, so the confirmation can be matched to the message. CorrelationData can carry enough metadata to drive the callback logic. Unless there is a special requirement, use this value.
      • simple: has two effects. First, like correlated, it triggers the callback. Second, after publishing, you can call waitForConfirms or waitForConfirmsOrDie on the rabbitTemplate to wait for the broker's result and decide the next step based on it, which suits more complex business logic. Note that waitForConfirmsOrDie does not return a value: if a confirmation fails it throws, the channel is closed, and no further messages can be sent on that channel.

      acknowledge-mode values

      • none: no acknowledgements are sent; the broker treats a message as acknowledged as soon as it has been delivered.
      • manual: the listener must acknowledge each message itself through the Channel (basicAck, basicNack, basicReject).
      • auto: the listener container acknowledges the message when the listener returns normally and rejects it when the listener throws an exception.

    2. Configuration

      import org.springframework.amqp.core.*;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      @Configuration
      public class RabbitmqConfig {

          @Bean
          DirectExchange consumerExchange() {
              // Declaration flags:
              // durable: whether the declaration is persistent. A durable queue is stored on disk
              //   and still exists after the broker restarts; a temporary queue is valid only for
              //   the current connection.
              // exclusive: false by default. An exclusive queue can be used only by the connection
              //   that created it and is deleted when that connection closes. This flag takes
              //   precedence over durable.
              // autoDelete: whether to delete automatically. The queue is deleted once no producer
              //   or consumer is using it.
              // Usually durable is set to true and the other two are left at false.
              return ExchangeBuilder.directExchange("consumerExchange").durable(true).build();
          }

          @Bean
          Queue queue() {
              // durable queue: survives a broker restart
              return QueueBuilder.durable("consumer_route").build();
          }

          @Bean
          Binding bindConsumerExchange() {
              // bind the queue to the exchange with a routing key
              return BindingBuilder.bind(queue()).to(consumerExchange()).with("consumer_route_key");
          }
      }
    3. Producer

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.AmqpException;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;

      import java.util.UUID;

      @RestController
      @RequestMapping(value = "/provider")
      @Slf4j
      public class ProviderController {

          @Autowired
          private RabbitTemplate rabbitTemplate;

          @Autowired
          private ConfirmCallbackService confirmCallbackService;

          @Autowired
          private ReturnsCallBackService returnsCallBackService;

          @GetMapping
          public void provider() {
              try {
                  String uuid = UUID.randomUUID().toString();
                  String message = "hello mq " + uuid;
                  log.info("send message: {}", message);
                  // callback invoked when the broker confirms (ack) or refuses (nack) the message
                  rabbitTemplate.setConfirmCallback(confirmCallbackService);
                  // callback invoked when the message could not be routed to a queue
                  rabbitTemplate.setReturnsCallback(returnsCallBackService);
                  // exchange name, routing key, message content
                  rabbitTemplate.convertAndSend("consumerExchange", "consumer_route_key", message);
              } catch (AmqpException e) {
                  log.error("failed to send message", e);
              }
          }
      }
    4. Message confirmation mechanism

      • Mechanism

        The full path of a message from sending to acknowledgement is Producer --> Broker/Exchange --> Broker/Queue --> Consumer. To guarantee reliable delivery alone, only the first two stages need to be considered, because a message counts as delivered once it has reached the queue.

        • If, for example, the specified exchange does not exist when the message is published, the first stage fails (handled by ConfirmCallback, confirm mode).

        • If delivery to the exchange succeeds but the routing key is wrong, or the message fails to get from the exchange to a queue for some other reason, the second stage fails (handled by ReturnCallback, return mode).

        From both the producer's and the consumer's point of view, a message is delivered successfully once it reaches the queue. Stages 1 and 2 therefore belong to the producer side, and stage 3 belongs to the consumer side. This part covers only successful delivery, so consumer acknowledgement is left out. When RabbitMQ is integrated with Spring Boot, message confirmation is disabled by default.
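The two producer-side stages can be illustrated with a tiny in-memory model. MiniBroker and its event strings are hypothetical names invented for this sketch, not Spring AMQP or RabbitMQ API; the model only mirrors when a confirm reports failure (unknown exchange) and when a return fires (exchange reached, no queue matched).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniBroker {

    // exchange name -> (routing key -> messages held by the bound queue)
    private final Map<String, Map<String, List<String>>> exchanges = new HashMap<>();

    // What the producer-side callbacks would observe, in order.
    public final List<String> events = new ArrayList<>();

    public void bind(String exchange, String routingKey) {
        exchanges.computeIfAbsent(exchange, k -> new HashMap<>())
                 .computeIfAbsent(routingKey, k -> new ArrayList<>());
    }

    public void publish(String exchange, String routingKey, String message) {
        Map<String, List<String>> bindings = exchanges.get(exchange);
        if (bindings == null) {
            // Stage 1 failed: the exchange does not exist.
            events.add("confirm ack=false: " + message);
            return;
        }
        List<String> queue = bindings.get(routingKey);
        if (queue == null) {
            // Stage 2 failed: no queue matched, so the message is returned first.
            events.add("returned: " + message);
        } else {
            queue.add(message);
        }
        // Stage 1 succeeded in both cases, so the confirm is positive.
        events.add("confirm ack=true: " + message);
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.bind("consumerExchange", "consumer_route_key");
        broker.publish("consumerExchange", "consumer_route_key", "m1"); // both stages succeed
        broker.publish("consumerExchange", "wrong_key", "m2");          // stage 2 fails
        broker.publish("missingExchange", "consumer_route_key", "m3");  // stage 1 fails
        broker.events.forEach(System.out::println);
    }
}
```

Note that in this model a wrong routing key still produces a positive confirm; only the return event reveals that the message never reached a queue, which is why both callbacks are needed.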

      • Using ConfirmCallback to confirm Producer --> Broker/Exchange

        Spring Boot auto-configures RabbitTemplate, but because message confirmation is disabled by default, the confirmCallback property is not set when the template is injected. To use the confirmation mechanism, configure a RabbitTemplate.ConfirmCallback manually. The source shows that it is an interface annotated with @FunctionalInterface, so we only need to implement the confirm method with our own business logic.

        @FunctionalInterface
        public interface ConfirmCallback {
            void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
        }

        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.rabbit.connection.CorrelationData;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.stereotype.Component;

        @Component
        @Slf4j
        public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

            /**
             * correlationData: carries the id that uniquely identifies the current message.
             * ack: whether the message was delivered to the broker; true means success.
             * cause: the reason for the delivery failure.
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                // log.info("return message: {}", correlationData.getReturned());
                log.info("boolean: {}", ack);
                log.info("string message: {}", cause);
            }
        }

        If all confirmations are handled the same way, the callback can be registered once in a configuration class instead of being set in each piece of sending code.

        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.rabbit.connection.CorrelationData;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.context.annotation.Configuration;

        import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3

        @Configuration
        @Slf4j
        public class RabbitConfig {

            @Autowired
            RabbitTemplate rabbitTemplate;

            // The method name does not matter; @PostConstruct makes the container call it.
            @PostConstruct
            public void setCallback() {
                // Register the ConfirmCallback on the RabbitTemplate created by the container.
                // It is called back for the Producer --> Broker/Exchange stage.
                rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                    /**
                     * @param correlationData the unique correlation data (message id) given when sending
                     * @param ack whether the message was delivered to the exchange
                     * @param cause the reason for the failure
                     */
                    @Override
                    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                        if (ack) {
                            log.info("The message was successfully delivered to the exchange: [correlationData={}]", correlationData);
                        } else {
                            log.error("Failed to deliver the message to the exchange: [correlationData={}, reason: {}]", correlationData, cause);
                        }
                    }
                });
            }
        }
      • Using ReturnCallback for the Broker/Exchange --> Broker/Queue callback

        The principle is the same as registering a ConfirmCallback, so we go straight to the code. Note that this callback is executed only when delivery from the exchange to the queue fails.

        import com.alibaba.fastjson.JSONObject; // assumption: the original import was lost; toJSONString matches fastjson
        import lombok.extern.slf4j.Slf4j;
        import org.springframework.amqp.core.ReturnedMessage;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.stereotype.Component;

        @Slf4j
        @Component
        public class ReturnsCallBackService implements RabbitTemplate.ReturnsCallback {

            /**
             * ReturnedMessage fields:
             * message: message body
             * replyCode: response code
             * replyText: response text
             * exchange: exchange
             * routingKey: routing key
             */
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.info("returns call back info: {}", JSONObject.toJSONString(returnedMessage));
            }
        }
    5. Consumer

      import com.rabbitmq.client.Channel;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.annotation.RabbitHandler;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;

      import java.io.IOException;

      @Component
      @RabbitListener(queues = "consumer_route")
      @Slf4j
      public class ConsumerService {

          @RabbitHandler
          public void consumer(String msg, Message message, Channel channel) throws Exception {
              long deliveryTag = message.getMessageProperties().getDeliveryTag();
              try {
                  /*
                   * deliveryTag: the delivery sequence number of the message. It increases with
                   *   every delivery on the channel, including redeliveries. In manual
                   *   acknowledgement mode, ack, nack, and reject all refer to a deliveryTag.
                   * multiple: whether to acknowledge in batches. If true, all unacknowledged
                   *   messages up to and including this deliveryTag are acknowledged at once.
                   */
                  log.info("consume message content: {}", msg);
                  channel.basicAck(deliveryTag, false);
              } catch (IOException e) {
                  if (Boolean.TRUE.equals(message.getMessageProperties().getRedelivered())) {
                      /*
                       * basicReject: rejects the message. Unlike basicNack it cannot operate in
                       *   batches; otherwise the usage is very similar.
                       * deliveryTag: the delivery sequence number of the message.
                       * requeue: if true, the message is put back into the queue.
                       */
                      log.error("Message repeated processing failed, rejected processing");
                      channel.basicReject(deliveryTag, false);
                  } else {
                      /*
                       * basicNack: negative acknowledgement. Generally used when the business
                       *   logic fails while consuming, so the message can be redelivered.
                       * deliveryTag: the delivery sequence number of the message.
                       * multiple: whether to acknowledge in batches.
                       * requeue: if true, the message is put back into the queue.
                       *
                       * channel.basicNack(deliveryTag, false, true) makes a message that always
                       * fails be consumed again and again in an infinite loop. Options for such
                       * messages:
                       * - acknowledge the message first, then republish it with
                       *   channel.basicPublish() so it goes to the end of the queue;
                       * - record the number of retries in redis or a database; once the maximum
                       *   is reached, persist the message, raise an alarm for manual
                       *   intervention, or move it to a dead letter queue (or another queue)
                       *   and process it separately;
                       * - use the retry feature that comes with spring-rabbit.
                       */
                      log.error("The message is about to return to the queue...");
                      channel.basicNack(deliveryTag, false, true);
                  }
              }
          }
      }
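The effect of the multiple flag on outstanding delivery tags can be shown with a small in-memory sketch. AckTracker is a hypothetical class written for illustration and is not part of the RabbitMQ client; it only tracks which tags are still unacknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class AckTracker {

    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    // Simulates one delivery on the channel and returns its delivery tag.
    public long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // multiple=false acknowledges only deliveryTag;
    // multiple=true acknowledges every outstanding tag up to and including deliveryTag.
    public void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    public List<Long> outstanding() {
        return new ArrayList<>(unacked);
    }

    public static void main(String[] args) {
        AckTracker channel = new AckTracker();
        for (int i = 0; i < 5; i++) {
            channel.deliver();                      // tags 1..5
        }
        channel.ack(3, true);
        System.out.println(channel.outstanding()); // [4, 5]
        channel.ack(5, false);
        System.out.println(channel.outstanding()); // [4]
    }
}
```

Acknowledging with multiple=false, as the consumer above does, keeps each message's fate independent of the others on the same channel.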
    6. RabbitMQ retry mechanism

      • When consumption fails, you can use the retry feature that comes with spring-rabbit; see the configuration file for details.

      • Alternatively, record the number of retries in redis or a database; when the maximum is reached, hand the message over for manual intervention or move it to another queue to be processed separately.

      • Note:

        Retry here does not mean that RabbitMQ resends the message; the consumer retries internally. In other words, the retry has nothing to do with the broker.

        Therefore, when relying on this retry, the consumer code must not swallow the exception in try{}catch(){}. Once the exception is caught, in automatic ack mode the message is treated as processed correctly and is acknowledged directly, so no retry is triggered.
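A plain-Java sketch of the note above: the retry is just a loop inside the consumer, so a listener that catches its own exception looks successful after one call. LocalRetry and runWithRetry are hypothetical names for illustration and do not reproduce spring-rabbit's actual retry interceptor.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LocalRetry {

    // Calls the listener until it returns normally or maxAttempts is reached.
    // Returns true if some attempt completed without throwing.
    public static boolean runWithRetry(Runnable listener, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.run();
                return true;  // normal return: the message would be acknowledged
            } catch (RuntimeException e) {
                // The same in-memory message is handed to the listener again;
                // the broker is not involved.
            }
        }
        return false;         // retries exhausted: a recoverer would take over here
    }

    public static void main(String[] args) {
        AtomicInteger throwingCalls = new AtomicInteger();
        boolean first = runWithRetry(() -> {
            throwingCalls.incrementAndGet();
            throw new IllegalStateException("business failure");
        }, 3);
        System.out.println(first + " after " + throwingCalls.get() + " calls");    // false after 3 calls

        AtomicInteger swallowingCalls = new AtomicInteger();
        boolean second = runWithRetry(() -> {
            swallowingCalls.incrementAndGet();
            try {
                throw new IllegalStateException("business failure");
            } catch (IllegalStateException e) {
                // swallowed: the retry loop never sees the failure
            }
        }, 3);
        System.out.println(second + " after " + swallowingCalls.get() + " calls"); // true after 1 calls
    }
}
```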

    7. Message handling mechanism for abnormal consumption

      • MessageRecoverer is the key interface of the retry mechanism. Its implementations are RejectAndDontRequeueRecoverer, RepublishMessageRecoverer, and ImmediateRequeueMessageRecoverer.

        1. RejectAndDontRequeueRecoverer is used by default. As the name says, it rejects the message and does not return it to the queue.

        2. RepublishMessageRecoverer republishes the message

          @Bean
          public DirectExchange errorExchange() {
              return new DirectExchange("error-exchange", true, false);
          }

          @Bean
          public Queue errorQueue() {
              return new Queue("error-queue", true);
          }

          @Bean
          public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
              return BindingBuilder.bind(errorQueue).to(errorExchange).with("error-routing-key");
          }

          // Create a RepublishMessageRecoverer: once the retries are used up, the failed
          // message is republished to the error queue.
          @Bean
          public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
              return new RepublishMessageRecoverer(rabbitTemplate, "error-exchange", "error-routing-key");
          }
        3. ImmediateRequeueMessageRecoverer returns the message to the queue immediately

          @Bean
          public MessageRecoverer messageRecoverer() {
              return new ImmediateRequeueMessageRecoverer();
          }

          The message returns to the queue immediately and is retried in a loop. Because the cause of the exception usually cannot be fixed right away, this blocks the consumption of subsequent messages, so this approach is not recommended.

        4. Dead letter queue

          /**
           * Dead letter exchange
           */
          @Bean
          public DirectExchange dlxExchange() {
              return new DirectExchange(dlxExchangeName);
          }

          /**
           * Dead letter queue
           */
          @Bean
          public Queue dlxQueue() {
              return new Queue(dlxQueueName);
          }

          /**
           * Binds the dead letter queue to the dead letter exchange
           */
          @Bean
          public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange) {
              return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
          }

          // The business queue declaration needs some changes: add the dead letter exchange
          // and the dead letter routing key.
          /**
           * Business queue
           */
          @Bean
          public Queue queue() {
              Map<String, Object> params = new HashMap<>();
              // the dead letter exchange attached to this queue
              params.put("x-dead-letter-exchange", dlxExchangeName);
              // the dead letter routing key of this queue
              params.put("x-dead-letter-routing-key", dlxRoutingKey);
              return QueueBuilder.durable(queueName).withArguments(params).build();
          }

          Starting the service now shows both the business queue and the dead letter queue.

          The DLX and DLK tags appear on the business queue, indicating that it is bound to the dead letter exchange and the dead letter routing key. Now call the producer to send a message. After the consumer's retries are used up (5 attempts here), the default MessageRecoverer implementation, RejectAndDontRequeueRecoverer, rejects the message with requeue=false. Because the business queue is bound to the dead letter exchange, the message is removed from the business queue and sent to the dead letter queue.


          If the ack mode is manual, you need to call channel.basicNack with requeue=false yourself to send the failed message to the dead letter queue.
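What requeue means for a rejected message can be modelled in a few lines. DeadLetterDemo is a hypothetical in-memory stand-in, not RabbitMQ API: the hasDeadLetterExchange flag plays the role of the x-dead-letter-exchange argument on the business queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DeadLetterDemo {

    public final Deque<String> businessQueue = new ArrayDeque<>();
    public final Deque<String> deadLetterQueue = new ArrayDeque<>();
    private final boolean hasDeadLetterExchange;

    public DeadLetterDemo(boolean hasDeadLetterExchange) {
        this.hasDeadLetterExchange = hasDeadLetterExchange;
    }

    // Rejects the message at the head of the business queue.
    public void rejectHead(boolean requeue) {
        String message = businessQueue.pollFirst();
        if (message == null) {
            return;
        }
        if (requeue) {
            // Back to the head: it will be the next message delivered again.
            businessQueue.addFirst(message);
        } else if (hasDeadLetterExchange) {
            // requeue=false plus a dead letter exchange: the message is dead-lettered.
            deadLetterQueue.addLast(message);
        }
        // requeue=false without a dead letter exchange: the message is dropped.
    }

    public static void main(String[] args) {
        DeadLetterDemo demo = new DeadLetterDemo(true);
        demo.businessQueue.add("order-1");
        demo.rejectHead(true);
        System.out.println(demo.businessQueue + " " + demo.deadLetterQueue); // [order-1] []
        demo.rejectHead(false);
        System.out.println(demo.businessQueue + " " + demo.deadLetterQueue); // [] [order-1]
    }
}
```

The first rejection shows why requeue=true on a message that always fails loops forever, and the second shows the path the text describes for requeue=false.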
