RabbitMQ消息可靠性方案分析

/ 默认分类 / 0 条评论 / 1010浏览

MQ消息可靠性方案

如下所示,在消息整个过程中,下面的三个阶段都可能会发生消息丢失
消息生命过程
下面我们一个个来分析过程和解决方式

一. 生产者推送

1.丢失描述

在整个消息的生命过程中,生产者推送消息到broker就是最先会发生消息丢失的阶段,可能有以下几种可能:

  1. 生产者发送消息的时刻,broker挂掉
  2. 生产者发送消息,但是因为网络问题导致消息没有发送出去
  3. 生产者发送消息到broker,但没有找到任何交换机
  4. 生产者发送消息到broker,到达交换机后,没有路由到任何队列
  5. 生产者消息推送到队列后,但是还没有被消费,此时broker挂掉了,重启后发现消息丢失了

第一种和第二种情况如果发生,客户端会抛出网络IO异常,所以我们只需要捕获异常然后进行相应的处理即可.
第三和第四种可以使用confirm回调和return回调
第五种情况可以在发送时设置消息投递方式为持久化投递,这样即使broker重启,消息也不会丢失

2.解决方法

1,2,5这三种情况的解决方法方法在我之前的文章中已经介绍了,这里主要说下上面的3,4两种情况.
方法1:使用定时任务遍历消息库,然后重发

spring:
  application:
    name: micro_amqp
  rabbitmq:
    host: 192.168.12.147
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
server:
  port: 10002
@Configuration
public class RabbitMqConfig {

    @Autowired
    ConnectionFactory defaultFactory;

    @Autowired
    MyRabbitMqComfirmCallBack comfirmCallBack;

    @Autowired
    MyRabbitMqReturnCallBack returnCallBack;

    @Bean("rabbitTemplate")
    @Primary
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(defaultFactory);
        //设置强制消息,即开启retrun回调
        rabbitTemplate.setMandatory(true);
        //设置自定义的confirm回调和return回调
        rabbitTemplate.setConfirmCallback(comfirmCallBack);
        rabbitTemplate.setReturnCallback(returnCallBack);
        return rabbitTemplate;
    }

    @Bean("rabbitTemplateTest1")
    public RabbitTemplate rabbitTemplateTest1() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();

        CachingConnectionFactory con = new CachingConnectionFactory();
        con.setHost(MQ.TEST1_147.getHost());
        con.setPort(MQ.TEST1_147.getPort());
        con.setUsername(MQ.TEST1_147.getUsername());
        con.setPassword(MQ.TEST1_147.getPassword());
        con.setVirtualHost(MQ.TEST1_147.getVirtualhost());
        con.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        con.setPublisherReturns(true);

        rabbitTemplate.setConnectionFactory(con);
        rabbitTemplate.setConnectionFactory(con);
        //设置强制消息,即开启retrun回调
        rabbitTemplate.setMandatory(true);
        //设置自定义的confirm回调和return回调
        rabbitTemplate.setConfirmCallback(comfirmCallBack);
        rabbitTemplate.setReturnCallback(returnCallBack);
        return rabbitTemplate;
    }

}

模拟发送消息的接口

    @Autowired
    private MQUtil mq;

    //发送到默认mq
    @GetMapping("/2/{msg}")
    public String doRabbitmqTemplate2(@PathVariable("msg") String msg){
        mq.sendMsg("testExchange","testqueue1.haha",new Student(msg));
        return "ok";
    }
    
    //发送到指定mq
    @GetMapping("/3/{msg}")
    public String doRabbitmqTemplate3(@PathVariable("msg") String msg){
        mq.sendMsg("testExchange11","testqueue.haha",msg,MQ.TEST1_147);
        return "ok";
    }

MQ枚举

public enum MQ {

    TEST1_147("192.168.12.147&5672&guest&guest&test1")

    ;
    String host;
    int port;
    String username;
    String password;
    String virtualhost;

    MQ(String infoStr) {
        String[] infoArr = infoStr.split("&");
        this.host = infoArr[0];
        this.port = Integer.valueOf(infoArr[1]);
        this.username = infoArr[2];
        this.password = infoArr[3];
        this.virtualhost = infoArr[4];
    }
    
//构造方法和getter,setter,toString
}

下面是消息发送逻辑和回调检查

@Slf4j
@Component
public class MQUtil {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource(name = "rabbitTemplateTest1")
    private RabbitTemplate rabbitTemplateTest1;

    public static ArrayList<MsgRecord> msgList = new ArrayList<>();

