热点账户的处理方案

/ 默认分类 / 0 条评论 / 1563浏览

热点账户的处理方案

一.简介

在银行或者第三方金融业务企业中,经常需要涉及到账务数据处理,每当一笔资金从一个账户转入,相应的也会有一笔出金的记录,在每一个记账的过程中都涉及到两个重要的过程,一个就是记录记账流水凭证,第二个就是更新账户余额。为了保证在并发场景下余额更新的准确性,就需要给账户资源上锁,该账户的账务操作结束后再释放锁,这里的这个需要被频繁更新的账户就是所谓的热点账户。在高并发的场景下,热点账户更新操作就会成为整个账务处理过程中的性能瓶颈,因为这样的独占锁使得该热点账户同一时刻只能有一个线程对其进行操作,所有线程需要排队处理。

二.热点账户数据更新的方案

下面将介绍一些常见的方案,其中有一些比较容易理解,我只做简单描述。

1.流量控制

之所以出现热点账户的问题,归根结底是因为并发数太高,因此我们可以直接在请求流量上进行控制,对涉及到热点账户的更新的业务请求进行限流,熔断操作,这样就可以避免性能瓶颈导致的一系列问题,但是这种方法似乎有点掩耳盗铃的意思,本质上还是没有解决热点账户更新的性能瓶颈,只是以牺牲用户体验,业务损失来避免问题发生。

2.子账户拆分

具体来讲就是创建与热点账户对应的多个影子账户,所述影子账户与所述账户的数据结构相同,将所述影子账户设置为隐藏,并将所述账户的余额分散至各个影子账户。当账务系统接收到账务请求的时候,通过前置进行hash分配(具体的hash函数会有更多方案)选择影子账户进行记账,这样就将原来对一个账户的请求分散到多个影子账户中,分散了账务热点。   这个方案也有缺点:通过算法选择的影子账户扣款,影子账户的余额可能是不足的,但账户的总余额是够的,这样可能影响账务处理的成功率。所以这个方案中的子账户路由的hash分配算法就非常重要。另外子账户的数量也需要控制好,随着并发数的增长,子账户的数量也需要做出调整。

3.延迟操作流水+定时入账

实时的交易全部insert账务明细(insert的开销很小,能够支持高并发。如果基于分布式部署,insert的并发容量理论上可以无限大),然后定时(比如每半个小时)将之前半个小时内的账务明细sum出一个结算总金额,一笔入账结算到指定的热点账户。这个方案的缺点就是:交易不能实时入账,其实如果控制好定时汇总入账的频度,比如分钟级,用户也是可以接受的。这种方式对收单类业务(账户加钱)非常实用,但是对支出类业务(账户减钱)类来说,有账户透支地风险,因为这里减钱也属于延迟操作了,所以可能会出现余额账户透支的情况。比如下面这种:

账户余额透支

4.延迟操作流水+定时入账+余额缓存

这是基于上面的第三种方案的改进版,可以防止出现余额透支,总体的逻辑也是可以分为两个部分:

定时处理延迟流水

三.实现案例

  1. 个人账户(非热点账户)使用乐观锁更新
