11.1代码实现
11.1.1 队列配置
这里以direct类型的交换机作为演示定义了一个交换机、队列和路由Key。交换机和队列进行绑定,这里需要对队列进行设置,构建队列的时候需要设置参数x-message-ttl来设置这个TTL队列的失效时间,也可以针对某一条具体的消息来设置对应的TTL失效的时间。
/**
* rabbitmq 配置类
*/
@Configuration
public class RabbitMQConfig {
/**
* TTL消息队列
**/
public static final String TTL_EXCHANGE_NAME = "TTL_DIRECT_BOOT_EXCHANGE";
public static final String TTL_QUEUE_NAME = "TTL_DIRECT_BOOT_QUEUE";
public static final String TTL_ROUTEING_KEY_NAME = "TTL_DIRECT_BOOT";
/**
* 定向模式
*
* @return
*/
@Bean(name = "ttlBootExchange")
public Exchange ttlBootExchange() {
return ExchangeBuilder.directExchange(TTL_EXCHANGE_NAME).durable(true).build();
}
@Bean(name = "ttlBootQueue")
public Queue ttlBootQueue() {
Map<String, Object> map = new HashMap<String, Object>();
// 设置ttl失效时间为30秒
map.put("x-message-ttl", 3 * 10000);
return QueueBuilder.durable(TTL_QUEUE_NAME).withArguments(map).build();
}
/**
* 绑定消息队列和交换机
*/
@Bean
public Binding ttlBootBindingExchangeAndQueue(@Qualifier("ttlBootExchange") Exchange exchange, @Qualifier("ttlBootQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(TTL_ROUTEING_KEY_NAME).noargs();
}
}
11.1.2 生产者
11.1.2.1 队列是具有TTL失效时间
这个就是整个队列统一的TTL过期时间,是通过 x-message-ttl 的参数并设置过期时间
/**
* 测试TTL过期时间分为两种,队列过期和消息过期
* 队列统一过期:在声明队列的时候在参数里面加上 x-message-ttl 的参数并设置过期时间 map.put("x-message-ttl", 3 * 10000);
* 消息过期: 需要声明一个匿名内部类 MessagePostProcessor messagePostProcessor
* 同时设置队列统一过期和消息过期,以时间短的先处理
* 只有ttl过期的消息在队列顶端才会被队列移除掉
*/
@Test
public void sender_test_ttl() {
// 1、第一步需要在 ConnectionFactory 打开运行回退模式开关也就是 配置 publisher-returns
// spring.rabbitmq.publisher-returns = true
// 2、第二步是rabbitTemplate定义ConfirmCallBack回调函数
// 3、设置一个exchange来处理消息的模式,两种模式
// 3.1、如果消息没有路由到Queue就默认丢弃消息,默认模式
// 3.2、如果消息没有路由到Queue就返回给消息的发送方ReturnCallback
// 默认是setMandatory(false),需要将消息返回给发送方就设置成true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 发送方的消息确认模式作用于生产者和exchange之间
* @param message 消息对象
* @param replyCode 返回的失败的状态码
* @param replyText 返回的失败的信息
* @param exchange 交换机
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("returnedMessage 被执行了....");
}
});
// 4、发送消息进行测试(这是队列统一过期时间的方式)
rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME, RabbitMQConfig.TTL_ROUTEING_KEY_NAME, "spring test rabbitmq ttl ....");
}
11.1.2.2 具体的消息有TTL失效时间
如果是想要的是针对某一条消息来说的设置的TTL过期时间则在声明队列的时候就是声明的一个普通队列不需要设置任何参数,也就是去掉 x-message-ttl 参数的设置。单独的对消息进行设置则是需要声明一个匿名内部类 MessagePostProcessor对message进行设置时间并返回一个Message消息。
/**
* 测试TTL过期时间分为两种,队列过期和消息过期
* 队列统一过期:在声明队列的时候在参数里面加上 x-message-ttl 的参数并设置过期时间 map.put("x-message-ttl", 3 * 10000);
* 消息过期: 需要声明一个匿名内部类 MessagePostProcessor messagePostProcessor
* 同时设置队列统一过期和消息过期,以时间短的先处理
* 只有ttl过期的消息在队列顶端才会被队列移除掉
*/
@Test
public void sender_test_ttl() {
// 1、第一步需要在 ConnectionFactory 打开运行回退模式开关也就是 配置 publisher-returns
// spring.rabbitmq.publisher-returns = true
// 2、第二步是rabbitTemplate定义ConfirmCallBack回调函数
// 3、设置一个exchange来处理消息的模式,两种模式
// 3.1、如果消息没有路由到Queue就默认丢弃消息,默认模式
// 3.2、如果消息没有路由到Queue就返回给消息的发送方ReturnCallback
// 默认是setMandatory(false),需要将消息返回给发送方就设置成true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 发送方的消息确认模式作用于生产者和exchange之间
* @param message 消息对象
* @param replyCode 返回的失败的状态码
* @param replyText 返回的失败的信息
* @param exchange 交换机
* @param routingKey 路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("returnedMessage 被执行了....");
}
});
// 4、这是消息单独设置过期时间的方式,需要声明一个匿名内部类 MessagePostProcessor messagePostProcessor
rabbitTemplate.convertAndSend(RabbitMQConfig.TTL_EXCHANGE_NAME, RabbitMQConfig.TTL_ROUTEING_KEY_NAME, "spring test rabbitmq ttl ....", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 1、设置message的信息
message.getMessageProperties().setExpiration("30000"); // 设置消息的·自动过期时间
// 2、返回该消息
return message;
}
});
}
11.1.3 TTL过期时间的表现
TTL过期时间到了之后消息没有被处理的话,消息将会自动的被从队列移除掉。