关于SpringBoot中RabbitMQ发送消息的回调以及消息确认、重试机制
若想实现RabbitMQ的消息回调,需要在配置文件下的rabbit选项下添加一行设置:
#启用消息确认机制。能收到MQ Broker的异步响应 publisher-confirms: true
然后在发送者发送消息之前,需要先自行绑定好一个回调方法。
下面是创建回调方法与绑定的全部代码:
package com.skypyb.rabbitmq.producer; import com.skypyb.rabbitmq.entity.User1; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class User1Sender { @Autowired private RabbitTemplate rabbitTemplate;//操作rabbitmq的模板 /** * 回调函数,confirm 确认 */ final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 消息唯一ID * @param ack 确认消息是否被MQ Broker接收,true是已被接收,false反之 * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("CorrelationData:" + correlationData); if (ack) { System.out.println("消息已被Broker接收~~~"); } else { System.err.println("消息未被Broker接收!!!"); } } }; public void send(User1 user1) { //这里是设置回调 rabbitTemplate.setConfirmCallback(confirmCallback); //下面是发消息 CorrelationData correlationData = new CorrelationData(); correlationData.setId(user1.getMessageId()); rabbitTemplate.convertSendAndReceive( "user1-exchange",//exchange "user1.key1",//routingKey user1,//消息体内容 correlationData//消息唯一ID ); } }
这样就已经成功实现回调了。但是发送者发送消息使用的RabbitTemplate是Spring自动创建的,因为Spring默认的Bean是单例的,所以针对不同的确认方案需要配置不同的bean.
比如我上边的代码,用的是默认的,那么我在这设置一次后,其他使用默认RabbitTemplate的发送者发送消息都会触发这个回调。这个看业务场景具体设置吧。
关于确认,看到回调方法里边的那个ack参数了吗?这个参数是MQ发给你的确认,代表消息已经投递成功 , Broker 收到了,也代表消息已经被持久化。
接着MQ Consumer就可以消费这条消息了。
如果消费者发送了ack,RabbitMq将会把这条消息从待确认中删除。如果是nack并且指明不要重新入队列,那么该消息也会删除。但是如果是nack且指明了重新入队列那么这条消息将会进入队列,然后重新发送给消费者。被重新投递的消息消息头amqp_redelivered属性会被设置成true,客户端可以依靠这点来判断消息是否被确认,可以好好利用这一点,如果不做判定每次失败都无脑重新回队列有可能导致同一消息不停的被发送和拒绝。消费者在确认消息之前和RabbitMq失去了连接那么消息也会被重新投递。
关于消费端的手动确认,我也写了一份代码
package com.skypyb.rabbitmq.controller; import com.rabbitmq.client.Channel; import com.skypyb.rabbitmq.entity.User1; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @Component public class User1Receiver { /** * @param user1 消息体,使用 @Payload 注解 * @param headers 消息头,使用 @Headers 注解 * @param channel */ /*@RabbitListener表示监听的具体队列. bindings属性代表绑定。里边有几个值填写,填写好绑定的队列名字和交换机名字 指定好routingKey。若指定的这些参数不存在的话。则会自行给你创建好 durable代表是否持久化 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "user1-queue", durable = "true"), exchange = @Exchange(name = "user1-exchange", durable = "true", type = "topic"), key = "user1.#" ) ) @RabbitHandler//标识这个方法用于消费消息 public void onUser1Message(@Payload User1 user1, @Headers Map<String, Object> headers, Channel channel) throws IOException { //消费者操作 System.out.println("-------收到消息辣!-----"); System.out.println("发过来的用户名为:" + user1.getName()); //delivery tag可以从消息头里边get出来 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); try{ //basicAck()表示确认已经消费消息。通知一下mq,需要先得到 delivery tag channel.basicAck(deliveryTag, false); }catch (Exception e){ boolean redelivered = (boolean) headers.get(AmqpHeaders.REDELIVERED); //requeue参数:如果被拒绝的消息应该被重新排队而不是丢弃/死信,则为true,这个参数代表是否重新分发 channel.basicNack(deliveryTag,false,!redelivered); } } }
如果不想手动确认,在配置文件里把 acknowledge-mode: manual 改为auto,SpringBoot就会帮你自动确认。这个的机制是如果方法成功执行完毕无事发生,那就ack,要是途中报错了,就nack。
我个人觉得还是手动确认比较好,比较可控。
如果要使用SpringBoot帮你处理消息重试机制的话可以直接在yml中配
spring: rabbitmq: listener: retry: # 重试次数 max-attempts: 3 # 开启重试机制 enabled: true
关于发送消息的回调,和接收消息后的重试,这两个东西不要随便乱配。
如果不是必须保证消息的投靠特别稳定、数据不能出现一点丢失。那么完全可以不用配这几个东西。嘛,具体还是看业务啦。
你这个是错误的啊,实验了下,不是消息已被消费,是消息已经被broker收到
对,你说的没错;这是我一年前写的文章,还不够严谨,改了改了。