    private Map<MQ,RabbitTemplate> rabbit = new HashMap<>();

    private  String defaultHost;

    private  int defaultPort;

    private  String defaultVhost;

    @PostConstruct
    public void init()
    {
        defaultVhost = rabbitTemplate.getConnectionFactory().getVirtualHost();
        defaultPort = rabbitTemplate.getConnectionFactory().getPort();
        defaultHost= rabbitTemplate.getConnectionFactory().getHost();
        rabbit.put(MQ.TEST1_147,rabbitTemplateTest1);
    }

    public void requeue(MsgRecord requeueMsg)
    {
        MsgRecord.Msg msg = requeueMsg.getMsg();
        int requeueTimes = requeueMsg.getTimes();
        System.out.println(requeueMsg.getId()+"是推送失败的消息,开始重新推送,当前重推次数:"+(++requeueTimes));
        requeueMsg.setTimes(requeueTimes);
        if(requeueMsg.getTimes() == 3){
            requeueMsg.getMsg().setExchange("testExchange");
        }
        RabbitTemplate rabbitTemplate = getRabbitTemplate(msg.getMq());
        doSend0(rabbitTemplate,msg.getExchange(),msg.getRouteKey(),msg.getMsg(),requeueMsg.getId());
    }

    public void sendMsg(String exchange, String routeKey, Object msg)
    {
        send(exchange,routeKey,msg,null);
    }

    public void sendMsg(String exchange, String routeKey, Object msg, MQ mq)
    {
        send(exchange,routeKey,msg,mq);
    }

    private void send(String exchange, String routeKey, Object msg,MQ mq)
    {
        RabbitTemplate exactRabbitTemplate = getRabbitTemplate(mq);
        doSend(exactRabbitTemplate,exchange,routeKey,msg,mq);
    }

    private void doSend(RabbitTemplate rabbit,String exchange, String routeKey, Object msg,MQ mq)
    {
        String msgStr = JSON.toJSONString(msg);
        //生成消息唯一id
        String id = generateMsgId();
        addMsgRecord(id,msgStr,routeKey,exchange,mq);
        doSend0(rabbit,exchange,routeKey,msgStr,id);
    }

    private void doSend0(RabbitTemplate rabbit,String exchange, String routeKey, String msg,String id)
    {
        try{
            //创建消息回调信息
            CorrelationData correlationData = new CorrelationData(id);
            log.info("【开始写入消息队列】MQ信息:[{}],交换机:[{}],路由键:[{}],消息:[{}],唯一id:[{}]",defaultHost+":"+defaultPort+"/"+defaultVhost,exchange,routeKey, msg,correlationData);
            rabbit.convertAndSend(exchange,routeKey,msg,correlationData);
        }catch(Exception e)
        {
            log.error("【消息发送失败,MQ连接异常】MQ信息:[{}],交换机:[{}],路由键:[{}],消息:[{}],唯一id:[{}]",defaultHost+":"+defaultPort+"/"+defaultVhost,exchange,routeKey, msg,id);
            sendAlert(CommonUtil.combineLog("【消息发送失败,MQ连接异常】MQ信息:[{}],交换机:[{}],路由键:[{}],消息:[{}],唯一id:[{}]",defaultHost+":"+defaultPort+"/"+defaultVhost,exchange,routeKey, msg,id));
        }
    }

    private  String generateMsgId() {
        return UUID.randomUUID().toString();
    }

    private RabbitTemplate getRabbitTemplate(MQ mq) {
        if(Objects.isNull(mq)){
            return rabbitTemplate;
        }
        RabbitTemplate findRabbitTemplate;
        if(rabbit != null && rabbit.size() != 0 && (findRabbitTemplate = rabbit.get(mq)) != null){
            return findRabbitTemplate;
        }
        //没有找到指定的mq则使用默认的mq
        return rabbitTemplate;
    }

    private void addMsgRecord(String id, String msgStr,String routeKey,String exchange,MQ mq) {
        MsgRecord msgRecord = new MsgRecord();
        msgRecord.setId(id);
        MsgRecord.Msg msg = new MsgRecord.Msg(msgStr,routeKey,exchange,mq);
        msgRecord.setMsg(msg);
        msgList.add(msgRecord);
    }

    //模拟发送告警通知
    private  void sendAlert(String s) {
        System.out.println("发送告警通知:"+s);
    }

}

简单的消息记录模型,可按照业务场景自定义设计