if (savingAccountDO.getAccountType() == SavingAccountTypeEnum.PERSONAL.getValue()) {
                log.info("[更新理财账户余额]个人账户,使用乐观锁处理");
                long startTime = System.currentTimeMillis();

                //更新成功则结束循环,更新失败则继续重试,直到成功或超时返回失败
                for (int i = 0; ; i++) {
                    if (i > 0) {
                        savingAccountDO = savingAccountDOMapper.selectByPrimaryKey(savingAccountTableId);
                        log.info("[更新理财账户余额]获取理财账户:savingAccountDO={}", savingAccountDO);
                    }

                    UpdateAccountBalanceDbReq dbReq = new UpdateAccountBalanceDbReq();
                    dbReq.setId(savingAccountTableId);
                    //更新前的值
                    dbReq.setDepositPrincipalBefore(savingAccountDO.getDepositPrincipal());
                    dbReq.setDepositIncomeBefore(savingAccountDO.getDepositIncome());
                    dbReq.setToBeWithdrawnBefore(savingAccountDO.getToBeWithdrawn());
                    dbReq.setTotalEarningsBefore(savingAccountDO.getTotalEarnings());
                    dbReq.setBalanceVersionBefore(savingAccountDO.getBalanceVersion());

                    //更新后的值
                    long depositPrincipalAfter = savingAccountDO.getDepositPrincipal() + depositPrincipalUpdate;
                    long depositIncomeAfter = savingAccountDO.getDepositIncome() + depositIncomeUpdate;
                    long toBeWithdrawnAfter = savingAccountDO.getToBeWithdrawn() + toBeWithdrawnUpdate;
                    long totalEarningsAfter = savingAccountDO.getTotalEarnings() + totalEarningsUpdate;
                    int balanceVersionAfter = (savingAccountDO.getBalanceVersion() + 1) % 1000;
                    if (depositPrincipalAfter < 0 || depositIncomeAfter < 0
                            || toBeWithdrawnAfter < 0 || totalEarningsAfter < 0) {
                        //更新后的余额不能是负数
                        throw new ServiceException(SavingRespCode.CANNOT_BE_NEGATIVE);
                    }
                    dbReq.setDepositPrincipalAfter(depositPrincipalAfter);
                    dbReq.setDepositIncomeAfter(depositIncomeAfter);
                    dbReq.setToBeWithdrawnAfter(toBeWithdrawnAfter);
                    dbReq.setTotalEarningsAfter(totalEarningsAfter);
                    dbReq.setBalanceVersionAfter(balanceVersionAfter);

                    int result = savingAccountDOMapper.updateBalance(dbReq);
                    log.info("[更新理财账户余额]更新余额:dbReq={}, result={}", dbReq, result);
                    if (result == 1) {
                        //更新余额成功
                        //添加理财账户流水明细
                        addFlowDetail(flowId, flowTypeEnum, req, dbReq, savingAccountDO);
                        break;

                    } else if (System.currentTimeMillis() - startTime > 5000) {
                        log.warn("[更新理财账户余额]更新余额失败,重试超过5秒,返回失败:dbReq={}, result={}", dbReq, result);
                        throw new ServiceException(SavingRespCode.UPDATE_BALANCE_FAIL);
                    }
                }

            }
  1. 热点账户处理
log.info("[更新理财账户余额]更新一个账户的缓存开始:updateCacheReq={}", updateCacheReq);
String accountId = updateCacheReq.getAccountId();
Long amount = updateCacheReq.getAmount();
Long delayId = updateCacheReq.getDelayId();
if (amount < 0) {
    //扣减缓存余额,返回的是该账户扣减后的缓存余额,没有key则初始化刷新数据库的到缓存再扣减
    Long cacheBalance = increaseCacheBalance(accountId, amount);
    if (cacheBalance < 0) {
        //发送钉钉预警消息
        String alertMsg = String.format("[更新理财账户余额]缓存余额<0,更新失败:updateCacheReq=%s, cacheBalance=%s", updateCacheReq, cacheBalance);
        developMonitor.sendAlertMsg(alertMsg);
        log.info(alertMsg);

        //更新后的余额不能是负数
        throw new ServiceException(SavingRespCode.CANNOT_BE_NEGATIVE);
    }
}

//设置缓存操作记录
setCacheDelayId(accountId, delayId);
log.info("[更新理财账户余额]更新一个账户的缓存结束");
  1. 定时任务处理逻辑
