大漠知秋的加油站

  • 首页
大漠知秋的加油站
你就当我的烂笔头吧
  1. 首页
  2. Java
  3. 正文

RabbitMQ 消息百分百投递方案

2022年7月6日 1224点热度 0人点赞 0条评论

简介

AMQP

如上图所示,从 Publisher 开始,发送一个消息并指定 Routing key,经由 Connection 中的 Channel 到达对应的 Exchange,消息到达 Exchange 之后根据 Routing Key 路由到对应的 Queue。消息再从 Queue 中经由 Consumer 端的 Connection 中的 Channel 到达对应的 Consumer。

  • Publisher: 消息的生产方。
  • 消息: 一条消息想要发送到 RabbitMQ Server,需要指定 Exchange 和 Binding。
  • Connection: 可以理解为生产方/消费方与 RabbitMQ Server 建立的一条 TCP 连接。
  • Channel: 在 Connection 上进行的逻辑区分,相当于在一个物理 TCP 连接上面逻辑化出了一些 逻辑连接,与 RabbitMQ Server 交互一般都是使用 Channel。
  • Exchange: 消息交换器,生产方的消息会进入此处,然后根据消息中的 Routing Key 与 Binding 进行匹配,匹配成功,就把消息路由到对应 Binding 的 Queue,否则扔掉。
  • Queue: Exchange 的消息会被路由到此处,等待被消费方消费。
  • Consumer: 消息的消费方。

问题

这里所说消息的投递,是指消息从 Publisher 到 Consumer。从上节可以知道,消息的投递是一个看似简单,其实内部还是比较复杂的操作,保不齐中间哪一个环节出现问题,就会导致消息的丢失。此处有三个环节可能出现消息投递失败的问题。

  1. Publisher -> Exchange: 网络问题。
  2. Exchange -> Queue: 找不到对应的 Binding。
  3. Queue -> Consumer: 网络问题、消费方处理出错。

针对这三个问题,RabbitMQ 也提供了对应的解决方案。

发送端确认机制(问题1)

同步单条消息确认机制(推荐)

  1. 开启发送端消息确认机制:channel.confirmSelect()
  2. 发送消息之后调用 channel.waitForConfirms() 进行确认。
@Slf4j
public class PublisherConfirmTest {

    private static final String EXCHANGE_NAME = "exchange.cat.dog";
    private static final String QUEUE_NAME    = "queue.cat";
    private static final String KEY_NAME      = "key.yingduan";

    private static Connection connection;
    private static Channel    channel;

    /**
     * 监听队列回调函数
     */
    DeliverCallback deliverCallback = (consumerTag, message) -> {
        log.info("【监听队列回调函数】- 入参,message: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
    };

    @BeforeEach
    void init() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");

        connection = connectionFactory.newConnection();
        channel = connection.createChannel();

        // 声明交换机 Exchange
        channel.exchangeDeclare(
                // 交换机名称,名称的规则:什么类型.交换者A.交换者B
                EXCHANGE_NAME,
                // 交换机类型,这里选择直接交换
                BuiltinExchangeType.DIRECT,
                // 持久,如果为 false,重启后交换机就没了
                true,
                // 自动删除,如果为 true,没有队列依赖此交换机,交换机就没了
                false,
                // 参数
                null
        );

        // 声明队列
        channel.queueDeclare(
                // 队列名称,名称的规则:什么类型.干什么的
                QUEUE_NAME,
                // 持久,如果为 false,重启后队列就没了
                true,
                // 是否为当前 connection 独占,如果为 true,那其他 connection 就无法连接
                false,
                // 自动删除,如果为 true,队列中没有消息之后,队列就没了
                false,
                // 参数
                null
        );

        // 绑定 Exchange 和 Queue
        channel.queueBind(
                QUEUE_NAME,
                EXCHANGE_NAME,
                KEY_NAME
        );

        // 消费者监听
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {});
    }

    @AfterEach
    void destroy() throws Exception {
        channel.close();
        connection.close();
    }

    /**
     * 消息确认机制
     */
    @Test
    void publisherConfirm() throws Exception {
        // 开启发送端确认机制
        channel.confirmSelect();

        channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, "一只英短猫来了".getBytes(StandardCharsets.UTF_8));

        // 确认消息是否送达 Exchange
        if (channel.waitForConfirms()) {
            log.info("【生产者发送消息】- 成功了");
        } else {
            log.error("【生产者发送消息】- 失败了");
        }
    }

}

同步多条消息确认机制

  1. 开启发送端消息确认机制:channel.confirmSelect()
  2. 发送多次消息。
  3. 调用 channel.waitForConfirms() 进行确认。