/**
 * 如果默认状态为0,那么nack时需要取出状态,修改为1,然后直到有一次重推成功,即ack时需要取出状态修改为0,所以ack和nack都需要取一次数据和修改数据
 * 如果默认状态为1,那么nack就不需要管,只要ack就改为1
 * 但是ack肯定比nack多的多的多,所以其实上面两种方式基本差不多
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgRecord {

    String id;

    //写入成功的消息,默认的状态就是1,表示写入失败,为什么默认设置为1呢
    int status = 1;

    int times;

    Msg msg;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static public class Msg{
        String msg;

        String routeKey;

        String exchange;

        MQ mq;
    }

}

confirm回调

@Slf4j
@Component
public class MyRabbitMqComfirmCallBack implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData.getId();
        if(StringUtils.isEmpty(id)){
            return;
        }
        if (ack) {
            //因为默认的状态就是1,表示写入失败,如果是ack,则修改状态为成功
            log.info("【confirm回调-消息写入成功】id:{}",id);
            MQUtil.msgList.forEach(msg -> {
                if(id.equals(msg.getId())) {
                    //如果消息推送成功,设置消息的状态为0
                    msg.setStatus(0);
                }
            });
        } else {
            log.info("【confirm回调-消息写入失败】id:{},原因:{}",id,cause);
        }
    }
}

return回调

@Slf4j
@Component
public class MyRabbitMqReturnCallBack implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        sendAlert("消息:"+new String(message.getBody())+"无法路由到任何队列,交换机:"+exchange+",路由键:"+routingKey+",回调响应:("+replyCode+":"+replyText+")");
    }


    private void sendAlert(String s) {
        System.out.println("发送告警通知:"+s);
    }
}

模拟定时任务检测

/**
 * 这里作为示例,简单起见,使用集合模拟的数据库持久化,每次遍历都是全部数据,实际情况下最好是设置标志或者使用中间表(异步)删除已经ack的消息,避免重复检查ack的数据
 * 定时任务检测消息是否发送成功
 */
@RestController
public class RabbitmqCheckJob implements Runnable {

    //消息重新推送的最大次数阈值,重推次数超过阈值,就不重新推送了,直接通知相关人员
    final int MAX_REQUEUE_TIMES = 4;

    RabbitTemplate rabbitTemplate;

    MQUtil mq;