try {
    log.info("[延迟计算余额-执行业务]开始:accountId={}", accountId);
    //延迟计算余额
    Long cacheBalance = savingAccountService.delayUpdateBalance(accountId);
    if (cacheBalance < 0) {
        //延迟刷新缓存余额
        cacheBalance = savingAccountService.delayRefreshCacheBalance(accountId);
    }

    SavingAccountDO savingAccountDO = savingAccountDOMapper.selectByAccountId(accountId);
    log.info("[延迟计算余额-执行业务]根据账户id获取理财账户:savingAccountDO={}", savingAccountDO);
    Long tableBalance = savingAccountDO.getDepositPrincipal();
    Long differenceBalance = tableBalance - cacheBalance;

    if (Math.abs(differenceBalance) > savingConfigCenter.getDelayTaskAlertMsgThreshold()) {
        //发送钉钉预警消息
        String alertMsg = String.format("[延迟计算余额的定时任务]执行结束:accountId=%s, tableBalance=%s, cacheBalance=%s, tableBalance-cacheBalance=%s",
                accountId, tableBalance, cacheBalance, differenceBalance);
        taskMonitor.sendAlertMsg(alertMsg);
        log.info(alertMsg);
    }
    log.info("[延迟计算余额-执行业务]结束");

} catch (Exception e) {
    log.error("[延迟计算余额-执行业务]异常", e);

    //发送钉钉预警消息
    String alertMsg = String.format("[延迟计算余额的定时任务]异常:accountId=%s, e=%s", accountId, e);
    taskMonitor.sendAlertMsg(alertMsg);
    log.info(alertMsg);

} finally {
    //更新缓存
    ValueOperations<String, String> valueOperations = redis.opsForValue();
    String isRunningKey = CountryCodeHolder.getCountryCode() + SavingConstant.SAVING_ACCOUNT_DELAY_TASK_IS_RUNNING + accountId;
    String endTimeKey = CountryCodeHolder.getCountryCode() + SavingConstant.SAVING_ACCOUNT_DELAY_TASK_END_TIME + accountId;
    long endTime = System.currentTimeMillis();

    valueOperations.set(isRunningKey, "0", SavingConstant.DELAY_TASK_EXPIRE, TimeUnit.MINUTES);
    valueOperations.set(endTimeKey, String.valueOf(endTime), SavingConstant.DELAY_TASK_EXPIRE, TimeUnit.MINUTES);
    log.info("[延迟计算余额-执行业务]更新缓存:accountId={}, isRunning=0, endTime={}", accountId, endTime);
}



    @Transactional(rollbackFor = Exception.class)
    public Long delayUpdateBalance(String accountId) {
        log.info("[延迟计算余额]开始:accountId={}", accountId);

        //处理延迟流水并更新账户余额
        Long increaseBalance = processDelayFlow(accountId);

        Long cacheBalance;
        if (increaseBalance > 0) {
            //增加缓存余额
            cacheBalance = increaseCacheBalance(accountId, increaseBalance);
        } else {
            //获取缓存余额
            cacheBalance = getCacheBalance(accountId);
        }
        log.info("[延迟计算余额]结束:cacheBalance={}", cacheBalance);
        return cacheBalance;
    }


  private Long processDelayFlow(String accountId) {
        log.info("[处理延迟流水并更新账户余额]开始:accountId={}", accountId);

        SavingAccountDO savingAccountDO = savingAccountDOMapper.selectByAccountId(accountId);
        log.info("[处理延迟流水并更新账户余额]根据账户id获取理财账户:savingAccountDO={}", savingAccountDO);
        Long lastDelayId = savingAccountDO.getLastDelayId();

        //获取缓存操作记录
        Long cacheDelayId = getCacheDelayId(accountId);
        if (cacheDelayId == null) {
            cacheDelayId = lastDelayId + savingConfigCenter.getQueryDelayIdExtension();
        }

        //查询id的范围需要提前、延后,防止上一次处理后插入的id(cacheDelayId)小于当前最大的id
        long startIndex = lastDelayId - savingConfigCenter.getQueryDelayIdExtension();
        long endIndex = cacheDelayId;
        int pageSize = savingConfigCenter.getQueryDelayPageSize();
        List<AccountDelayFlowDO> list;
        List<Long> delayIdList = new ArrayList<>();
        int count = 0;
        long updateBalance = 0;//总的更新余额
        long increaseBalance = 0;//增加的余额
        do {
            list = accountDelayFlowDOMapper.selectNotEntry(accountId, startIndex, endIndex, pageSize);
            log.info("[处理延迟流水并更新账户余额]获取未入账的延迟流水:accountId={}, startIndex={}, endIndex={}, pageSize={}, list.size()={}",
                    accountId, startIndex, endIndex, pageSize, list.size());

            for (AccountDelayFlowDO accountDelayFlowDO : list) {
                delayIdList.add(accountDelayFlowDO.getId());
                Integer flowType = accountDelayFlowDO.getFlowType();
                Long amount = accountDelayFlowDO.getAmount();

                if (flowType == DelayFlowTypeEnum.INCREASE_BALANCE.getValue()) {
                    //增加余额
                    updateBalance += amount;
                    increaseBalance += amount;
                } else {
                    //扣减余额
                    updateBalance -= amount;
                }
            }
            if (list.size() > 0) {
                startIndex = list.get(list.size() - 1).getId();
            }
            count += list.size();
        } while (list.size() == pageSize);
        log.info("[处理延迟流水并更新账户余额]查询延迟流水结束:count={}, updateBalance={}, increaseBalance={}, delayIdList={}",
                count, updateBalance, increaseBalance, delayIdList);

        if (count > 0) {
            //更新延迟流水状态
            for (int i = 0; i < delayIdList.size(); i += savingConfigCenter.getQueryDelayBatchUpdate()) {
                int j = Math.min(i + savingConfigCenter.getQueryDelayBatchUpdate(), delayIdList.size());
                List<Long> subList = delayIdList.subList(i, j);

                int result = accountDelayFlowDOMapper.updateIsEntry(subList);
                log.info("[处理延迟流水并更新账户余额]批量更新为已入账:subList={}, result={}", subList, result);
                if (result != subList.size()) {
                    throw new ServiceException(SavingRespCode.SAVE_DATA_FAIL);
                }
            }

            //更新账户余额
            savingAccountDO = savingAccountDOMapper.selectForUpdateByAccountId(accountId);
            log.info("[处理延迟流水并更新账户余额]根据账户id获取理财账户,加锁:savingAccountDO={}", savingAccountDO);

            Long id = savingAccountDO.getId();
            Long depositPrincipalBefore = savingAccountDO.getDepositPrincipal();
            Long depositPrincipalAfter = depositPrincipalBefore + updateBalance;
            lastDelayId = startIndex;
            if (depositPrincipalAfter < 0) {
                //更新后的余额不能是负数
                throw new ServiceException(SavingRespCode.CANNOT_BE_NEGATIVE);
            }

            int result = savingAccountDOMapper.updateDepositPrincipal(id, depositPrincipalBefore, depositPrincipalAfter, lastDelayId);
            log.info("[处理延迟流水并更新账户余额]更新存款本金:id={}, depositPrincipalBefore={}, depositPrincipalAfter={}, lastDelayId={}, result={}",
                    id, depositPrincipalBefore, depositPrincipalAfter, lastDelayId, result);
            if (result == 0) {
                throw new ServiceException(SavingRespCode.SAVE_DATA_FAIL);
            }
        }

        log.info("[处理延迟流水并更新账户余额]结束:increaseBalance={}", increaseBalance);
        return increaseBalance;
    }


    @Transactional(rollbackFor = Exception.class)
    public Long delayRefreshCacheBalance(String accountId) {
        log.info("[延迟刷新缓存余额]开始:accountId={}", accountId);

        //处理延迟流水并更新账户余额
        processDelayFlow(accountId);

        //刷新缓存余额
        Long cacheBalance = refreshCacheBalance(accountId);

        //发送钉钉预警消息
        String alertMsg = String.format("[延迟刷新缓存余额]缓存余额<0,用理财账户表中的余额刷新:accountId=%s", accountId);
        developMonitor.sendAlertMsg(alertMsg);
        log.info(alertMsg);
        log.info("[延迟刷新缓存余额]结束:cacheBalance={}", cacheBalance);
        return cacheBalance;
    }
    public Long refreshCacheBalance(String accountId) {
        log.info("[刷新缓存余额]开始:accountId={}", accountId);
        HashOperations<String, String, String> hashOperations = redis.opsForHash();
        String key = CountryCodeHolder.getCountryCode() + SavingConstant.SAVING_ACCOUNT_CACHE_BALANCE;

        SavingAccountDO savingAccountDO = savingAccountDOMapper.selectByAccountId(accountId);
        log.info("[刷新缓存余额]根据账户id获取理财账户:savingAccountDO={}", savingAccountDO);

        Long cacheBalance = savingAccountDO.getDepositPrincipal();
        hashOperations.put(key, accountId, cacheBalance.toString());
        log.info("[刷新缓存余额]结束:key={}, accountId={}, cacheBalance={}", key, accountId, cacheBalance);
        return cacheBalance;
    }

参考文章

热点账户博客1 热点账户博客2 热点账户博客3