这种方式存在一些缺点,如果最终确认为 true 还没啥问题,但是如果为 false,那就无法确定这多个消息中是哪个没成功。

/**
     * 消息确认机制
     */
    @Test
    void publisherConfirm() throws Exception {
        // 开启发送端确认机制
        channel.confirmSelect();

        for (int i = 0; i < 10; i++) {
            String s = "一只英短猫来了" + i;
            channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, s.getBytes(StandardCharsets.UTF_8));
        }

        // 确认消息是否送达 Exchange
        if (channel.waitForConfirms()) {
            log.info("【生产者发送消息】- 成功了");
        } else {
            log.error("【生产者发送消息】- 失败了");
        }
    }

异步消息确认机制

  1. 开启发送端消息确认机制:channel.confirmSelect()
  2. channel.addConfirmListener() 添加确认监听器。
  3. 发送多次消息。

异步消息确认机制同时支持单条和多条,这取决于处理消息的速度。

/**
 * 消息确认机制
 */
@Test
void publisherConfirm() throws Exception {
  // 开启发送端确认机制
  channel.confirmSelect();
  // 发送端监听
  ConfirmListener confirmListener = new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
      log.info("Ack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
      log.info("Nack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
    }
  };
  channel.addConfirmListener(confirmListener);

  for (int i = 0; i < 10; i++) {
    String s = "一只英短猫来了" + i;
    channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, s.getBytes(StandardCharsets.UTF_8));
  }

  TimeUnit.SECONDS.sleep(3);
}

show:

16:03:21.948 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $3$, multiple: $true$
16:03:21.952 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $6$, multiple: $true$
16:03:21.952 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $7$, multiple: $false$
16:03:21.952 [AMQP Connection 127.0.0.1:5672] INFO com.lynchj.rabbitmq.main.PublisherConfirmTest - Ack deliveryTag: $10$, multiple: $true$
  • deliveryTag: 就是 Channel 发送消息时生成的一个序号,多个 Channel 之间会重复。deliverTag 代表已经接收到的条数及之前的条数,比如是 4,代表 1、2、3、4 都接收了。
  • multiple: 一次确认多条为 true,单条为 false。

这种处理方式也不是很好,与 同步多条消息确认机制 存在同样的问题,不过好在有个 deliveryTag,可以通过数据库中转的方式确认是哪些消息成功,哪些消息失败。

消息返回机制(问题2)

RabbitMQ 提供了消息返回机制来解决 Exchange -> Queue 没有路由的问题,如果没有能路由到 Queue,将会通知到发送方,发送方处理此问题。

在 channel.basicPublish() 的时候有一个重载方法,其中有一个参数 mandatory。

  • mandatory: 为 true 代表消息从 Exchange 无法路由到 Queue 时,将通知发送方。为 false 则表示直接丢弃。
/**
 * 消息返回机制
 */
@Test
void publisherReturn() throws Exception {
  // 开启发送端确认机制
  channel.confirmSelect();
  // 发送端监听
  ConfirmListener confirmListener = new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
      log.info("Ack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
      log.info("Nack deliveryTag: ${}$, multiple: ${}$", deliveryTag, multiple);
    }
  };
  channel.addConfirmListener(confirmListener);

  // 消息返回机制监听
  channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
    // 如果进入此方法,说明有消息从 Exchange 路由到 Queue 失败了
    log.error("【路由失败了】- replyCode: ${}$, replyText: ${}$, exchange: ${}$, routingKey: ${}$, properties: ${}$, body: ${}$",
              replyCode, replyText, exchange, routingKey, properties, new String(body, StandardCharsets.UTF_8));
  });
  /*channel.addReturnListener(returnMessage -> {
    // 与上边的用法一致,只是都包装到 returnMessage 中了。
  });*/

  // 用一个没有的 Routing Key
  // mandatory: false
  channel.basicPublish(EXCHANGE_NAME, KEY_NAME + 1, null, "一只英短猫来了1".getBytes(StandardCharsets.UTF_8));
  // mandatory: true
  channel.basicPublish(EXCHANGE_NAME, KEY_NAME + 1, true, null, "一只英短猫来了2".getBytes(StandardCharsets.UTF_8));

  TimeUnit.SECONDS.sleep(3);
}

show:

18:57:39.652 [AMQP Connection 127.0.0.1:5672] ERROR com.lynchj.rabbitmq.main.PublisherReturnTest - 【路由失败了】- replyCode: $312$, replyText: $NO_ROUTE$, exchange: $exchange.cat.dog$, routingKey: $key.yingduan1$, properties: $#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)$, body: $一只英短猫来了2$
  • replyCode: 返回状态码,类似 HTTP 状态码。
  • replyText: 返回的原因。
  • exchange: 交换器。
  • routingKey: 路由 Key。
  • properties: 配置。
  • body: 消息体。

