软件版本
软件 | 版本 |
---|---|
Spring Boot | 2.5.0 |
spring-boot-starter-amqp | 2.5.0 |
Erlang | 24.0 |
RabbitMQ | 3.8.19 |
发送消息
配置信息
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: 'example'
# 生产者 ==>> Exchange 确认方式
publisher-confirm-type: correlated
# Exchange ==>> Queue
publisher-returns: true
- publisher-confirm-type:此项有三个值,NONE、CORRELATED、SIMPLE
- NONE:禁用回调,默认也是这个。
- CORRELATED(建议使用):会触发回调方法。
- SIMPLE:官方介绍为
Use RabbitTemplate#waitForConfirms() (or waitForConfirmsOrDie() within scoped operations
。经测试发现,会触发回调方法,在回调方法中可以做自己的业务逻辑。还可以在发布消息的同步线程中调用waitForConfirms
或者waitForConfirmsOrDie
方法等待返回结果来进行下一步操作。
- publisher-returns:是否开启 Exchange 到 Queue 的异常回调。
发送消息
public void send() {
Demo demo = Demo.builder()
.name("大漠知秋")
.age(25)
.height(new BigDecimal("175"))
.build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().toUpperCase(Locale.ROOT));
Boolean invokeResult = rabbitTemplate.invoke(
operations -> {
rabbitTemplate.convertAndSend("example", "example", demo, correlationData);
boolean flag = false;
try {
flag = rabbitTemplate.waitForConfirms(5000);
log.info("等待结果:${}$", flag);
} catch (Exception e) {
e.printStackTrace();
}
return flag;
},
(deliveryTag, multiple) -> {
log.info("【SIMPLE waitForConfirms 形式】- deliveryTag:${}$, multiple:${}$", deliveryTag, multiple);
},
(deliveryTag, multiple) -> {
log.info("【SIMPLE waitForConfirms 形式】- deliveryTag:${}$, multiple:${}$", deliveryTag, multiple);
}
);
}
上方代码块(前者)的 9 - 27 行也可以使用 rabbitTemplate.convertAndSend("example", "example", demo, correlationData);(后者)
直接发送,上方使用的是带等待结果的形式,建议直接使用后者
。
后者
使用起来对 publisher-confirm-type
没有要求,前者
对 publisher-confirm-type
的要求不同,如下:
- NONE:发送会成功,
rabbitTemplate#waitForConfirms
会报错。下方代码块中的rabbitTemplate#setConfirmCallback
ack 为 false。 - CORRELATED:发送会成功,
rabbitTemplate#waitForConfirms
正常,下方代码块中的rabbitTemplate#setConfirmCallback
ack 正常。 - SIMPLE:发送会成功,
rabbitTemplate#waitForConfirms
正常,并且rabbitTemplate#setConfirmCallback
第二个、第三个参数也会正常回调。下方代码块中的rabbitTemplate#setConfirmCallback
ack 正常。
对发送的消息进行回调确认
@Configuration
@Slf4j
public class AutoRabbitMqConfiguration {
@Resource
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息是否成功发送到 Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.debug("【发送消息到 Exchange】- 成功了。correlationData:${}$", correlationData);
} else {
log.warn("【发送消息到 Exchange】- 失败了。correlationData ID:${}$,returned:${}$,cause:${}$", correlationData, null != correlationData ? correlationData.getReturned() : "空", cause);
}
});
// 触发 setReturnCallback 回调必须设置 mandatory=true, 否则 Exchange 没有找到 Queue 就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从 Exchange 路由到 Queue, 注意: 这是一个失败回调, 只有消息从 Exchange 路由到 Queue 失败才会回调这个方法
rabbitTemplate.setReturnsCallback(returned -> {
log.warn("【从 Exchange 发送消息到 Queue】- 失败了。returned:${}$", returned);
});
return rabbitTemplate;
}
}
- rabbitTemplate#setConfirmCallback:此处是针对
生产者 ==>> Exchange
的回调确认,当publisher-confirm-type
配置为CORRELATED
或者SIMPLE
时都会在此处进行回调。 - rabbitTemplate#setReturnsCallback:此处是针对
Exchange ==>> Queue
的回调,只有在失败的时候才会进行此处回调。注意rabbitTemplate#setMandatory
。
通过以上的回调方法,我们可以在发消息前对消息做个状态标记,在回调结果中无论成功或者失败,就把标记更改为对应的状态。如果失败我们还可以把失败的消息放到死信队列、放到数据库、放到 Redis 等等,后续操作是定时任务轮询或者事件触发就根据业务来选择了。
消费消息
手动 Ack 模式
acknowledge-mode=manual 的 yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: 'example'
# 生产者 ==>> Exchange 确认方式
publisher-confirm-type: correlated
# Exchange ==>> Queue
publisher-returns: true
listener:
simple:
# ACK 模式,此处选择手动 ACK
acknowledge-mode: manual
# 每次处理 100 条消息
prefetch: 100
# 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列
default-requeue-rejected: true
retry:
# 开启重试
enabled: true
# 最大重试 3 次,涵盖当前次
max-attempts: 3
# 每次重试间隔
initial-interval: 3000
# 最大允许重试时间
max-interval: 30000
# 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
multiplier: 2
@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
log.info("进入消费了");
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
配置改成手动确认 Ack,这里没有进行手动确认操作,这就会造成消费的所有消息 Unacked
:
断开连接,重新建立连接即可把消息恢复成正常的
Ready
。
@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
log.info("进入消费了");
int i = MyRandomUtils.nextInt(1, 100);
if (i % 2 == 1) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
如果一直走 channel#basicNack
会出现如下无限循环消费的场景,这是因为被 Nack 的消息又重新进行投递了:
- channel#basicNack(Nack 重新投递消息) 的三个参数说明
- deliveryTag:手动确认模式下,用来找到指定一条消息的唯一标识。RabbitMQ 客户端启动之后会和 RabbitMQ 服务端建立一条
Connection
,每一个消费者都会与 RabbitMQ 服务端建立一个Channel
,RabbitMQ 服务端会向对应的消费者推送消息,这个消息携带了一个deliveryTag
,它代表了 RabbitMQ 服务端向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,deliveryTag
的范围仅限于Channel
。 - multiple:手动确认模式下,如果为 true,则可以一次性确认
deliveryTag
小于等于传入值的所有消息。否则只处理当前这条消息。 - requeue:是否重新放入
Queue
中。如果为 false 那这条消息将丢弃了。
- deliveryTag:手动确认模式下,用来找到指定一条消息的唯一标识。RabbitMQ 客户端启动之后会和 RabbitMQ 服务端建立一条
channel#basicAck
的参数同理,只是这里是 Ack(正常消费消息)。
关于异常
手动 Ack 模式正确的做法应该是自己把异常捕获掉,正常就 Ack,异常就 Nack,也可以扩展出现 N 次 Nack 之后就入库或者其他地方保存,就别在 Nack 了,不然都成死循环了。这里只是延伸一下如果把异常抛出去会怎样?
@RabbitListener(queues = "example")
public void consume(Message message, Channel channel) throws Exception {
log.info("进入消费了");
if (1 == 1) {
throw new Exception();
}
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
- acknowledge-mode=manual,default-requeue-rejected=false
消息卡住,断开连接后消息恢复 Ready
- acknowledge-mode=manual,default-requeue-rejected=true
与 default-requeue-rejected=false
时一样。
也由此可以推断出,在
acknowledge-mode=manual
、抛出异常时,default-requeue-rejected=true
的设置值并不能决定消息是否重新放入 Queue。
MessageRecoverer
org.springframework.amqp.rabbit.retry.MessageRecoverer
在经过一番查找之后发现在最后一次重试之后如果还出现了异常,会调用 org.springframework.retry.support.RetryTemplate#handleRetryExhausted
方法,进而调用 org.springframework.amqp.rabbit.retry.MessageRecoverer#recover
方法,如下:
而 MessageRecoverer
的具体实现有以下几个:
现在就是要确定使用的是哪一个实现,最终追踪代码发现是 org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer
:
本着刨根问底的心态,为什么就是 RejectAndDontRequeueRecoverer
?为什么不是其他几个实现,继续向上探究 RejectAndDontRequeueRecoverer
是怎么来的,找到了这里 org.springframework.boot.autoconfigure.amqp.AbstractRabbitListenerContainerFactoryConfigurer#configure
:
显然 this.messageRecoverer
是 NULL
,所以直接 new 了一个 RejectAndDontRequeueRecoverer
。那 this.messageRecoverer
为什么为 NULL 呢?再往上追,发现 org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#simpleRabbitListenerContainerFactoryConfigurer
方法中给 this.messageRecoverer 赋值的:
那这里的 this.messageRecoverer
是个啥?
发现是个 ObjectProvider
,ObjectProvider
自行了解,确实项目中并没有配置 org.springframework.amqp.rabbit.retry.MessageRecoverer
的实现类为 Bean,所以这里就注入的也就没有,继而引发了后续代码的 new RejectAndDontRequeueRecoverer()
。
所以想解决这个问题也简单,只需使用对应的 MessageRecoverer
实现,把它生成 Bean 交由 Spring 管理即可。
言归正传,还回到 RejectAndDontRequeueRecoverer
类中,进到类中发现 recover
仅仅只是打印了下 warn
级别日志,再就是抛出异常:
可以看到这里抛出了异常 ListenerExecutionFailedException
,再继续跟踪代码发现此异常最终会被方法 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary
拿来处理后续是否需要把抛错的消息重新入 Queue 操作,如下:
783
行做了一些判断,结果 ackRequired 影响着 792
行是否会进入 Nack 逻辑,先看这些判断。
- !this.acknowledgeMode.isAutoAck():
只要 acknowledge-mode
不是配置的 NONE
就不会为 false。所以这里根据上下文是:true
- !this.acknowledgeMode.isManual():
只要 acknowledge-mode
不是配置的 MANUAL
就不会为 false。所以这里根据上下文是:false
- ContainerUtils.isRejectManual(ex):
根据前文已经知道,抛的异常是 ListenerExecutionFailedException
,所以这里肯定返回:false
所以最终结果是:true && (false || false) = false
。所以也就无法走后续的 Nack 逻辑。进而导致以下一直 Unacked 的局面:
ImmediateRequeueMessageRecoverer
org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer
换上 MessageRecoverer
的另一个实现 ImmediateRequeueMessageRecoverer
测试一下。查看 recover
方法:
也是打印日志、抛异常,只是抛的异常类 ImmediateRequeueAmqpException
不一样。先说结论:ImmediateRequeueMessageRecoverer
实现也不会在 rollbackOnExceptionIfNecessary
方法中进行 Nack,但是在 rollbackOnExceptionIfNecessary
执行之后会进行重启。
先把代码定位到 rollbackOnExceptionIfNecessary
的调用者 org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java:1010
:
继续把异常 ImmediateRequeueAmqpException
向上抛,由于中间一直没有能正常捕获此异常的 catch,所以直接就抛到了上图中下面的红框截图上:
继续向下走,打一下消费者异常日志,然后进入方法 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#killOrRestart
:
就是在此处进行了断开连接,closeChannel
执行完会释放 Unacked 的消息,变成 Ready。closeConnection
不会关闭,因为内部就不会关闭成功:
这里是并不是通过 Nack 进行消息重新投递,而是通过关闭对应的 Channel 做到消息重新投递,这样可以一直无限循环消费此消息,但也会存在问题:后续消息都堵塞住了
回到原来的的 RejectAndDontRequeueRecoverer
,为什么它就没走这一套关闭连接的流程呢?回到调用 rollbackOnExceptionIfNecessary
方法处:
此时抛出的异常为:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException
,此异常在 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
已被捕获:
之后也没有存在有效的处理,就继续监听下一个消息了:
RepublishMessageRecoverer
org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer
与前两个实现的区别主要在于 recover
方法上:
这里做了向其他队列转发的操作,但是没有抛异常,所以也就不会走 rollbackOnExceptionIfNecessary
以及其之后的逻辑。但是他也没有 Ack 或者 Nack 当前出问题的这条消息,所以会出现原消息一直 Unacked
,一旦断开连接又会恢复 Ready
,被转发的那个队列也会存在这条消息。
总结
所以在 acknowledge-mode=manual
时,default-requeue-rejected
是什么根本无关紧要,消息是不是会重新投递到 Queue 取决于 MessageRecoverer
的实现。
RejectAndDontRequeueRecoverer
- 消息不会重新投递
- 消费的消息一直处于
Unacked
状态,当断开连接时,消息会转为Ready
,继续等待被消费 - 会继续消费下一条消息
ImmediateRequeueMessageRecoverer
- 消息会重新投递(出错时会断开 Channel 连接)
- 消费的消息一直重复被消费,这会影响后续消费
- 不会继续消费下一条消息。只要当前消息还一直报错,后续消息永远无法被消费掉
RepublishMessageRecoverer
- 消息不会重新投递
- 消费的消息一直处于
Unacked
状态,当断开连接时,消息会转为Ready
,继续等待被消费 - 会继续消费下一条消息
- 会投递到新的指定
Queue
中
自动 Ack 模式
acknowledge-mode=auto的 yml 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: 'example'
# 生产者 ==>> Exchange 确认方式
publisher-confirm-type: correlated
# Exchange ==>> Queue
publisher-returns: true
listener:
simple:
# ACK 模式,此处选择手动 ACK
acknowledge-mode: auto
# 每次处理 100 条消息
prefetch: 100
# 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列
default-requeue-rejected: false
retry:
# 开启重试
enabled: true
# 最大重试 3 次,涵盖当前次
max-attempts: 3
# 每次重试间隔
initial-interval: 3000
# 最大允许重试时间
max-interval: 30000
# 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
multiplier: 2
@RabbitListener(queues = "example")
public void consume(Message message) throws Exception {
log.info("进入消费了");
TimeUnit.SECONDS.sleep(3);
log.info("消费完毕了");
}
关于异常
依照上面 yaml
的配置,这里会进行总共三次消费,但是都会报错,所以三次之后会走异常处理逻辑。
RejectAndDontRequeueRecoverer
大部分内容已经说过,这里不再赘述,直接从 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary
开始:
这里与上面不同的是 !this.acknowledgeMode.isAutoAck()
和 !this.acknowledgeMode.isAutoAck()
永远都是 true
,所以 ackRequired
的值就是 true
,那下面的代码就与 acknowledge-mode=manual
时走的代码分支不一样了:
这里的关键点就落在了 ContainerUtils.shouldRequeue(this.defaultRequeueRejected, ex, logger)
方法上,进去查看:
- defaultRequeueRejected:这里就是 yaml 中配置的
default-requeue-rejected
,但是他起不到决定性作用
注释写的也比较清楚,这个方法是用来确定消息是否应该重新投递;如果可抛出对象为 MessageRejectedWhileStoppingException
或 defaultRequeueRejected
为 true 并且在异常原因链中没有 AmqpRejectAndDontRequeueException
或者在异常原因链中有 ImmediateRequeueAmqpException
,则返回true。
所以这里的 defaultRequeueRejected
用处只能说是有点用,但具体的还是要看抛的什么异常,如果是 AmqpRejectAndDontRequeueException
异常,那不好意思,这个消息就没了,不会重新投递到 Queue 了。如果是 ImmediateRequeueAmqpException
或者是其他自定义异常,都是可以重新投递到 Queue 中的。
ImmediateRequeueAmqpException
会重新进行投递消息到 Queue,后续代码把 Channel 断开并重连。整体流程与 acknowledge-mode=manual
不差啥。
RepublishMessageRecoverer
此类的实现方法 org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer#recover
并没有抛出异常,所以也就不会走到 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#rollbackOnExceptionIfNecessary
,最终会把消息投递到其他指定队列,原消息消费完毕正常 Ack。
详细 Ack 方法见 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer#commitIfNecessary
:
这里 ackRequired
的结果肯定为 true
,就会走下方的 Ack 方法。
总结
所以在 acknowledge-mode=auto
时,default-requeue-rejected
是什么以及其起到的作用要受 MessageRecoverer
的实现影响,消息是不是会重新投递到 Queue 取决于 MessageRecoverer
的实现。如果是自己实现 MessageRecoverer
接口,那 default-requeue-rejected
的设置是生效的。
RejectAndDontRequeueRecoverer
- 消息不会重新投递
- 消息最终会被 Nack,并且不重新投递,也就是说消息没了。
- 会继续消费下一条消息
ImmediateRequeueMessageRecoverer
- 消息会重新投递(出错时会断开 Channel 连接)
- 消费的消息一直重复被消费,这会影响后续消费
- 不会继续消费下一条消息。只要当前消息还一直报错,后续消息永远无法被消费掉
RepublishMessageRecoverer
- 消息不会重新投递
- 消费的消息会在最终进行正常 Ack
- 会继续消费下一条消息
- 会投递到新的指定
Queue
中
重试
spring:
rabbitmq:
listener:
simple:
retry:
# 开启重试
enabled: true
# 最大重试 3 次,涵盖当前次
max-attempts: 3
# 每次重试间隔
initial-interval: 3000
# 最大允许重试时间
max-interval: 5000
# 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
multiplier: 2
- 这里的重试配置的最大是 3 次,包括第一次正常消费,如果出错其实重试只有 2 次。重试间隔是 3 秒,最大重试间隔时间为 5 秒,multiplier 为 2。组合含义如下:
- 第一次正常消费肯定是里面就消费了。
- 抛错之后第二次要等待 3 秒钟后,
initial-interval
并不大于max-interval
,所以按照initial-interval
的间隔时间来。 - 再次抛错之后第三次要等待
上次重试时间间隔 * multiplier
,也就是 6 秒,显然 6 秒已经大于max-interva
,所以这次的重试间隔时间是 5 秒。
- 重试这件事与 RabbitMQ 服务端并没有任何关系,这是 Spring 封装的功能,属于客户端重试。
推荐配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: 'example'
# 生产者 ==>> Exchange 确认方式
publisher-confirm-type: correlated
# Exchange ==>> Queue
publisher-returns: true
listener:
simple:
# ACK 模式,此处选择手动 ACK
acknowledge-mode: auto
# 每次处理 100 条消息
prefetch: 100
# 决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为 true,重新放回队列。这里设置为 false,如果多次重试还是异常就转发到死信队列
default-requeue-rejected: false
retry:
# 开启重试
enabled: true
# 最大重试 3 次,涵盖当前次
max-attempts: 3
# 每次重试间隔时间
initial-interval: 3000
# 最大允许重试间隔时间,用来限制 initial-interval。
max-interval: 10000
# 下一次重试的时间间隔 = 上次重试时间间隔 * multiplier
multiplier: 2
- 生产端,一定要重写
org.springframework.amqp.rabbit.core.RabbitTemplate#setConfirmCallback
和org.springframework.amqp.rabbit.core.RabbitTemplate#setReturnsCallback
,前者保证消息生产者 ==>> Exchange
,后者保证消息Exchange ==>> Queue
,可以根据这两处来对消息进行标记是否正常发送,可以存库。如果有多次发送的情况产生,同时也要记得处理幂等问题。 -
消费端
acknowledge-mode: auto
、default-requeue-rejected: false
即可,这个前提就保证了消息不会再次重新投递到原来的队列。- 第一种方式:自己重写一个
MessageRecoverer
,在recover
方法中是入库或者是发送到其他 Queue 都可,不抛出异常,这样最终消息会被 Ack,消息也被存入库中或者是发送到其他 Queue 并等待后续处理。 - 第二种方式(推荐):自己也不用重写
MessageRecoverer
,默认会使用RejectAndDontRequeueRecoverer
,最终会进行 Nack 并且No Requeue
,在 RabbitMQ 服务端为每个业务队列都配置死信队列,把消息最终都收集到死信队列中。
- 第一种方式:自己重写一个
TIPS
- 以上均为单个消费者 Channel 情况。
文章评论