博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring Boot系列——死信队列
阅读量:7261 次
发布时间:2019-06-29

本文共 4488 字,大约阅读时间需要 14 分钟。

在说死信队列之前,我们先介绍下为什么需要用死信队列。

如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可。

ack机制和requeue-rejected属性

我们还是基于上篇《Spring Boot系列——7步集成RabbitMQ》的demo代码来说。

在项目我们看到application.yaml文件部分配置内容如下

...listener:    type: simple    simple:      acknowledge-mode: auto      concurrency: 5      default-requeue-rejected: true      max-concurrency: 100...

其中

acknowledge-mode

该配置项是用来表示消息确认方式,其有三种配置方式,分别是none、manual和auto。

none意味着没有任何的应答会被发送。

manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。

auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。

default-requeue-rejected

该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true。

我一开始对于这个属性有个误解,我以为rejected是表示拒绝,所以将requeue-rejected连起来是拒绝重新放回队列,后来查了资料明白这个属性的功能才想起来rejected是个形容词,其表示的应该是被拒绝的消息

所以如果该属性配置为true表示会重新放回队列,如果配置为false表示不会放回队列。

下面我们看看acknowledge-mode参数和default-requeue-rejected参数使用不同的组合方式,RabbitMQ是如何处理消息的。

代码依然使用springboot-demo中的RabbitApplicationTests发送消息,使用Receiver类监听demo-queue队列的消息。

对于Receiver类添加了一行代码,该代码模拟抛出异常

@Componentpublic class Receiver {    @RabbitListener(queues = "demo_queue")    public void created(String message) {        System.out.println("orignal message: " + message);        int i = 1/0;    }}

acknowledge-mode=none, default-requeue-rejected=false

619240-20181028194738289-1243021645.gif

该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。通过在RabbitMQ管理页面也没有看到重新放回队列的消息

acknowledge-mode=none, default-requeue-rejected=true

619240-20181028194753132-1812919482.gif

同样该配置不会确认消息是否正常消费,所以在控制台没有抛出任何异常。而且即使default-requeue-rejected配置为true因为没有确认所以也没有看到重新放回队列的消息

acknowledge-mode=manual, default-requeue-rejected=false

619240-20181028194809128-1886514477.gif

该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,个人理解是因为没有收到ack,所以消息又回到了队列中。

acknowledge-mode=manual, default-requeue-rejected=true

619240-20181028194821965-1394071123.gif

该配置需要手动确认消息是否正常消费,但是代码中并没有手动确认,所以消息被重新放入到队列中了,并且在控制台发现还抛出了异常(这块不是很清楚,default-requeue-rejected设置true和false带来的不同效果,有了解的麻烦下方留言指教)。

acknowledge-mode=auto, default-requeue-rejected=false

619240-20181028194837888-194794090.gif

该配置采用自动确认,从结果来看,是自动确认了。

从控制台打印的结果可以看出Receiver方法执行了3次,分别是前面两条放回队列的消息以及这次发送的消息,所以3条消息都消费了。

同时因为default-requeue-rejected设置为false,所以即使消费抛出异常,也没有将消息放回队列。

acknowledge-mode=auto, default-requeue-rejected=true

619240-20181028194905644-614798860.gif

该配置同样采用自动确认,从结果看出,没有抛出异常(这块也不是很理解),且因为default-requeue-rejected设置为true,所以消息重新回到队列。

综上罗列这么多情况只为说明有些情况下,如果消息消费出错,因为配置问题导致消息丢失了。这在很多情况下是要命的,比如用户支付的订单号,如果因为抛异常等原因直接丢失是很要命的。

所以,我们需要有一个确保机制,能够保证即使失败的消息也能保存下来,这时候死信队列就排上用场了。

死信队列

死信队列的整个设计思路是这样的

生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

下面我们通过网上的一个简单的死信队列的实现看看如何使用死信队列。

@Bean("deadLetterExchange")    public Exchange deadLetterExchange() {        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();    }    @Bean("deadLetterQueue")    public Queue deadLetterQueue() {        Map
args = new HashMap<>(2);// x-dead-letter-exchange 声明 死信交换机 args.put("x-dead-letter-exchange", "DL_EXCHANGE");// x-dead-letter-routing-key 声明 死信路由键 args.put("x-dead-letter-routing-key", "KEY_R"); return QueueBuilder.durable("DL_QUEUE").withArguments(args).build(); } @Bean("redirectQueue") public Queue redirectQueue() { return QueueBuilder.durable("REDIRECT_QUEUE").build(); } /** * 死信路由通过 DL_KEY 绑定键绑定到死信队列上. * * @return the binding */ @Bean public Binding deadLetterBinding() { return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null); } /** * 死信路由通过 KEY_R 绑定键绑定到死信队列上. * * @return the binding */ @Bean public Binding redirectBinding() { return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null); }

注意

  • 声明了一个direct模式的exchange。

  • 声明了一个死信队列deadLetterQueue,该队列配置了一些属性x-dead-letter-exchange表明死信交换机,x-dead-letter-routing-key表明死信路由键,因为是direct模式,所以需要设置这个路由键。

  • 声明了一个替补队列redirectQueue,变成死信的消息最终就是存放在这个队列的。

  • 声明绑定关系,分别是死信队列以及替补队列和交换机的绑定。

那么如何模拟生成一个死信消息呢,可以在发送到DL_QUEUE的消息在10秒后失效,然后转发到替补队列中,代码实现如下

public void sendMsg(String content) {        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());        MessagePostProcessor messagePostProcessor = message -> {            MessageProperties messageProperties = message.getMessageProperties();//            设置编码            messageProperties.setContentEncoding("utf-8");//            设置过期时间10*1000毫秒            messageProperties.setExpiration("5000");            return message;        };        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", content, messagePostProcessor);    }

执行结果如下

619240-20181028194930008-324564222.gif

消息首先进入DL_QUEUE,5秒后失效,被转发到REDIRECT_QUEUE中。

如果您觉得阅读本文对您有帮助,请点一下“推荐”按钮,您的“推荐”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

1240

转载于:https://www.cnblogs.com/bigdataZJ/p/springboot-deadletter-queue.html

你可能感兴趣的文章
基于Flume的美团日志收集系统(一)架构和设计【转】
查看>>
15个专业版响应式WordPress主题
查看>>
33个与众不同的Web表单设计
查看>>
软件测试介绍,测试流程,测试方法,测试工具、设计测试用例
查看>>
利用Android NDK编译lapack
查看>>
TOMCAT 集群之 PERSISTENT SESSION
查看>>
URL加载系统----iOS工程师必须熟练掌握
查看>>
PHPCMS标签
查看>>
迁移数据库后出现异常情况,存储过程不能用
查看>>
Android开发在路上:少去踩坑,多走捷径
查看>>
asp.net mvc控制器动作体返回ImageResult,可作验证码
查看>>
微软BI 之SSIS 系列 - 在 SQL 和 SSIS 中实现行转列的 PIVOT 透视操作
查看>>
ylbtech-LanguageSamples-COMInteropPart1(COM 互操作 - 第一部分)
查看>>
RabbitMQ学习之:(七)Fanout Exchange (转贴+我的评论)
查看>>
JS函数匿名替换
查看>>
(转)x264代码详细阅读之x264.c,common.c,encoder.c
查看>>
IOS 面试 --- 网络部分
查看>>
IOS小工具以及精彩的博客
查看>>
Unity3D热更新全书-重头再来
查看>>
Android WebView的注意事项
查看>>