SpringBoot 的支持
SpringBoot 已经提供了对 AMQP 协议完全支持的 spring-boot-starter-amqp
依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。参考:Spring AMQP。
特点
- 用于异步处理消费消息的监听器容器。
- 用于发送和接收消息的 RabbitTemplate。
- RabbitAdmin 用于自动声明队列、交换和绑定。
RabbitAdmin
作用
- declareExchange:创建交换机。
- deleteExchange:删除交换机。
- declareQueue:创建队列。
- deleteQueue:删除队列。
- purge:清空队列。
- declareBinding:新建绑定关系。
- removeBinding:删除绑定关系。
- getQueueProperties:查询队列属性。
加粗的为常用。
创建方式
// 连接工厂
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
// 使用连接工厂创建 RabbitAdmin
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
编程式实现
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "exchange.cat.dog";
public static final String EXCHANGE_DLX = "exchange.dlx";
public static final String QUEUE_NAME = "queue.cat";
public static final String QUEUE_DLX = "queue.dlx";
public static final String KEY_NAME = "key.yingduan";
public static final String KEY_DLX = "#";
@PostConstruct
void rabbitAdmin() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
// 创建 RabbitAdmin
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 声明交换机
DirectExchange directExchange = new DirectExchange(EXCHANGE_NAME);
rabbitAdmin.declareExchange(directExchange);
// 声明队列
Queue queue = new Queue(QUEUE_NAME);
rabbitAdmin.declareQueue(queue);
// 声明绑定关系
// 目的地名称、目的地类型、绑定交换机、绑定 key、参数
Binding binding = new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
rabbitAdmin.declareBinding(binding);
}
}
声明式实现(推荐)
@Slf4j
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "exchange.cat.dog";
public static final String EXCHANGE_DLX = "exchange.dlx";
public static final String QUEUE_NAME = "queue.cat";
public static final String QUEUE_DLX = "queue.dlx";
public static final String KEY_NAME = "key.yingduan";
public static final String KEY_DLX = "#";
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
return connectionFactory;
}
@Bean
RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
Exchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
Queue queue() {
return new Queue(QUEUE_NAME);
}
@Bean
Binding binding() {
// 目的地名称、目的地类型、绑定交换机、绑定 key、参数
return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
}
}
注意,以上配置再启动 SpringBoot 并不会立马创建交换机、队列、绑定,SpringBoot AMQP 有懒加载,需要等到使用 connection
时才会创建。什么是使用 connection
呢?
- 比如创建
connection
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
connectionFactory.createConnection();
return connectionFactory;
}
- 再比如监听了队列
@RabbitListener(queues = {"test"})
void test() {
log.info("【测试监听消息】");
}
死信队列机制
死信队列需要在创建 Queue 时指定对应属性:
@Bean
Queue queue() {
// 配置声明队列时使用的参数
Map<String, Object> args = new HashMap<>(1);
// 设置死信队列指向的交换机
args.put("x-dead-letter-exchange", EXCHANGE_DLX);
return new Queue(QUEUE_NAME, true, false, false, args);
}
RabbitTemplate
RabbitTemplate
是 SpringBoot AMQP 提供的快速发 RabbitMQ 消息的模板类,与 RestTemplate 有类似之处,意指方便、简单、快速的发 RabbitMQ 消息。
创建
@Bean
RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
发送消息
// 通过 Spring 到处注入使用即可。
rabbitTemplate.send(EXCHANGE_NAME, KEY_NAME, new Message("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)))
rabbitTemplate.convertAndSend(EXCHANGE_NAME, KEY_NAME, "HelloWorld 中国");
Message message = rabbitTemplate.sendAndReceive(RabbitConfig.EXCHANGE_NAME, RabbitConfig.KEY_NAME, new Message("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)));
send(final String exchange, final String routingKey, final Message message)
(常用)
普通的消息发送,Message
的带参构造中可以传递参数,比如消息过期时间。
convertAndSend(String exchange, String routingKey, final Object object)(常用)
可以转换 Java 对象成 AMQP 消息进行发送。
Message sendAndReceive(final String exchange, final String routingKey, final Message message)
阻塞等待 5 秒钟,返回的 Message 就是服务端返回的数据,阻塞时间可以使用 rabbitTemplate.setReplyTimeout(10000)
设置。
发送端确认机制 和 消息返回机制
之前的《RabbitMQ 消息百分百投递方案》中有详细的记录过非 SpringBoot 的发送端确认机制
和 消息返回机制
。那改成 SpringBoot AMQP 之后肯定也是支持的。之前推荐使用同步单条消息确认机制
,可以准确知道是哪一条消息出现问题方便做处理。同步多条
和 异步
都不好确定是具体哪一条出现问题。
SpringBoot AMQP 提供的需要先配置 connectionFactory
:
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
// 发送端确认的类型
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
// 开启消息返回机制
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
在 RibbitTemplate
中配置回调函数:
@Bean
RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启 Mandatory
rabbitTemplate.setMandatory(true);
// 配置 发送端确认 回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
});
// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
rabbitTemplate.setReturnsCallback(returned -> {
log.info("【消息返回】- 入参; returned: ${}$", returned);
});
return rabbitTemplate;
}
show:
2022-07-05 13:24:16.549 INFO 59768 --- [nectionFactory1] com.lynchj.rabbitmq.config.RabbitConfig : 【消息返回】- 入参; returned: $ReturnedMessage [message=(Body:'[B@3589027c(byte[17])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=exchange.cat.dog, routingKey=key.yingduan]$
2022-07-05 13:24:16.550 INFO 59768 --- [nectionFactory2] com.lynchj.rabbitmq.config.RabbitConfig : 【发送端确认】- 入参; correlationData: $null$, ack: $true$, cause: $null$
ConfirmType.CORRELATED
上面的配置,在发送端确认时是无法区分消息是哪一个的,观察日志也能看出来,就打印了一个 ack 的值。要想关联上对应的消息需要做如下配置:
// 发送端确认的类型从 SIMPLE 更改为 CORRELATED
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
CORRELATED:指有关联性的。
在发送消息是修改如下:
// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.send(
RabbitConfig.EXCHANGE_NAME,
RabbitConfig.KEY_NAME,
new Message("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)),
correlationData
);
show:
2022-07-05 13:30:09.779 INFO 54416 --- [nectionFactory1] com.lynchj.rabbitmq.config.RabbitConfig : 【发送端确认】- 入参; correlationData: $CorrelationData [id=976c94a6-2fa8-45dd-84e1-691c0db31460]$, ack: $true$, cause: $null$
SimpleMessageListenerContainer
SimpleMessageListenerContainer
可以帮助在开发中高效的监听消息,可以设置坚挺队列、设置消费者数量、重回队列、消息确认模式等等。主要功能如下:
- 设置同时监听多个队列、自动启动、自动配置RabbitMQ。
- 设置消费者数量(最大数量、最小数量、批量消费)。
- 设置消息确认模式、是否重回队列、异常捕获。
- 设置是否独占、其他消费者属性等。
- 设置具体的监听器、消息转换器等。
- 支持动态设置,运行中修改监听器配置。
代码实现
@Bean
SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 监听队列,可多个
messageListenerContainer.setQueueNames(QUEUE_NAME);
// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
messageListenerContainer.setConcurrentConsumers(1);
// 并发处理的线程最大数目,不能小于 concurrentConsumers
messageListenerContainer.setMaxConcurrentConsumers(1);
// Ack 的方式
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 消费端限流
messageListenerContainer.setPrefetchCount(1);
// 设置监听消息处理方法
messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
log.info("【消费消息】- 入参;message: ${}$", message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
});
return messageListenerContainer;
}
MessageListenerAdapter
上边实现的消费者监听是通过 messageListenerContainer.setMessageListener()
方法实现,业务代码写到了配置的代码中,耦合性比较强,更优雅一点的做法是使用 MessageListenerAdapter
。
@Bean
SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 监听队列,可多个
messageListenerContainer.setQueueNames(QUEUE_NAME);
// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
messageListenerContainer.setConcurrentConsumers(1);
// 并发处理的线程最大数目,不能小于 concurrentConsumers
messageListenerContainer.setMaxConcurrentConsumers(1);
// Ack 的方式
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 消费端限流
messageListenerContainer.setPrefetchCount(1);
// 设置监听消息处理方法
/*messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("【消费消息】- 入参;message: ${}$", message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
});*/
// 创建消息监听适配器
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(handleMessage);
// 设置处理消息的适配器
messageListenerContainer.setMessageListener(messageListenerAdapter);
return messageListenerContainer;
}
handleMessage
是注入的另一个类:
@Slf4j
@Component
public class HandleMessage {
void handleMessage(byte[] message) throws IOException {
log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
}
}
经过包装一层 MessageListenerAdapter
适配器,可以把真实的处理方法解耦出去,MessageListenerAdapter
的 setDelegate()
方法设置了任意一个 Object,等到有消息消费时,会调用到这个 Object 的 handleMessage
方法,这个方法名是 MessageListenerAdapter
内部的一个常量:
也可以通过调用 MessageListenerAdapter
的 setDefaultListenerMethod()
方法来更改默认调用方法名。
还可以配置监听多个队列,并给不同的队列设置不同的处理方法:
// 监听多个队列
messageListenerContainer.setQueueNames("cat", "dog", "queue.dog.cat");
// 创建消息监听适配器
MessageListenerAdapter adapter = new MessageListenerAdapter(handleMessage);
// 设置真实处理业务消息的默认方法名称,如果没有设置,那么默认的处理器中的默认方式是 handleMessage 方法
adapter.setDefaultListenerMethod("onMessage");
// 配置队列与真实处理业务消息的方法对应名称
Map<String, String> queueOrTagToMethodName = new HashMap<>(8);
queueOrTagToMethodName.put("cat", "onCat");
queueOrTagToMethodName.put("dog", "onDog");
queueOrTagToMethodName.put("queue.dog.cat", "onInfo");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
// 设置处理消息的适配器
messageListenerContainer.setMessageListener(adapter);
@Slf4j
@Component
public class HandleMessage {
void handleMessage(byte[] message) {
log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
}
void onCat(byte[] message) {
log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
}
void onDog(byte[] message) {
log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
}
void onInfo(byte[] message) {
log.info("【消费消息】- 入参;message: ${}$", new String(message, StandardCharsets.UTF_8));
}
}
注意:美中不足的是
MessageListenerAdapter
中适配的真实处理业务消息的方法入参只能是byte[]
。
MessageConverter
先说说其作用,之前收发消息时,使用了 Byte[] 数组作为消息体,而在编写业务逻辑时,需要使用 Java 对象,这样就避免不了要来回从 Byte[] <> String <> Java 对象之间的相互转换。MessageConverter 就是用来在收发消息时自动转换 AMQP 内部消息和 Java 对象的。
MessageConverter
本身是接口,无法直接使用,不过 AMQP 内已经提供了一个其实现 org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
方便直接使用,一般其况下使用这个就足够了,因为项目中大部分应该都是 JSON 形式的数据。当然,如果出现一些比较少见的格式,也可以自定义,只需要重写 toMessage
和 fromMessage
即可。
Jackson2JsonMessageConverter
Student
Model:
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Student {
private String name;
private Integer age;
}
发消息的方法:
void send() throws Exception {
// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
Student student = new Student("大漠知秋", 18);
ObjectMapper objectMapper = new ObjectMapper();
String s = objectMapper.writeValueAsString(student);
rabbitTemplate.send(
RabbitConfig.EXCHANGE_NAME,
RabbitConfig.KEY_NAME,
new Message(s.getBytes(StandardCharsets.UTF_8)),
correlationData
);
}
消费者监听配置:
@Bean
SimpleMessageListenerContainer messageListenerContainer(@Autowired ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 监听队列,可多个
messageListenerContainer.setQueueNames(QUEUE_NAME);
// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
messageListenerContainer.setConcurrentConsumers(1);
// 并发处理的线程最大数目,不能小于 concurrentConsumers
messageListenerContainer.setMaxConcurrentConsumers(1);
// Ack 的方式
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 消费端限流
messageListenerContainer.setPrefetchCount(1);
// 创建消息监听适配器
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setDelegate(handleMessage);
// 配置 MessageConverter
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
messageListenerAdapter.setMessageConverter(messageConverter);
// 设置处理消息的适配器
messageListenerContainer.setMessageListener(messageListenerAdapter);
return messageListenerContainer;
}
HandleMessage
:
@Slf4j
@Component
public class HandleMessage {
void handleMessage(Student student) {
log.info("【消费消息】- 入参;message: ${}$", student);
}
}
如果仅仅是这样配置上 MessageConverter
就启动的话会报如下错:
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to invoke target method 'handleMessage' with argument type = [class java.util.LinkedHashMap], value = [{{name=大漠知秋, age=18}}]
Jackson2JsonMessageConverter
默认转换的 Java 对象为 LinkedHashMap,而在 handleMessage
处理方法中的参数是 Student
,所以就报错了。需要指定一下类型:
// 配置 MessageConverter
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
messageConverter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
}
@Override
public Class<?> toClass(MessageProperties properties) {
return Student.class;
}
});
messageListenerAdapter.setMessageConverter(messageConverter);
show:
2022-07-05 17:19:22.724 INFO 34140 --- [enerContainer-1] c.lynchj.rabbitmq.handle.HandleMessage : 【消费消息】- 入参;message: $Student(name=大漠知秋, age=18)$
@RabbitListener(终极监听方案)
使用此方案做监听消息功能,就可以把之前的 SimpleMessageListenerContainer
进行监听的方案舍弃掉了,就是这么的喜新厌旧,不过之前的 SimpleMessageListenerContainer 也不是一无是处,学过之后可以更好的理解内部的一些逻辑。
@RabbitListener
的特点:
- RabbitListener 是 SpringBoot 架构中监听消息的
终极方案
。 - RabbitListener 使用注解声明,对业务代码无侵入。
- RabbitListener 可以在 SpringBoot 配置文件中进行配置。
@RabbitListener
本身是 Java 中的注解,可以搭配其他注解一起使用:
- @Exchange:自动声明 Exchange。
- @Queue:自动声明队列。
- @QueueBinding:自动声明绑定关系。
基本使用
首先在 RabbitConfig
中新增创建 RabbitListenerContainerFactory
的 Bean,看名字应该就知道是用来替换掉 SimpleMessageListenerContainer
的工厂。方便后边使用 @RabbitListener
时创建 ListenerContainer
。
@Slf4j
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "exchange.cat.dog";
public static final String EXCHANGE_DLX = "exchange.dlx";
public static final String QUEUE_NAME = "queue.cat";
public static final String QUEUE_DLX = "queue.dlx";
public static final String KEY_NAME = "key.yingduan";
public static final String KEY_DLX = "#";
public static final String RABBIT_ADMIN = "rabbitAdmin";
public static final String RABBIT_LISTENER_CONTAINER_FACTORY = "rabbitListenerContainerFactory";
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
// 发送端确认的类型
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 开启消息返回机制
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean(name = RABBIT_ADMIN)
RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
Exchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
Queue queue() {
// 配置声明队列时使用的参数
Map<String, Object> args = new HashMap<>(1);
// 设置死信队列指向的交换机
args.put("x-dead-letter-exchange", EXCHANGE_DLX);
return new Queue(QUEUE_NAME, true, false, false, args);
}
@Bean
Binding binding() {
// 目的地名称、目的地类型、绑定交换机、绑定 key、参数
return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
}
@Bean
RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启 Mandatory
rabbitTemplate.setMandatory(true);
// 配置 发送端确认 回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
});
// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
rabbitTemplate.setReturnsCallback(returned -> {
log.info("【消息返回】- 入参; returned: ${}$", returned);
});
return rabbitTemplate;
}
@Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)
RabbitListenerContainerFactory rabbitListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory);
return listenerContainerFactory;
}
}
发送端:
@Slf4j
@Component
public class PublisherConfirm {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
void send() throws Exception {
// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
Student student = new Student("大漠知秋", 18);
ObjectMapper objectMapper = new ObjectMapper();
String s = objectMapper.writeValueAsString(student);
rabbitTemplate.send(
RabbitConfig.EXCHANGE_NAME,
RabbitConfig.KEY_NAME,
new Message(s.getBytes(StandardCharsets.UTF_8)),
correlationData
);
}
}
消费者端:
@Slf4j
@Component
public class RabbitListenerTest {
@RabbitListener(
containerFactory = RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,
queues = {RabbitConfig.QUEUE_NAME}
)
void listenCat(@Payload Message message) {
log.info("【消费消息】- 入参;message: ${}$", message);
log.info("【消费消息】- student: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
show:
2022-07-05 18:03:03.101 INFO 60560 --- [ntContainer#0-1] c.l.rabbitmq.handle.RabbitListenerTest : 【消费消息】- 入参;message: $(Body:'[B@251d4c4b(byte[32])' MessageProperties [headers={spring_listener_return_correlation=0e26e018-0e0a-43af-a197-67428c8fc800, spring_returned_message_correlation=e8937d7d-1257-46a2-93ec-e65bd2fac5ad}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.cat.dog, receivedRoutingKey=key.yingduan, deliveryTag=1, consumerTag=amq.ctag-A8eb9Qb1Uwyrdz6KcsvwBA, consumerQueue=queue.cat])$
2022-07-05 18:03:03.101 INFO 60560 --- [ntContainer#0-1] c.l.rabbitmq.handle.RabbitListenerTest : 【消费消息】- student: ${"name":"大漠知秋","age":18}$
使用 bindings 创建 Exchange、Queue、Binding
简化后的 RabbitConfig
:
@Slf4j
@Configuration
public class RabbitConfig {
public static final String EXCHANGE_NAME = "exchange.cat.dog";
public static final String EXCHANGE_DLX = "exchange.dlx";
public static final String QUEUE_NAME = "queue.cat";
public static final String QUEUE_DLX = "queue.dlx";
public static final String KEY_NAME = "key.yingduan";
public static final String KEY_DLX = "#";
public static final String RABBIT_ADMIN = "rabbitAdmin";
public static final String RABBIT_LISTENER_CONTAINER_FACTORY = "rabbitListenerContainerFactory";
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
// 发送端确认的类型
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
// 开启消息返回机制
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean(name = RABBIT_ADMIN)
RabbitAdmin rabbitAdmin(@Autowired ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
RabbitTemplate rabbitTemplate(@Autowired ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 开启 Mandatory
rabbitTemplate.setMandatory(true);
// 配置 发送端确认 回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
});
// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
rabbitTemplate.setReturnsCallback(returned -> {
log.info("【消息返回】- 入参; returned: ${}$", returned);
});
return rabbitTemplate;
}
@Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)
RabbitListenerContainerFactory rabbitListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory listenerContainerFactory = new SimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory);
return listenerContainerFactory;
}
}
RabbitListenerTest
:
@Slf4j
@Component
public class RabbitListenerTest {
@RabbitListener(
containerFactory = RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,
// 指定 RabbitAdmin,创建 Exchange、Queue、Binding 时使用
admin = RabbitConfig.RABBIT_ADMIN,
// 绑定关系,没有的 Exchange、Queue、Binding 没有的会自动创建。
bindings = {
// 第一个绑定关系,可以多个
@QueueBinding(
// 队列
value = @Queue(
// 队列名
name = RabbitConfig.QUEUE_NAME,
// 队列参数
arguments = {
// 队列中消息超时时间
@Argument(
name = "x-message-ttl",
value = "1000",
type = "java.lang.Integer"
),
// 死信队列配置信息
@Argument(
name = "x-dead-letter-exchange",
value = RabbitConfig.EXCHANGE_DLX,
// 默认值就是 String, 也可以不写
type = "java.lang.String"
)
}
),
// 交换机
exchange = @Exchange(
// 交换机名
name = RabbitConfig.EXCHANGE_NAME
),
// 绑定 Key
key = {RabbitConfig.KEY_NAME}
)
}
)
void listenCat(@Payload Message message) {
log.info("【消费消息】- 入参;message: ${}$", message);
log.info("【消费消息】- student: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
这种方式注解写到崩溃,不建议使用。。。
SpringBoot 使用 RabbitMQ 终极方案
SpringBoot 的开发原则就是约定大于配置
,上面的代码中,还存在着不少 @Bean
的配置代码,这显然很不 SpringBoot,应该把一些常规配置,配置到 .yml
或 .properties
中,让项目可以从配置文件中自动加载好 Bean,对此,SpringBoot AMQP 包提供了对应的支持。
配置文件(.yml/.properties)
spring:
rabbitmq:
host: 'localhost'
port: 5672
username: 'admin'
password: 'kzh_mxg4vfb2QRP*xkv'
virtual-host: '/'
# 发送端确认机制开启,并且使用关联性的类型
publisher-confirm-type: correlated
# 开启消息返回机制
publisher-returns: true
template:
# 开启委托,配合 publisher-returns 使用
mandatory: true
listener:
simple:
# Ack 模式
acknowledge-mode: manual
# 消费者限流
prefetch: 10
# 并发处理的线程最小数目,不能大于 max-concurrency
concurrency: 3
# 并发处理的线程最大数目,不能小于 concurrency
max-concurrency: 5
RabbitConfig
主要是配置发送端确认回调
、消息返回回调
、Exchange
、Queue
、Binding
的创建。
@Slf4j
@Configuration
public class RabbitConfig implements InitializingBean {
public static final String EXCHANGE_NAME = "exchange.cat.dog";
public static final String EXCHANGE_DLX = "exchange.dlx";
public static final String QUEUE_NAME = "queue.cat";
public static final String QUEUE_DLX = "queue.dlx";
public static final String KEY_NAME = "key.yingduan";
public static final String KEY_DLX = "#";
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public void afterPropertiesSet() throws Exception {
// 发送端确认 回调配置
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);
});
// 消息返回 回调配置,只有在 Exchange 无法路由到 Queue 时回调
rabbitTemplate.setReturnsCallback(returned -> {
log.error("【消息返回】- 入参; returned: ${}$", returned);
});
}
@Bean
Exchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
Queue queue() {
// 配置声明队列时使用的参数
Map<String, Object> args = new HashMap<>(1);
// 设置死信队列指向的交换机
args.put("x-dead-letter-exchange", EXCHANGE_DLX);
// 设置队列内消息过期时间,单位:毫秒
args.put("x-message-ttl", 15000);
return new Queue(QUEUE_NAME, true, false, false, args);
}
@Bean
Binding binding() {
// 目的地名称、目的地类型、绑定交换机、绑定 key、参数
return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME, null);
}
@Bean
TopicExchange dlxExchange() {
return new TopicExchange(EXCHANGE_DLX);
}
@Bean
Queue dlxQueue() {
return new Queue(QUEUE_DLX);
}
@Bean
Binding dlxBinding() {
// 目的地名称、目的地类型、绑定交换机、绑定 key、参数
return new Binding(QUEUE_DLX, Binding.DestinationType.QUEUE, EXCHANGE_DLX, KEY_DLX, null);
}
}
发送消息
@Slf4j
@RestController
@RequestMapping("/send")
public class SendController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendOne/{name}")
void sendOne(@PathVariable(name = "name") String name) throws JsonProcessingException {
log.info("【sendOne】- 入参: ${}$", name);
// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
Student student = new Student(name, 18);
ObjectMapper objectMapper = new ObjectMapper();
String s = objectMapper.writeValueAsString(student);
MessageProperties messageProperties = new MessageProperties();
// 消息过期时间 10 秒
messageProperties.setExpiration("10000");
rabbitTemplate.send(
RabbitConfig.EXCHANGE_NAME,
RabbitConfig.KEY_NAME,
new Message(s.getBytes(StandardCharsets.UTF_8), messageProperties),
correlationData
);
}
}
监听消息
@Slf4j
@Component
public class RabbitListeners {
@RabbitListener(queues = {RabbitConfig.QUEUE_NAME})
void listenCat(String content, @Payload Message message, Channel channel) throws IOException, InterruptedException {
log.info("【消费消息】- 入参;content: ${}$, message: ${}$, channel: ${}$", content, message, channel);
log.info("【消费消息】- student: ${}$", new String(message.getBody(), StandardCharsets.UTF_8));
TimeUnit.SECONDS.sleep(3);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
Student
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Student {
private String name;
private Integer age;
}
说明
这一套下来包括了发送端确认
、消息返回
、手动 Ack
、消费者限流
、消息过期
、死信队列
。可以有效地保证消息的发送、路由、消费能够正常执行。
文章评论