简介
如上图所示,从 Publisher
开始,发送一个消息并指定 Routing key
,经由 Connection
中的 Channel
到达对应的 Exchange
,消息到达 Exchange
之后根据 Routing Key
路由到对应的 Queue
。消息再从 Queue
中经由 Consumer
端的 Connection
中的 Channel
到达对应的 Consumer
。
- Publisher: 消息的生产方。
- 消息: 一条消息想要发送到 RabbitMQ Server,需要指定 Exchange 和 Binding。
- Connection: 可以理解为生产方/消费方与 RabbitMQ Server 建立的一条 TCP 连接。
- Channel: 在 Connection 上进行的逻辑区分,相当于在一个物理 TCP 连接上面逻辑化出了一些
逻辑连接
,与 RabbitMQ Server 交互一般都是使用 Channel。 - Exchange: 消息交换器,生产方的消息会进入此处,然后根据消息中的 Routing Key 与 Binding 进行匹配,匹配成功,就把消息路由到对应 Binding 的 Queue,否则扔掉。
- Queue: Exchange 的消息会被路由到此处,等待被消费方消费。
- Consumer: 消息的消费方。
问题
这里所说消息的投递
,是指消息从 Publisher 到 Consumer。从上节可以知道,消息的投递是一个看似简单,其实内部还是比较复杂的操作,保不齐中间哪一个环节出现问题,就会导致消息的丢失。此处有三个环节可能出现消息投递失败的问题。
- Publisher -> Exchange: 网络问题。
- Exchange -> Queue: 找不到对应的 Binding。
- Queue -> Consumer: 网络问题、消费方处理出错。
针对这三个问题,RabbitMQ 也提供了对应的解决方案。
发送端确认机制(问题1)
同步单条消息确认机制(推荐)
- 开启发送端消息确认机制:
channel.confirmSelect()
- 发送消息之后调用
channel.waitForConfirms()
进行确认。
@Slf4j
public class PublisherConfirmTest {
private static final String EXCHANGE_NAME = "exchange.cat.dog";
private static final String QUEUE_NAME = "queue.cat";
private static final String KEY_NAME = "key.yingduan";
private static Connection connection;
private static Channel channel;
/**
* 监听队列回调函数
*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
log.info("【监听队列回调函数】- 入参,message: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
};
@BeforeEach
void init() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 声明交换机 Exchange
channel.exchangeDeclare(
// 交换机名称,名称的规则:什么类型.交换者A.交换者B
EXCHANGE_NAME,
// 交换机类型,这里选择直接交换
BuiltinExchangeType.DIRECT,
// 持久,如果为 false,重启后交换机就没了
true,
// 自动删除,如果为 true,没有队列依赖此交换机,交换机就没了
false,
// 参数
null
);
// 声明队列
channel.queueDeclare(
// 队列名称,名称的规则:什么类型.干什么的
QUEUE_NAME,
// 持久,如果为 false,重启后队列就没了
true,
// 是否为当前 connection 独占,如果为 true,那其他 connection 就无法连接
false,
// 自动删除,如果为 true,队列中没有消息之后,队列就没了
false,
// 参数
null
);
// 绑定 Exchange 和 Queue
channel.queueBind(
QUEUE_NAME,
EXCHANGE_NAME,
KEY_NAME
);
// 消费者监听
channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
}
@AfterEach
void destroy() throws Exception {
channel.close();
connection.close();
}
/**
* 消息确认机制
*/
@Test
void publisherConfirm() throws Exception {
// 开启发送端确认机制
channel.confirmSelect();
channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, "一只英短猫来了".getBytes(StandardCharsets.UTF_8));
// 确认消息是否送达 Exchange
if (channel.waitForConfirms()) {
log.info("【生产者发送消息】- 成功了");
} else {
log.error("【生产者发送消息】- 失败了");
}
}
}
同步多条消息确认机制
- 开启发送端消息确认机制:
channel.confirmSelect()
- 发送多次消息。
- 调用
channel.waitForConfirms()
进行确认。
这种方式存在一些缺点,如果最终确认为 true 还没啥问题,但是如果为 false,那就无法确定这多个消息中是哪个没成功。
/**
* 消息确认机制
*/
@Test
void publisherConfirm() throws Exception {
// 开启发送端确认机制
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String s = "一只英短猫来了" + i;
channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, s.getBytes(StandardCharsets.UTF_8));
}
// 确认消息是否送达 Exchange
if (channel.waitForConfirms()) {
log.info("【生产者发送消息】- 成功了");
} else {
log.error("【生产者发送消息】- 失败了");
}
}
异步消息确认机制
- 开启发送端消息确认机制:
channel.confirmSelect()
channel.addConfirmListener()
添加确认监听器。- 发送多次消息。
异步消息确认机制同时支持单条和多条,这取决于处理消息的速度。
/**
* 消息确认机制
*/
@Test
void publisherConfirm() throws Exception {
// 开启发送端确认机制
channel.confirmSelect();
// 发送端监听
ConfirmListener confirmListener = new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("Ack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("Nack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
}
};
channel.addConfirmListener(confirmListener);
for (int i = 0; i < 10; i++) {
String s = "一只英短猫来了" + i;
channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, s.getBytes(StandardCharsets.UTF_8));
}
TimeUnit.SECONDS.sleep(3);
}
show:
16:03:21.948 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $3$, multiple: $true$
16:03:21.952 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $6$, multiple: $true$
16:03:21.952 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $7$, multiple: $false$
16:03:21.952 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $10$, multiple: $true$
- deliveryTag: 就是 Channel 发送消息时生成的一个序号,多个 Channel 之间会重复。deliverTag 代表已经接收到的条数及之前的条数,比如是 4,代表 1、2、3、4 都接收了。
- multiple: 一次确认多条为 true,单条为 false。
这种处理方式也不是很好,与 同步多条消息确认机制
存在同样的问题,不过好在有个 deliveryTag,可以通过数据库中转的方式确认是哪些消息成功,哪些消息失败。
消息返回机制(问题2)
RabbitMQ 提供了消息返回机制
来解决 Exchange -> Queue
没有路由的问题,如果没有能路由到 Queue,将会通知到发送方,发送方处理此问题。
在 channel.basicPublish()
的时候有一个重载方法,其中有一个参数 mandatory
。
- mandatory: 为 true 代表消息从 Exchange 无法路由到 Queue 时,将通知发送方。为 false 则表示直接丢弃。
/**
* 消息返回机制
*/
@Test
void publisherReturn() throws Exception {
// 开启发送端确认机制
channel.confirmSelect();
// 发送端监听
ConfirmListener confirmListener = new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("Ack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("Nack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
}
};
channel.addConfirmListener(confirmListener);
// 消息返回机制监听
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
// 如果进入此方法,说明有消息从 Exchange 路由到 Queue 失败了
log.error("【路由失败了】- replyCode: ${}$, replyText: ${}$, exchange: ${}$, routingKey: ${}$, properties: ${}$, body: ${}$",
replyCode, replyText, exchange, routingKey, properties, new String(body, StandardCharsets.UTF_8));
});
/*channel.addReturnListener(returnMessage -> {
// 与上边的用法一致,只是都包装到 returnMessage 中了。
});*/
// 用一个没有的 Routing Key
// mandatory: false
channel.basicPublish(EXCHANGE_NAME, KEY_NAME + 1, null, "一只英短猫来了1".getBytes(StandardCharsets.UTF_8));
// mandatory: true
channel.basicPublish(EXCHANGE_NAME, KEY_NAME + 1, true, null, "一只英短猫来了2".getBytes(StandardCharsets.UTF_8));
TimeUnit.SECONDS.sleep(3);
}
show:
18:57:39.652 [AMQP Connection 127.0.0.1:5672] ERROR com.lynchj.rabbitmq.main.PublisherReturnTest - 【路由失败了】- replyCode: $312$, replyText: $NO_ROUTE$, exchange: $exchange.cat.dog$, routingKey: $key.yingduan1$, properties: $#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)$, body: $一只英短猫来了2$
- replyCode: 返回状态码,类似 HTTP 状态码。
- replyText: 返回的原因。
- exchange: 交换器。
- routingKey: 路由 Key。
- properties: 配置。
- body: 消息体。
消费端确认机制(问题3)
问题三是情况下消费者端在订阅时开启了自动 Ack。自动的情况下,如果消费者端出错,消息就没了。所以改成手动 Ack,自主控制处理。
- 消费者在使用
channel.basicConsume()
订阅队列是,参数autoAck
设置为 false。 - 在消费消息的时候手动
channel.basicAck()
掉。
@Slf4j
public class ConsumerConfirmTest {
private static final String EXCHANGE_NAME = "exchange.cat.dog";
private static final String QUEUE_NAME = "queue.cat";
private static final String KEY_NAME = "key.yingduan";
private static Connection connection;
private static Channel channel;
/**
* 监听队列回调函数
*/
DeliverCallback deliverCallback = (consumerTag, message) -> {
log.info("【监听队列回调函数】- 入参,message: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
// 手动 Ack
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
// 手动 Nack
// requeue: 是否重新入队,如果此属性为 true,消息会被重新放置回去对应队列(如果可能的话,会放回到原来的位置),如果此属性为 false,消息直接被丢弃。
// 属性 requeue 如果设置为true,需要谨慎设计程序的逻辑,否则很有可能导致消息一直重复消费失败并且重复重新入队,表现为消费者线程出现死循环逻辑,耗尽服务器CPU资源。
// 因为第一次消费时既然 Nack 了,后续大概也不能成功,所以一般情况下选择不重新入队,而是通过其他方式处理。
// channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false);
// 手动 Reject
// basicNack 算是 basicReject 的一个扩展,因为 basicReject 不能一次拒绝多条消息。
// channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
// 对于 basicNack 和 basicReject ,如果参数 requeue 传入 false,消息还是会从队列里面删除。
};
@BeforeEach
void init() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
// 声明交换机 Exchange
channel.exchangeDeclare(
// 交换机名称,名称的规则:什么类型.交换者A.交换者B
EXCHANGE_NAME,
// 交换机类型,这里选择直接交换
BuiltinExchangeType.DIRECT,
// 持久,如果为 false,重启后交换机就没了
true,
// 自动删除,如果为 true,没有队列依赖此交换机,交换机就没了
false,
// 参数
null
);
// 声明队列
channel.queueDeclare(
// 队列名称,名称的规则:什么类型.干什么的
QUEUE_NAME,
// 持久,如果为 false,重启后队列就没了
true,
// 是否为当前 connection 独占,如果为 true,那其他 connection 就无法连接
false,
// 自动删除,如果为 true,队列中没有消息之后,队列就没了
false,
// 参数
null
);
// 绑定 Exchange 和 Queue
channel.queueBind(
QUEUE_NAME,
EXCHANGE_NAME,
KEY_NAME
);
// 消费者监听,关闭自动 ACK
channel.basicConsume(QUEUE_NAME, false, deliverCallback, (consumerTag) -> {
});
}
@AfterEach
void destroy() throws Exception {
channel.close();
connection.close();
}
/**
* 消息确认机制
*/
@Test
void publisher() throws Exception {
channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, "一只英短猫来了".getBytes(StandardCharsets.UTF_8));
TimeUnit.SECONDS.sleep(3);
}
}
总结
通过实施这三个问题对应的方案,可以有效解决 发送方 -> Exchange
、Exchange -> Queue
、Queue -> 消费方
中间可能出现的不稳定因素,避免丢失消息。
文章评论