SpringBoot 完全整合 RabbitMQ + 100% 发送消息 + 高可靠消费消息 + 源码解析

软件版本

软件 版本
Spring Boot 2.5.0
spring-boot-starter-amqp 2.5.0
Erlang 24.0
RabbitMQ 3.8.19

发送消息

配置信息

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    virtual-host: 'example'
    # 生产者 ==>> Exchange 确认方式
    publisher-confirm-type: correlated
    # Exchange ==>> Queue
    publisher-returns: true
  • publisher-confirm-type:此项有三个值,NONE、CORRELATED、SIMPLE
    • NONE:禁用回调,默认也是这个。
    • CORRELATED(建议使用):会触发回调方法。
    • SIMPLE:官方介绍为 Use RabbitTemplate#waitForConfirms() (or waitForConfirmsOrDie() within scoped operations。经测试发现,会触发回调方法,在回调方法中可以做自己的业务逻辑。还可以在发布消息的同步线程中调用 waitForConfirms 或者 waitForConfirmsOrDie 方法等待返回结果来进行下一步操作。
  • publisher-returns:是否开启 Exchange 到 Queue 的异常回调。

发送消息

public void send() {
      Demo demo = Demo.builder()
        .name("大漠知秋")
        .age(25)
        .height(new BigDecimal("175"))
        .build();

      CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().toUpperCase(Locale.ROOT));
      Boolean invokeResult = rabbitTemplate.invoke(
        operations -> {
          rabbitTemplate.convertAndSend("example", "example", demo, correlationData);
          boolean flag = false;
          try {
            flag = rabbitTemplate.waitForConfirms(5000);
            log.info("等待结果:${}$", flag);
          } catch (Exception e) {
            e.printStackTrace();
          }
          return flag;
        },
        (deliveryTag, multiple) -> {
          log.info("【SIMPLE waitForConfirms 形式】- deliveryTag:${}$, multiple:${}$", deliveryTag, multiple);
        },
        (deliveryTag, multiple) -> {
          log.info("【SIMPLE waitForConfirms 形式】- deliveryTag:${}$, multiple:${}$", deliveryTag, multiple);
        }
      );
}

  上方代码块(前者)的 9 – 27 行也可以使用 rabbitTemplate.convertAndSend("example", "example", demo, correlationData);(后者) 直接发送,上方使用的是带等待结果的形式,建议直接使用后者

后者使用起来对 publisher-confirm-type 没有要求,前者publisher-confirm-type 的要求不同,如下:

  • NONE:发送会成功,rabbitTemplate#waitForConfirms 会报错。下方代码块中的 rabbitTemplate#setConfirmCallback ack 为 false。
  • CORRELATED:发送会成功,rabbitTemplate#waitForConfirms 正常,下方代码块中的 rabbitTemplate#setConfirmCallback ack 正常。
  • SIMPLE:发送会成功,rabbitTemplate#waitForConfirms 正常,并且 rabbitTemplate#setConfirmCallback 第二个、第三个参数也会正常回调。下方代码块中的 rabbitTemplate#setConfirmCallback ack 正常。

对发送的消息进行回调确认

@Configuration
@Slf4j
public class AutoRabbitMqConfiguration {

    @Resource
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        // 消息是否成功发送到 Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("【发送消息到 Exchange】- 成功了。correlationData:${}$", correlationData);
            } else {
                log.warn("【发送消息到 Exchange】- 失败了。correlationData ID:${}$,returned:${}$,cause:${}$", correlationData, null != correlationData ? correlationData.getReturned() : "空", cause);
            }
        });

        // 触发 setReturnCallback 回调必须设置 mandatory=true, 否则 Exchange 没有找到 Queue 就会丢弃掉消息, 而不会触发回调
        rabbitTemplate.setMandatory(true);
        // 消息是否从 Exchange 路由到 Queue, 注意: 这是一个失败回调, 只有消息从 Exchange 路由到 Queue 失败才会回调这个方法
        rabbitTemplate.setReturnsCallback(returned -> {
            log.warn("【从 Exchange 发送消息到 Queue】- 失败了。returned:${}$", returned);
        });

        return rabbitTemplate;
    }

}
  • rabbitTemplate#setConfirmCallback:此处是针对 生产者 ==>> Exchange 的回调确认,当 publisher-confirm-type 配置为 CORRELATED 或者 SIMPLE 时都会在此处进行回调。
  • rabbitTemplate#setReturnsCallback:此处是针对 Exchange ==>> Queue 的回调,只有在失败的时候才会进行此处回调。注意 rabbitTemplate#setMandatory

