三 rabbitmq发布确认+集群+其他高级( 二 )


8.1.7 消息消费者
package com.bcl.mq.consumer;import com.bcl.mq.config.ConfirmConfig;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;/*** 发布确认 消费者(高级)* @author bcl* @date 2021/9/6*/@Component@Slf4jpublic class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)public void receiveMsg(Message message) {String msg = new String(message.getBody());log.info("接受到队列 confirm.queue 消息:{}", msg);}}
8.1.8 结果分析
【三rabbitmq发布确认+集群+其他高级】交换机已经收到 id 为:1的消息发送消息内容:你好接受到队列 confirm.queue 消息:你好key1交换机已经收到 id 为:2的消息
可以看到,发送了两条消息,第一条消息的为 “key1”,第二条消息的为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的与队列的不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了 。
8.2. 回退消息 8.2.1.参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的 。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊 。通过设置参数可以在当消息传递过程中不可达目的地时将消息返回给生产者 。
spring.rabbitmq.publisher-returns=true
总体配置
spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123spring.rabbitmq.publisher-confirm-type=correlatedspring.rabbitmq.publisher-returns=true
8.2.2. 消息回调接口修改
package com.bcl.mq.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 发布确认 回调接口(高级)* 生产者发送消息 发不出去 回调接口** @author bcl* @date 2021/9/6*/@Component@Slf4jpublic class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//依赖注入 rabbitTemplate 之后再设置它的回调对象@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);}/*** 交换机确认回调方法* 1.发消息 交换机接收到了 回调* 1.1 correlationData保存回调信息ID及相关信息* 1.2 交换机收到消息 ack = true* 1.3 case null* 2.发消息 交换机接收失败了 回调* 2.1 correlationData保存回调信息ID及相关信息* 2.2 交换机收到消息 ack = false* 2.3 case 失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {log.info("交换机已经收到 id 为:{}的消息", id);} else {log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);}}//当消息无法路由的时候的回调方法@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error(" 消息{}, 被交换机{}退回 ,退回原因:{}, 路由key:{}",new String(returnedMessage.getMessage().getBody()), returnedMessage.getExchange(), returnedMessage.getReplyText(), returnedMessage.getRoutingKey());}}