RabbitMQ延时推送队列的两种实现方式

/ 默认分类 / 0 条评论 / 829浏览

RabbitMQ延时推送队列的两种实现方式

一.前言

所谓延时队列,就是消息发布后,消费者需要经过指定的延时时间后才会接收到消息来消费,在rabbitmq中,默认是不支持延时队列的,但是可以使用 死信队列和消息ttl来实现延时队列的效果,当然在rabbitmq3.8.x后支持使用延时队列插件来实现
延时队列

二.实现方式

1. 死信队列+消息ttl

具体实现方式如下:

死信队列+消息ttl实现方式

上面这种方式只要将实际的延时消费绑定在死信队列上即可,具体的操作不多记录了,很简单,主要来说下这种方式实现需要注意的一些点:

  1. 如果生产方消息产生速度很快,那么需要注意中间队列消息数量达到限制的情况,如果达到数量限制那么消息会直接变为死信,这样就和业务相背了
    解决办法: 可以在发送消息的时候先检查队列当前堆积的消息的数量,如果数量超出阈值,那么可以在发送消息时创建新的队列,然后将消息发到新的队列中,队列设置自动过期删除(大于延时时间即可)
    可以使用queueDeclarePassive来获取当前队列的消息堆积数量
        //如果queue在broker不存在,则会报错:reply-code=404,reply-text=NOT_FOUND,如果存在则会返回队列当前的信息
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().getDelegate().createChannel();
        AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive("orderQueue");
        int messageCount = declareOk.getMessageCount();
  1. 上面的方法在判断当前消息堆积数量的时候会有线程安全问题
    可以尽量将阈值设置和队列最大数量相差较大,或者加锁(这样会影响速度)
2. rabbitmq延迟消息插件

rabbitmq插件使用官方文档
rabbitmq延时队列插件地址,这里有详细的使用方法

上面的官方文档已经说的很明白了,这里简单记录下主要实现原理,首先插件支持后,可以申明延迟交换机

args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

之后发送消息就可以直接发送到上面那个支持延时发送的交换机中,并且消息可以设置延迟时间

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes)

消息发送到broker后会按照延迟时间来判断,然后将消息存入Mnesia表中,如果时间到了会被推送到相关交换机然后路由到相关队列,这样就被延时消费了

ps:jdk中自带的DelayQueue就是一种无界限阻塞延时队列,只有过期的元素才会在获取的时候出队