通过以上的回调方法,我们可以在发消息前对消息做个状态标记,在回调结果中无论成功或者失败,就把标记更改为对应的状态。如果失败我们还可以把失败的消息放到死信队列、放到数据库、放到 Redis 等等,后续操作是定时任务轮询或者事件触发就根据业务来选择了。

消费消息

手动 Ack 模式

acknowledge-mode=manual 的 yml 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    virtual-host: 'example'
    # 生产者 ==>> Exchange 确认方式
    publisher-confirm-type: correlated
    # Exchange ==>> Queue
    publisher-returns: true
    listener:
      simple:
        # ACK 模式,此处选择手动 ACK
        acknowledge-mode: manual
        # 每次处理 100 条消息
        prefetch: 100
        # 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列
        default-requeue-rejected: true
        retry:
          # 开启重试
          enabled: true
          # 最大重试 3 次,涵盖当前次
          max-attempts: 3
          # 每次重试间隔
          initial-interval: 3000
          # 最大允许重试时间
          max-interval: 30000
          # 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
          multiplier: 2
@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
  log.info("进入消费了");
  TimeUnit.SECONDS.sleep(3);
  log.info("消费完毕了");
}

  配置改成手动确认 Ack,这里没有进行手动确认操作,这就会造成消费的所有消息 Unacked

image-20210708194919824

断开连接,重新建立连接即可把消息恢复成正常的 Ready

@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
  log.info("进入消费了");
  int i = MyRandomUtils.nextInt(1, 100);
  if (i % 2 == 1) {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  } else {
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
  }
  TimeUnit.SECONDS.sleep(3);
  log.info("消费完毕了");
}

  如果一直走 channel#basicNack 会出现如下无限循环消费的场景,这是因为被 Nack 的消息又重新进行投递了:

image-20210708195251564

  • channel#basicNack(Nack 重新投递消息) 的三个参数说明
    • deliveryTag:手动确认模式下,用来找到指定一条消息的唯一标识。RabbitMQ 客户端启动之后会和 RabbitMQ 服务端建立一条 Connection,每一个消费者都会与 RabbitMQ 服务端建立一个 Channel,RabbitMQ 服务端会向对应的消费者推送消息,这个消息携带了一个 deliveryTag它代表了 RabbitMQ 服务端向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag 的范围仅限于 Channel
    • multiple:手动确认模式下,如果为 true,则可以一次性确认 deliveryTag 小于等于传入值的所有消息。否则只处理当前这条消息。
    • requeue:是否重新放入 Queue 中。如果为 false 那这条消息将丢弃了。

channel#basicAck 的参数同理,只是这里是 Ack(正常消费消息)。

关于异常

  手动 Ack 模式正确的做法应该是自己把异常捕获掉,正常就 Ack,异常就 Nack,也可以扩展出现 N 次 Nack 之后就入库或者其他地方保存,就别在 Nack 了,不然都成死循环了。这里只是延伸一下如果把异常抛出去会怎样?

@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
  log.info("进入消费了");
  if (1 == 1) {
    throw new Exception();
  }
  TimeUnit.SECONDS.sleep(3);
  log.info("消费完毕了");
}
  • acknowledge-mode=manual,default-requeue-rejected=false

image-20210709115445431

消息卡住,断开连接后消息恢复 Ready

  • acknowledge-mode=manual,default-requeue-rejected=true

  与 default-requeue-rejected=false 时一样。

  也由此可以推断出,在 acknowledge-mode=manual、抛出异常时,default-requeue-rejected=true 的设置值并不能决定消息是否重新放入 Queue。

MessageRecoverer

org.springframework.amqp.rabbit.retry.MessageRecoverer

  在经过一番查找之后发现在最后一次重试之后如果还出现了异常,会调用 org.springframework.retry.support.RetryTemplate#handleRetryExhausted 方法,进而调用 org.springframework.amqp.rabbit.retry.MessageRecoverer#recover 方法,如下:

image-20210709143106746

MessageRecoverer 的具体实现有以下几个:

