简介
默认情况下,消费者并不是处理一条消息,RabbitMQ Server 再从 Queue 中推送一条继续处理。而是一下子尽可能的投送更多,缓存到到本地内存中等待处理。
比如 Queue 中存在 10W 条数据,消费者会一边处理一条,一边会进行缓存后续推送的 N 条,而这缓存的 N 条有可能会造成 OOM 或者其他故障。
消费者处理不了的场景
- 生产者发送速率高,消费者消费速率低。如:生产者每秒钟生产 1000 条消息,消费者每5秒才能处理一条消息,一段时间后,消费者 OOM。
- 生产者已启动一定时间,消费者没启动,这时消息都会堆积在 Queue 中,等消费者一起动,可能会直接 OOM。
要解决上面的问题也简单,就是控制消费者不要无限制的进行缓存消息,控制缓存消息数量。
QoS
为了解决上面的问题,RabbitMQ 提供了 QoS(服务质量保证)
功能,QoS 功能保证了在一定数量的消息未被 Ack 前,不推送新的消息。需要注意,不能使用自动 Ack,如果采用了自动 Ack,那也就就不存在未被 Ack
的情况了,会一直推送进行消费。
basicQos
RabbitMQ 提供了 basicQos()
方法进行设置 QoS,通过 channel 设置在订阅消息之前。有三个重载:
- void basicQos(int prefetchCount) throws IOException;
- void basicQos(int prefetchCount, boolean global) throws IOException;
- void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
- prefetchCount: 针对一个消费端最多推送多少未确认消息。如果无限制,则为0。
- global: 为 true 针对整个消费端最多推送多少未确认消息。为 false 针对当前 channel 最多推送多少未确认消息。
- prefetchSize: 单个消息的大小限制,一般为 0。如果无限制,则为 0。
注意:值一般为 0 到 65535。
global
和prefetchSize
属于 AMQP 协议中定义的内容,RabbitmMQ 并没有实现,可以只用 prefetchCount 参数的方法。
使用 basicQos 前
发送方
/**
* 消息发送
*/
@Test
void publisher() throws Exception {
for (int i = 0; i < 100000; i++) {
channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, "一只英短猫来了".getBytes(StandardCharsets.UTF_8));
}
// TimeUnit.SECONDS.sleep(3);
TimeUnit.MINUTES.sleep(1);
}
接收方
/**
* 监听队列回调函数
*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
log.info("【监听队列回调函数】- 入参,message: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手动 Ack
// multiple: 为 true 是确认多条。为 false 是确认单条。建议 false,true 容易搞混。
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
发送方一次性发送 10 条消息,接收方每5秒才能处理一条,会出现这种情况:
真正在处理的消息,每5秒才能处理一个,而推送到消费者这边的消息却有 4W 多条。
使用 basicQos 后
发送方
/**
* 消息发送
*/
@Test
void publisher() throws Exception {
for (int i = 0; i < 100000; i++) {
channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, "一只英短猫来了".getBytes(StandardCharsets.UTF_8));
}
// TimeUnit.SECONDS.sleep(3);
TimeUnit.MINUTES.sleep(1);
}
接收方
// 监听消息之前,设置 QoS
channel.basicQos(10);
// 消费者监听,关闭自动 ACK
channel.basicConsume(QUEUE_NAME, false, deliverCallback, (consumerTag) -> {
});
经过 QoS 限制之后,很好的保护了客户端、分布式处理堆积消息。
- 防止客户端缓存过重,线程爆炸、OOM。
- (重要)分布式处理堆积消息,假设没有设置 QoS、只启动了一个消费者,所有的消息都堆积到这个消费者的内存中,状态
Unacked
,那这时你就算启动更多个消费者也于事无补,因为都已经堆积到第一个消费者哪里了。而开启 QoS 之后,第一给消费者最多对接限制的条数,这时再横向扩展多个消费者就可以把剩余的消息进行消费了。
文章评论