消费端确认机制(问题3)

问题三是情况下消费者端在订阅时开启了自动 Ack。自动的情况下,如果消费者端出错,消息就没了。所以改成手动 Ack,自主控制处理。

  1. 消费者在使用 channel.basicConsume() 订阅队列是,参数 autoAck 设置为 false。
  2. 在消费消息的时候手动 channel.basicAck() 掉。
@Slf4j
public class ConsumerConfirmTest {

    private static final String EXCHANGE_NAME = "exchange.cat.dog";
    private static final String QUEUE_NAME    = "queue.cat";
    private static final String KEY_NAME      = "key.yingduan";

    private static Connection connection;
    private static Channel    channel;

    /**
     * 监听队列回调函数
     */
    DeliverCallback deliverCallback = (consumerTag, message) -> {
        log.info("【监听队列回调函数】- 入参,message: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));

        // 手动 Ack
        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        // 手动 Nack
        // requeue: 是否重新入队,如果此属性为 true,消息会被重新放置回去对应队列(如果可能的话,会放回到原来的位置),如果此属性为 false,消息直接被丢弃。
        //          属性 requeue 如果设置为true,需要谨慎设计程序的逻辑,否则很有可能导致消息一直重复消费失败并且重复重新入队,表现为消费者线程出现死循环逻辑,耗尽服务器CPU资源。
        //          因为第一次消费时既然 Nack 了,后续大概也不能成功,所以一般情况下选择不重新入队,而是通过其他方式处理。
//        channel.basicNack(message.getEnvelope().getDeliveryTag(), false, false);

        // 手动 Reject
        // basicNack 算是 basicReject 的一个扩展,因为 basicReject 不能一次拒绝多条消息。
//        channel.basicReject(message.getEnvelope().getDeliveryTag(), false);

        // 对于 basicNack 和 basicReject ,如果参数 requeue 传入 false,消息还是会从队列里面删除。
    };

    @BeforeEach
    void init() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");

        connection = connectionFactory.newConnection();
        channel = connection.createChannel();

        // 声明交换机 Exchange
        channel.exchangeDeclare(
                // 交换机名称,名称的规则:什么类型.交换者A.交换者B
                EXCHANGE_NAME,
                // 交换机类型,这里选择直接交换
                BuiltinExchangeType.DIRECT,
                // 持久,如果为 false,重启后交换机就没了
                true,
                // 自动删除,如果为 true,没有队列依赖此交换机,交换机就没了
                false,
                // 参数
                null
        );

        // 声明队列
        channel.queueDeclare(
                // 队列名称,名称的规则:什么类型.干什么的
                QUEUE_NAME,
                // 持久,如果为 false,重启后队列就没了
                true,
                // 是否为当前 connection 独占,如果为 true,那其他 connection 就无法连接
                false,
                // 自动删除,如果为 true,队列中没有消息之后,队列就没了
                false,
                // 参数
                null
        );

        // 绑定 Exchange 和 Queue
        channel.queueBind(
                QUEUE_NAME,
                EXCHANGE_NAME,
                KEY_NAME
        );

        // 消费者监听,关闭自动 ACK
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, (consumerTag) -> {
        });
    }

    @AfterEach
    void destroy() throws Exception {
        channel.close();
        connection.close();
    }

    /**
     * 消息确认机制
     */
    @Test
    void publisher() throws Exception {
        channel.basicPublish(EXCHANGE_NAME, KEY_NAME, null, "一只英短猫来了".getBytes(StandardCharsets.UTF_8));

        TimeUnit.SECONDS.sleep(3);
    }

}

总结

通过实施这三个问题对应的方案,可以有效解决 发送方 -> Exchange、Exchange -> Queue、Queue -> 消费方 中间可能出现的不稳定因素,避免丢失消息。

标签: Java RabbitMQ
最后更新:2022年7月6日

大漠知秋

唯黄昏而思烛明,唯覆雪始念日暖,唯放手方知情真,今困苦而怀峥嵘,今飘零而涌乡愁,今孑然而徒唏嘘,唏嘘成愁。

点赞
< 上一篇
下一篇 >

文章评论

razz evil exclaim smile redface biggrin eek confused idea lol mad twisted rolleyes wink cool arrow neutral cry mrgreen drooling persevering
取消回复

文章目录
  • 简介
  • 问题
  • 发送端确认机制(问题1)
    • 同步单条消息确认机制(推荐)
    • 同步多条消息确认机制
    • 异步消息确认机制
  • 消息返回机制(问题2)
  • 消费端确认机制(问题3)
  • 总结

COPYRIGHT © 2023 大漠知秋的加油站. ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

豫ICP备16029200号-2