image-20210709142707480

image-20210709142727754

  现在就是要确定使用的是哪一个实现,最终追踪代码发现是 org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer

image-20210709143342906

  本着刨根问底的心态,为什么就是 RejectAndDontRequeueRecoverer?为什么不是其他几个实现,继续向上探究 RejectAndDontRequeueRecoverer 是怎么来的,找到了这里 org.springframework.boot.autoconfigure.amqp.AbstractRabbitListenerContainerFactoryConfigurer#configure

image-20210709145019018

  显然 this.messageRecovererNULL,所以直接 new 了一个 RejectAndDontRequeueRecoverer。那 this.messageRecoverer 为什么为 NULL 呢?再往上追,发现 org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#simpleRabbitListenerContainerFactoryConfigurer 方法中给 this.messageRecoverer 赋值的:

image-20210709145355642

  那这里的 this.messageRecoverer 是个啥?

image-20210709145456001

  发现是个 ObjectProviderObjectProvider 自行了解,确实项目中并没有配置 org.springframework.amqp.rabbit.retry.MessageRecoverer 的实现类为 Bean,所以这里就注入的也就没有,继而引发了后续代码的 new RejectAndDontRequeueRecoverer()

  所以想解决这个问题也简单,只需使用对应的 MessageRecoverer 实现,把它生成 Bean 交由 Spring 管理即可。

  言归正传,还回到 RejectAndDontRequeueRecoverer 类中,进到类中发现 recover 仅仅只是打印了下 warn 级别日志,再就是抛出异常:

image-20210709143550428

  可以看到这里抛出了异常 ListenerExecutionFailedException,再继续跟踪代码发现此异常最终会被方法 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary 拿来处理后续是否需要把抛错的消息重新入 Queue 操作,如下:

image-20210709143919152

  783 行做了一些判断,结果 ackRequired 影响着 792 行是否会进入 Nack 逻辑,先看这些判断。

  • !this.acknowledgeMode.isAutoAck():

image-20210709144213418

  只要 acknowledge-mode 不是配置的 NONE 就不会为 false。所以这里根据上下文是:true

  • !this.acknowledgeMode.isManual():

image-20210709144347583

  只要 acknowledge-mode 不是配置的 MANUAL 就不会为 false。所以这里根据上下文是:false

  • ContainerUtils.isRejectManual(ex):

image-20210709144557888

  根据前文已经知道,抛的异常是 ListenerExecutionFailedException,所以这里肯定返回:false

  所以最终结果是:true && (false || false) = false。所以也就无法走后续的 Nack 逻辑。进而导致以下一直 Unacked 的局面:

image-20210709150524543

ImmediateRequeueMessageRecoverer

org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer

  换上 MessageRecoverer 的另一个实现 ImmediateRequeueMessageRecoverer 测试一下。查看 recover 方法:

image-20210709181228677

image-20210709154736541

  也是打印日志、抛异常,只是抛的异常类 ImmediateRequeueAmqpException 不一样。先说结论:ImmediateRequeueMessageRecoverer 实现也不会在 rollbackOnExceptionIfNecessary 方法中进行 Nack,但是在 rollbackOnExceptionIfNecessary 执行之后会进行重启。

  先把代码定位到 rollbackOnExceptionIfNecessary 的调用者 org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java:1010

image-20210709155930449

  继续把异常 ImmediateRequeueAmqpException 向上抛,由于中间一直没有能正常捕获此异常的 catch,所以直接就抛到了上图中下面的红框截图上:

image-20210709160251973

  继续向下走,打一下消费者异常日志,然后进入方法 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#killOrRestart

image-20210709160629225

image-20210709161559259

  就是在此处进行了断开连接,closeChannel 执行完会释放 Unacked 的消息,变成 Ready。closeConnection 不会关闭,因为内部就不会关闭成功:

image-20210709162339038

  这里是并不是通过 Nack 进行消息重新投递,而是通过关闭对应的 Channel 做到消息重新投递,这样可以一直无限循环消费此消息,但也会存在问题:后续消息都堵塞住了

  回到原来的的 RejectAndDontRequeueRecoverer,为什么它就没走这一套关闭连接的流程呢?回到调用 rollbackOnExceptionIfNecessary 方法处:

image-20210709163321629

  此时抛出的异常为:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException,此异常在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop 已被捕获:

