解决死信队列消息过期非异步问题,RabbitMQ 延时消息更优解——插件大法(Docker版)
上一篇文章: RabbitMQ 死信机制真的可以作为延时任务这个场景的解决方案吗? 里最终得出的结论:
RabbitMQ 死信机制可以作为延时任务这个场景的解决方案
但是,由于 RabbitMQ 消息死亡并非异步化,而是阻塞的。所以无法作为复杂延时场景——需要每条消息的死亡相互独立这种场景 下的解决方案。
如果说,果真我的业务就是有这个需求呢?
既需要延时触发、也可以满足延时时间不定长
一、解决方案
本身 RabbitMQ 没有这种功能,不过仍然可以使用 RabbitMQ 解决这个场景。
那就是使用插件大法。这也应该是使用 RabbitMQ 时,除了管控台插件外用的最多的一个插件。
需要用到的插件就是这个: rabbitmq_delayed_message_exchange 插件
见名思意,延时消息交换机; 对,他的实现方式已经和队列已经无关了。
这个插件启用后的作用是在原来的 direct、topic、fanout 等这些 exchange 基础上,又新加了一个 exchange 。这个 exchange 的类型是 x-delayed-message
只要发送消息时指定的是这个交换机,那么只需要在消息 header 中指定参数 x-delay [:毫秒值] 就能够实现每条消息的异步延时
二、如何安装插件
之前安装 RabbitMQ 的时候,那是真的搞了我一段时间。可以看这篇文章 –> RabbitMQ基本简介与我亲身经历的安装流程(CentOS7)
现在都 0202 年了,还这么搞就太挫了。其实早在N久之前我电脑上的 RabbitMQ 就已经改成了 docker 运行。
那么就在这次插件安装过程中来顺便说一下。
首先 docker 安装 RabbitMQ 很简单,这是安装并运行 RabbitMQ3.7.7 的命令
还自带了管控台插件,只需配置好端口映射/文件夹映射,并设置下默认账号密码就完事了
docker run --name rabbitmq -dt\ -v /opt/dockerdata/rabbitmq:/var/lib/rabbitmq\ -p 5672:5672 -p 15672:15672\ --hostname rabbit\ -e RABBITMQ_DEFAULT_VHOST=/\ -e RABBITMQ_DEFAULT_USER=admin\ -e RABBITMQ_DEFAULT_PASS=614\ rabbitmq:3.7.7-management
好,只要有了RabbitMQ 接下来就只需要安装插件并启用了。
这是插件的github地址 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
先将兼容版本的插件下载到本地, 然后复制进docker容器内执行 rabbitmq-plugins 命令启用就OK了
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins rm rabbitmq_delayed_message_exchange-3.8.0.ez docker exec -it rabbitmq bin/bash rabbitmq-plugins enable rabbitmq_delayed_message_exchange
三、在项目中使用
那么现在万事俱备了,就只需要在项目中实际的使用起来,测试一下是否真能达到想要的效果。
下面的代码都会排除干扰项,删掉之前写的所有和本次无关的配置,以便于阅读
3.1、 首先,当然还是先创建绑定关系。
/** * Rabbitmq的绑定配置,绑定Exchange、MQ、RoutingKey * Create by skypyb on 2019-11-16 */ @Configuration public class RabbitBindConfig { public final static String SKYPYB_DELAY_EXCHANGE = "skypyb-delay-exchange"; public final static String SKYPYB_DELAY_QUEUE = "skypyb-delay-queue"; public final static String SKYPYB_DELAY_KEY = "skypyb.key.delay"; @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); //自定义交换机 return new CustomExchange(SKYPYB_DELAY_EXCHANGE, "x-delayed-message", false, true, args); } @Bean public Queue delayQueue() { return new Queue(SKYPYB_DELAY_QUEUE, false, false, true); } @Bean public Binding bindingDelayExchangeAndQueue() { return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(SKYPYB_DELAY_KEY).noargs(); } }
值得注意的是,在使用插件给我们带来的新的延迟交换机时, 由于 SpringAMQP 中并没有内置这种模型,所以需要创建 CustomExchange,也就是自定义交换机。
并且需要设置 CustomExchange 的类型为 x-delayed-message
至于队列和绑定关系的设置,该怎么配就怎么配。
在创建绑定关系时,最终需要调用一下 noargs() 方法,BindingBuilder 在绑定 CustomExchange 时 with() 方法返回值并不会是 Binding 类。
3.2、消费者编写
在绑定关系创建完毕之后,对应的消费者也是需要的。
其实这个消费者没有任何特殊的地方,毕竟使用了此插件也只是交换机和发消息时要做出改变,队列本身是没有变化的。
@RabbitListener(queues = {RabbitBindConfig.SKYPYB_DELAY_QUEUE}) @Component public class DelayReceiver { private Logger logger = LoggerFactory.getLogger(DelayReceiver.class); @RabbitHandler public void onDelayMessage(@Payload String message, @Headers Map<String, Object> headers, Channel channel) throws IOException { logger.info("监听延时交换机, 收到消息: {}", message); //delivery tag可以从headers中get出来 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); try { channel.basicAck(deliveryTag, false); } catch (Exception e) { boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED); channel.basicNack(deliveryTag, false, !redelivered); } } }
3.3、测试延时消息发送
那么现在是 “真” 万事俱备。
写个测试类,来往指定的交换机里发送消息。这里当然是向我们刚创建的延时交换机发消息啦。
@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class RabbitmqTest { @Autowired private RabbitTemplate rabbitTemplate; private Logger logger = LoggerFactory.getLogger(RabbitmqTest.class); @Test public void testDelay() { rabbitTemplate.convertAndSend(RabbitBindConfig.SKYPYB_DELAY_EXCHANGE, RabbitBindConfig.SKYPYB_DELAY_KEY, "消息体-5s", (msg) -> { msg.getMessageProperties().setDelay(5000); return msg; }); rabbitTemplate.convertAndSend( RabbitBindConfig.SKYPYB_DELAY_EXCHANGE, RabbitBindConfig.SKYPYB_DELAY_KEY, "消息体-3s", (msg) -> { msg.getMessageProperties().setDelay(3000); return msg; }); logger.info("-----消息发送完毕-----"); } }
在发送消息的地方,也是需要做出处理的。
可以通过以下方法来设置消息的 Header。来达到指定延时时间的目的。
message.getMessageProperties().setHeader("x-delay",3000);
但是有一点很奇妙的是 SpringAMQP 他居然自己集成了对应的API (那为啥不集成延时交换机的API? )
所以可以通过这个方式来设置延时时间:
message.getMessageProperties().setDelay(3000);
最后,代码均编写完毕。
启动消费者服务用以监听队列,然后启动测试类观察消息投放,
最终控制台打印:
2020-01-18 12:26:28.808 INFO 24592 — [ main] com.skypyb.test.RabbitmqTest : —–消息发送完毕—–
2020-01-18 12:26:31.827 INFO 22844 — [cTaskExecutor-1] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-3s
2020-01-18 12:26:33.813 INFO 22844 — [cTaskExecutor-3] c.s.rabbitmq.controller.DelayReceiver : 监听延时交换机, 收到消息: 消息体-5s
可以看到其完美的符合了需求。
结语
延时任务这个场景具体的解决方案也就差不多到这了。
死信机制除了比较复杂的延时场景以外,其实也可以满足大多数需求。
那么若是遇到了死信也解决不了的延时场景,RabbitMQ 本身的机制无法实现的话,那么我们可以靠插件来实现对应的需求。
确确实实,RabbitMQ 的这个延时交换机插件还是有点东西的,也难怪 Spring 给他集成了对应的 API。
看了这两篇文章的人,以后若是遇到对应的场景,该用什么就不用我多说了吧 (
学习了学习了