    @Override
    public void run() {
        //在线程执行逻辑使用自动注入会发生null,需要手动注入
        rabbitTemplate = BeanContext.getApplicationContext().getBean(RabbitTemplate.class);
        mq = BeanContext.getApplicationContext().getBean(MQUtil.class);
        //循环检测
        for (;;){
            System.out.println("定时任务开始检查");
            //因为使用的是List集合来表示数据库中的消息,如果消息刚推送还没有收到确认响应,那么此时可能直接被判定为消息写入成功,所以这里先获取当前时刻的消息集合的快照,然后睡眠1s,保证broker的响应到达客户端
            //这里使用的是rabbbitmq客户端自带的工具类,这个工具类在我的一片rabbitmq客户端源码的笔记中有介绍
            ArrayList<MsgRecord> copy = (ArrayList<MsgRecord>) Utility.copy(MQUtil.msgList);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            copy.forEach(msg -> {
                //如果发送失败次数已经达到阈值,通知相关人员.这里做的是例子,简单起见,超出次数的消息每次都会被推送警告,可以设置标志位
                if(msg.getTimes() > MAX_REQUEUE_TIMES){
                    sendAlert("召唤背锅侠!出问题啦!推送失败的消息:"+msg.toString());
                    //不再重新推送
                    return;
                }
                if(msg.getStatus() == 1){
                    mq.requeue(msg);
                }
            });
            try {
                System.out.println("定时任务检查完毕,一个小时候后再检查(简单起见这里为10s)");
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    //模拟发送告警通知
    private void sendAlert(String s) {
        System.out.println("发送告警通知:"+s);
    }

}

以上就是主要的实现逻辑,具体含义我在代码中有相关注释,下面是案例中需要使用的其他方法,包括手动获取容器中的bean和工具方法

@Slf4j
@Configuration
public class ApplicationStartedEventListener implements ApplicationListener<ApplicationStartedEvent>, Ordered {

    @Override
    public void onApplicationEvent(ApplicationStartedEvent ev) {
        //模拟容器启动后开启消息检测的定时任务
        Thread checkMsgJob = new Thread(new RabbitmqCheckJob());
        checkMsgJob.start();
    }


    @Override
    public int getOrder() {
        return 666;
    }
}


@Component
public class BeanContext implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        BeanContext.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public static <T> T getBean(String name) throws BeansException {
        return (T) applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> clz) throws BeansException {
        return (T) applicationContext.getBean(clz);
    }
}

public class CommonUtil {

    public static String combineLog(String content,String ... info){
        String origin = content;
        try{
            for (int i = 0; i < info.length; i++) {
                content = content.replaceFirst("\\{\\}",info[i]);
            }
            return content;
        }catch(Exception ignored)
        { }
        return origin;
    }

}

以上就是消息发送入库,定时任务检查的方案的简单实现,可以按照方案结合实际业务场景进行设计,上面我使用的是springamqp的框架,其实也可以只使用 rabbitmq的客户端来实现,会比较清晰一些(前面我的文章中有介绍过相关方法,其实rabbitmq的光官方文档中也有详细说明)

方法2:延时推送检查
对于上面的方法,有一个致命的缺陷,需要在发送消息前进行消息的持久化,如果在真实的业务场景中,可能需要先持久化存储业务数据,然后按照业务数据 生成的mq消息还需要再次持久化,之后发送消息等待回调,然后定时任务检测,你会发现,这个过程中进行了两次数据持久化,在对请求响应要求较高,并发数 较大的场景下,数据持久化需要尽量减少,所以还可以按照下面的方法来进行校验---延时推送检查

这种方式就按照上图来设计即可;

这两种方案是我在网络上查看总结的方式,但我更倾向于,在生产端发送消息时,异步持久化该条消息,然后和第一种方式一样,定时任务寻找失败的消息;
方案有很多种,欢迎各位大佬下方留言指教!

二. Broker消息存储

上面讨论了消息从生产端发送到broker可能丢失的情况,但是如果消息发送到broker,但是由于mq宕机了,那么也可能会导致消息丢失,所以最好在发送消息时指定消息使用持久化投递模式delivery_mode=2,这样 即使队列中存在未消费的消息,此时mq宕机或者重启,数据也不会丢失

三. 消费端消息

对于这样的执行逻辑

		try{
            System.out.println("消费逻辑");
            channel.basicAck(ms.getMessageProperties().getDeliveryTag(),false);
        }catch(消费异常 e)
        {
            //异常处理逻辑1
        }catch (网络io异常 e1){
            //异常处理逻辑1
        }

有下面几种情况

  1. 网络通顺
    查看原图

  2. 网络异常

    查看原图

所以,无论哪种情况rabbitmq服务端都可能会发生nack消息堆积,但是避免因为网络原因导致最后的ack发送失败,如果在发送时就捕获到连接断开的异常,那么捕获后可以重试,或者记录日志 告警啥的,另外就是springamqp中有这样一个配置:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto  # 自动ack
        retry:
          enabled: true
          max-attempts: 10
          initial-interval: 2000  # 重试起始间隔时间
          max-interval: 20000   # 重试最大间隔时间ms
          multiplier: 2 # 间隔时间扩大倍数,间隔时间*倍数=下一次的间隔时间,最大不能超过设置的最大间隔时间

这里的重试,是指,如果在onMessage逻辑中抛出异常,那么就会触发springamqp封装的消费者的重试机制,这个重试不是指发送nack(requeue=true)然后不断地broker重新 推送消息到消费者,而是springamqp自己实现的重试逻辑,和broker没有任何交互,只是单纯的重新执行消费逻辑代码,这一点需要注意

所以,对于堆积的nack消息,可能一部分是因为消费逻辑执行是正确的,即这条消息事实上是成功消费的,但是因为网络原因导致发送的ack在broker端没有收到,导致这条消息一直 是nack状态,那么在消费者断开时,消息状态会从nack->ready,这个时候如果消费者再次连接上broker,这条消息就会再次被推进来,所以这个过程中,需要保证消费端的幂等性, 保证多次消费的效果都和消费一次是一样的;

四.草稿记录(未完待续....)

ps:关于消费幂等性,有时间再总结下,常用的就是为没条消息设置唯一id,消费时进行确认

解决消息堆积是一个比较困扰的问题,网络上有很多解决方法,但是很多需要按照实际情况来处理,有时间再总结下这个问题,下面我简单记录下,几个简单的点:

1.发生消息对接的原因可能是由于消费和生产端速度不一致,导致消费跟不上
2. 可能是nack消息变多,导致nack堆积

可以按照实际需求场景,可以选择为消息设置ttl,超过一定时间后到达绑定的死信,然后告警推送
可以使用能弹性扩容的消费者,比如springamqp中的消费者扩容机制,可以按照当前消息的接收频率自动增加扩展消费者数量
等等