image-20210709163515899

  之后也没有存在有效的处理,就继续监听下一个消息了:

image-20210709163729828

RepublishMessageRecoverer

org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer

image-20210709181322378

  与前两个实现的区别主要在于 recover 方法上:

image-20210709175118773

  这里做了向其他队列转发的操作,但是没有抛异常,所以也就不会走 rollbackOnExceptionIfNecessary 以及其之后的逻辑。但是他也没有 Ack 或者 Nack 当前出问题的这条消息,所以会出现原消息一直 Unacked,一旦断开连接又会恢复 Ready,被转发的那个队列也会存在这条消息。

总结

  所以在 acknowledge-mode=manual 时,default-requeue-rejected是什么根本无关紧要,消息是不是会重新投递到 Queue 取决于 MessageRecoverer 的实现。

RejectAndDontRequeueRecoverer

  • 消息不会重新投递
  • 消费的消息一直处于 Unacked 状态,当断开连接时,消息会转为 Ready,继续等待被消费
  • 会继续消费下一条消息

ImmediateRequeueMessageRecoverer

  • 消息会重新投递(出错时会断开 Channel 连接)
  • 消费的消息一直重复被消费,这会影响后续消费
  • 不会继续消费下一条消息。只要当前消息还一直报错,后续消息永远无法被消费掉

RepublishMessageRecoverer

  • 消息不会重新投递
  • 消费的消息一直处于 Unacked 状态,当断开连接时,消息会转为 Ready,继续等待被消费
  • 会继续消费下一条消息
  • 会投递到新的指定 Queue

自动 Ack 模式

acknowledge-mode=auto的 yml 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    virtual-host: 'example'
    # 生产者 ==>> Exchange 确认方式
    publisher-confirm-type: correlated
    # Exchange ==>> Queue
    publisher-returns: true
    listener:
      simple:
        # ACK 模式,此处选择手动 ACK
        acknowledge-mode: auto
        # 每次处理 100 条消息
        prefetch: 100
        # 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列
        default-requeue-rejected: false
        retry:
          # 开启重试
          enabled: true
          # 最大重试 3 次,涵盖当前次
          max-attempts: 3
          # 每次重试间隔
          initial-interval: 3000
          # 最大允许重试时间
          max-interval: 30000
          # 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
          multiplier: 2
@RabbitListener(queues = "example")
public void consume(Message message) throws Exception {
  log.info("进入消费了");
  TimeUnit.SECONDS.sleep(3);
  log.info("消费完毕了");
}

关于异常

  依照上面 yaml 的配置,这里会进行总共三次消费,但是都会报错,所以三次之后会走异常处理逻辑。

RejectAndDontRequeueRecoverer

  大部分内容已经说过,这里不再赘述,直接从 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary 开始:

image-20210712200625026

  这里与上面不同的是 !this.acknowledgeMode.isAutoAck()!this.acknowledgeMode.isAutoAck() 永远都是 true,所以 ackRequired 的值就是 true,那下面的代码就与 acknowledge-mode=manual 时走的代码分支不一样了:

image-20210712200824252

  这里的关键点就落在了 ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger) 方法上,进去查看:

image-20210712201854087

  • defaultRequeueRejected:这里就是 yaml 中配置的 default-requeue-rejected,但是他起不到决定性作用

  注释写的也比较清楚,这个方法是用来确定消息是否应该重新投递;如果可抛出对象为 MessageRejectedWhileStoppingExceptiondefaultRequeueRejected 为 true 并且在异常原因链中没有 AmqpRejectAndDontRequeueException 或者在异常原因链中有 ImmediateRequeueAmqpException,则返回true。

  所以这里的 defaultRequeueRejected 用处只能说是有点用,但具体的还是要看抛的什么异常,如果是 AmqpRejectAndDontRequeueException 异常,那不好意思,这个消息就没了,不会重新投递到 Queue 了。如果是 ImmediateRequeueAmqpException 或者是其他自定义异常,都是可以重新投递到 Queue 中的。

ImmediateRequeueAmqpException

  会重新进行投递消息到 Queue,后续代码把 Channel 断开并重连。整体流程与 acknowledge-mode=manual 不差啥。

RepublishMessageRecoverer

  此类的实现方法 org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer#recover 并没有抛出异常,所以也就不会走到 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary最终会把消息投递到其他指定队列,原消息消费完毕正常 Ack。

  详细 Ack 方法见 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#commitIfNecessary

image-20210713094629326

  这里 ackRequired 的结果肯定为 true,就会走下方的 Ack 方法。

总结

  所以在 acknowledge-mode=auto 时,default-requeue-rejected是什么以及其起到的作用要受 MessageRecoverer 的实现影响,消息是不是会重新投递到 Queue 取决于 MessageRecoverer 的实现。如果是自己实现 MessageRecoverer 接口,那 default-requeue-rejected 的设置是生效的。

RejectAndDontRequeueRecoverer

  • 消息不会重新投递
  • 消息最终会被 Nack,并且不重新投递,也就是说消息没了。
  • 会继续消费下一条消息

ImmediateRequeueMessageRecoverer

  • 消息会重新投递(出错时会断开 Channel 连接)
  • 消费的消息一直重复被消费,这会影响后续消费
  • 不会继续消费下一条消息。只要当前消息还一直报错,后续消息永远无法被消费掉

RepublishMessageRecoverer

  • 消息不会重新投递
  • 消费的消息会在最终进行正常 Ack
  • 会继续消费下一条消息
  • 会投递到新的指定 Queue

重试

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          # 开启重试
          enabled: true
          # 最大重试 3 次,涵盖当前次
          max-attempts: 3
          # 每次重试间隔
          initial-interval: 3000
          # 最大允许重试时间
          max-interval: 5000
          # 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
          multiplier: 2
  • 这里的重试配置的最大是 3 次,包括第一次正常消费,如果出错其实重试只有 2 次。重试间隔是 3 秒,最大重试间隔时间为 5 秒,multiplier 为 2。组合含义如下:
  1. 第一次正常消费肯定是里面就消费了。
  2. 抛错之后第二次要等待 3 秒钟后,initial-interval 并不大于 max-interval,所以按照 initial-interval 的间隔时间来。
  3. 再次抛错之后第三次要等待 上次重试时间间隔 * multiplier,也就是 6 秒,显然 6 秒已经大于 max-interva,所以这次的重试间隔时间是 5 秒。
  • 重试这件事与 RabbitMQ 服务端并没有任何关系,这是 Spring 封装的功能,属于客户端重试。

推荐配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    virtual-host: 'example'
    # 生产者 ==>> Exchange 确认方式
    publisher-confirm-type: correlated
    # Exchange ==>> Queue
    publisher-returns: true
    listener:
      simple:
        # ACK 模式,此处选择手动 ACK
        acknowledge-mode: auto
        # 每次处理 100 条消息
        prefetch: 100
        # 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列
        default-requeue-rejected: false
        retry:
          # 开启重试
          enabled: true
          # 最大重试 3 次,涵盖当前次
          max-attempts: 3
          # 每次重试间隔时间
          initial-interval: 3000
          # 最大允许重试间隔时间,用来限制 initial-interval。
          max-interval: 10000
          # 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
          multiplier: 2
  1. 生产端,一定要重写 org.springframework.amqp.rabbit.core.RabbitTemplate#setConfirmCallbackorg.springframework.amqp.rabbit.core.RabbitTemplate#setReturnsCallback,前者保证消息 生产者 ==>> Exchange,后者保证消息 Exchange ==>> Queue,可以根据这两处来对消息进行标记是否正常发送,可以存库。如果有多次发送的情况产生,同时也要记得处理幂等问题。
  2. 消费端 acknowledge-mode: autodefault-requeue-rejected: false 即可,这个前提就保证了消息不会再次重新投递到原来的队列。

    1. 第一种方式:自己重写一个 MessageRecoverer,在 recover 方法中是入库或者是发送到其他 Queue 都可,不抛出异常,这样最终消息会被 Ack,消息也被存入库中或者是发送到其他 Queue 并等待后续处理。
    2. 第二种方式(推荐):自己也不用重写 MessageRecoverer,默认会使用 RejectAndDontRequeueRecoverer,最终会进行 Nack 并且 No Requeue,在 RabbitMQ 服务端为每个业务队列都配置死信队列,把消息最终都收集到死信队列中。

TIPS

  • 以上均为单个消费者 Channel 情况。

发表评论

电子邮件地址不会被公开。 必填项已用*标注