Nacos Config Center Source Code Analysis

Official documentation

I. The Nacos Open API

The official Open API includes the following endpoints:

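It is also important to understand this explanation from the official documentation:

Listen for configs in Nacos so that config changes are detected in real time. If a config changes, use the get-config endpoint to fetch its latest value and refresh the local cache. Listener registration uses asynchronous Servlet technology. Registering a listener essentially means sending the configs together with the MD5 of their values and comparing them with the server side. If an MD5 differs, the inconsistent config is returned immediately. If the values match, the request is held for 30 seconds and the response is empty.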
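
The MD5 comparison described in that quote can be sketched as follows. This is a minimal illustration, not Nacos code: the class `Md5ProbeSketch` and its methods are hypothetical names, and the only assumption carried over from the quote is that the client sends an MD5 of its cached content and the server compares it with the MD5 of its own copy.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, for illustration only.
class Md5ProbeSketch {

    // Hex-encoded MD5 of a config body.
    static String md5Hex(String content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // True when the MD5 the client sent no longer matches the server-side content.
    static boolean hasChanged(String clientMd5, String serverContent) {
        return !md5Hex(serverContent).equals(clientMd5);
    }

    public static void main(String[] args) {
        String clientMd5 = md5Hex("timeout=30");
        System.out.println(hasChanged(clientMd5, "timeout=30")); // content is unchanged
        System.out.println(hasChanged(clientMd5, "timeout=60")); // content was edited
    }
}
```

Comparing digests rather than full contents keeps the probe request small even when the client listens on many configs.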

II. Client Source Code Analysis

  1. Auto-configuration
	@Bean
	public NacosConfigManager nacosConfigManager(
			NacosConfigProperties nacosConfigProperties) {
		return new NacosConfigManager(nacosConfigProperties);
	}
	
@ConfigurationProperties(NacosConfigProperties.PREFIX)
public class NacosConfigProperties {

	/**
	 * Prefix of {@link NacosConfigProperties}.
	 */
	public static final String PREFIX = "spring.cloud.nacos.config";
  1. NacosConfigManager (the Nacos configuration manager)
public class NacosConfigManager {

	private static final Logger log = LoggerFactory.getLogger(NacosConfigManager.class);

	private static ConfigService service = null;

	private NacosConfigProperties nacosConfigProperties;

	public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
		this.nacosConfigProperties = nacosConfigProperties;
		// Compatible with older code in NacosConfigProperties,It will be deleted in the
		// future.
		// Create the config-related service
		createConfigService(nacosConfigProperties);
	}
  1. Creating the config service
	static ConfigService createConfigService(
			NacosConfigProperties nacosConfigProperties) {
		if (Objects.isNull(service)) {
			synchronized (NacosConfigManager.class) {
				try {
					if (Objects.isNull(service)) {
					    // Create it
						service = NacosFactory.createConfigService(
								nacosConfigProperties.assembleConfigServiceProperties());
					}
				}
				catch (NacosException e) {
					log.error(e.getMessage());
					throw new NacosConnectionFailureException(
							nacosConfigProperties.getServerAddr(), e.getMessage(), e);
				}
			}
		}
		return service;
	}
public class NacosFactory {

    public static ConfigService createConfigService(Properties properties) throws NacosException {
        return ConfigFactory.createConfigService(properties);
    }
}

public class ConfigFactory {

    public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            // The Nacos config service is ultimately created via reflection
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }
}

Take a look at this class:

public class NacosConfigService implements ConfigService {

    private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class);

    private static final long POST_TIMEOUT = 3000L;

    private static final String EMPTY = "";

    /**
     * http agent
     */
    private HttpAgent agent;
    /**
     * longpolling
     */
    private ClientWorker worker;
    private String namespace;
    private String encode;
    private ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager();

    public NacosConfigService(Properties properties) throws NacosException {
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            encode = Constants.ENCODE;
        } else {
            encode = encodeTmp.trim();
        }
        initNamespace(properties);
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        agent.start();
        // The key step: create the ClientWorker, the object that does the client-side work
        worker = new ClientWorker(agent, configFilterChainManager, properties);
    }
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Initialize the timeout parameter

        init(properties);
        
        //
        executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });

        executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        // Schedule a fixed-delay task: checkConfigInfo runs again 10 ms after the previous run finishes
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

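// There may be many configs, so they are split into batches and each batch is handed to a LongPollingRunnable that checks for updates
    public void checkConfigInfo() {
        // Split into tasks
        int listenerSize = cacheMap.get().size();
        // Round up to get the number of batches
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // Need to check whether the task is already running; this needs more thought. The task list is currently unordered, so changes to it may cause problems.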
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
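
The batching arithmetic in checkConfigInfo can be sketched on its own. This is a minimal illustration under stated assumptions: the class `LongPollingBatchSketch` is hypothetical, and the batch size of 3000 is an assumed value standing in for whatever `ParamUtil.getPerTaskConfigSize()` returns. Note that the divisor has to be a floating-point value, otherwise integer division would truncate before `Math.ceil` is applied.

```java
// Hypothetical helper, for illustration only.
class LongPollingBatchSketch {

    // Assumed batch size; in the client this comes from ParamUtil.getPerTaskConfigSize().
    static final double PER_TASK_CONFIG_SIZE = 3000;

    // Number of long-polling tasks needed to cover listenerCount cached configs.
    static int taskCount(int listenerCount) {
        return (int) Math.ceil(listenerCount / PER_TASK_CONFIG_SIZE);
    }

    // Tasks are only ever added, never removed: how many new ones must be started.
    static int tasksToStart(int listenerCount, int currentTaskCount) {
        return Math.max(0, taskCount(listenerCount) - currentTaskCount);
    }

    public static void main(String[] args) {
        System.out.println(taskCount(1));          // one config still needs one task
        System.out.println(taskCount(3001));       // one config over the batch size needs a second task
        System.out.println(tasksToStart(6001, 1)); // growing from 1 task to 3 starts 2 new ones
    }
}
```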

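Note the call executorService.execute(new LongPollingRunnable(i)): LongPollingRunnable checks the configs by long polling, so this class is the one to read to understand how the client picks up config updates: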

class LongPollingRunnable implements Runnable {
        private int taskId;

        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {

            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // check failover config
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }

                // check server config
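                // Check whether any config changed. Per the Open API description, the server returns the keys (dataId, group, tenant) of the configs that changed
                List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                
                // Several configs are probed in one request, so several may have changed; iterate over them below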
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
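                        // A non-empty Open API response means this dataId has been updated, so fetch its latest content from the config center; getServerConfig also goes through the Open API (get config)
                        String content = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        // Write the updated config into the local cache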
                        cache.setContent(content);
                        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(content));
                    } catch (NacosException ioe) {
                        String message = String.format(
                            "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
                        LOGGER.error(message, ioe);
                    }
                }
                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();

                executorService.execute(this);

            } catch (Throwable e) {

                // If the rotation training task is abnormal, the next execution time of the task will be punished
                LOGGER.error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }
    List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException {
        // sb holds the request parameter that the Open API listener endpoint expects; the loop below assembles it
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isUseLocalConfigInfo()) {
                sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                sb.append(cacheData.group).append(WORD_SEPARATOR);
                if (StringUtils.isBlank(cacheData.tenant)) {
                    sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                } else {
                    sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                    sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                }
                if (cacheData.isInitializing()) {
                    // cacheData appears in cacheMap for the first time and this is its first update check
                    inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
                }
            }
        }
        boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
        // Check whether any config was updated
        return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
    }
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {

        List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);

        List<String> headers = new ArrayList<String>(2);
        headers.add("Long-Pulling-Timeout");
        headers.add("" + timeout);

        // told server do not hang me up if new initializing cacheData added in
        if (isInitializingCacheList) {
            headers.add("Long-Pulling-Timeout-No-Hangup");
            headers.add("true");
        }

        if (StringUtils.isBlank(probeUpdateString)) {
            return Collections.emptyList();
        }

        try {
            // Call the Open API endpoint that listens for config changes
            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
                agent.getEncode(), timeout);

            if (HttpURLConnection.HTTP_OK == result.code) {
                setHealthServer(true);
                return parseUpdateDataIdResponse(result.content);
            } else {
                setHealthServer(false);
                LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code);
            }
        } catch (IOException e) {
            setHealthServer(false);
            LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
            throw e;
        }
        return Collections.emptyList();
    }
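
The probe string that checkUpdateDataIds assembles can be sketched in isolation. This is a minimal illustration: `ProbeStringSketch` is a hypothetical class, and the separator values (character code 2 between fields, character code 1 between entries) are an assumption based on the Open API description of the Listening-Configs parameter, since the constants' values are not shown in the code above.

```java
// Hypothetical helper, for illustration only.
class ProbeStringSketch {

    // Assumed values of WORD_SEPARATOR and LINE_SEPARATOR.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    // Appends one entry: dataId, group, md5 and, only when present, tenant.
    static void appendEntry(StringBuilder sb, String dataId, String group, String md5, String tenant) {
        sb.append(dataId).append(WORD_SEPARATOR);
        sb.append(group).append(WORD_SEPARATOR);
        if (tenant == null || tenant.trim().isEmpty()) {
            sb.append(md5).append(LINE_SEPARATOR);
        } else {
            sb.append(md5).append(WORD_SEPARATOR);
            sb.append(tenant).append(LINE_SEPARATOR);
        }
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        appendEntry(sb, "app.yaml", "DEFAULT_GROUP", "abc", null);
        appendEntry(sb, "db.yaml", "DEFAULT_GROUP", "def", "dev");
        // Make the control characters visible when printing.
        System.out.println(sb.toString().replace(WORD_SEPARATOR, "^2").replace(LINE_SEPARATOR, "^1"));
    }
}
```

Because each entry ends with its own separator, the server can split the whole parameter into entries first and into fields second, and a three-field entry simply has no tenant.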

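Summary of client-side config updates

From the analysis above, checkConfigInfo runs in the scheduled pool with a 10 ms fixed delay and starts a LongPollingRunnable for every batch of configs that does not have one yet. Each LongPollingRunnable re-submits itself as soon as a round finishes (executorService.execute(this)). At first glance, then, the Nacos client seems to probe the server almost continuously, carrying the MD5 of its latest locally cached configs and asking whether anything changed (this assumes the time taken by each round can be ignored). If something changed, it requests the latest config.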

        // Schedule a fixed-delay task: checkConfigInfo runs again 10 ms after the previous run finishes
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);

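The snippet above is the scheduled task analyzed earlier. One point needs to be clear: in scheduleWithFixedDelay of the JUC ScheduledExecutorService, the delay parameter is the interval measured from the end of one execution to the start of the next.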
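
The fixed-delay semantics can be observed with a small experiment. This is a minimal sketch: `FixedDelaySketch` and `recordStarts` are hypothetical names, and the 50 ms task duration and 20 ms delay used in `main` are arbitrary values chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
class FixedDelaySketch {

    // Runs a task that takes taskMillis `runs` times and returns each start time in nanoseconds.
    static List<Long> recordStarts(int runs, long taskMillis, long delayMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        executor.scheduleWithFixedDelay(() -> {
            if (done.getCount() == 0) {
                return; // enough runs were recorded already
            }
            starts.add(System.nanoTime());
            try {
                Thread.sleep(taskMillis); // simulate work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.countDown();
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the runs", e);
        } finally {
            executor.shutdownNow();
        }
        return starts;
    }

    public static void main(String[] args) {
        List<Long> starts = recordStarts(3, 50, 20);
        for (int i = 1; i < starts.size(); i++) {
            long gapMillis = (starts.get(i) - starts.get(i - 1)) / 1_000_000;
            // Each gap is roughly task duration plus delay, not the delay alone.
            System.out.println("gap between run " + i + " and run " + (i + 1) + ": " + gapMillis + " ms");
        }
    }
}
```

With scheduleAtFixedRate the gaps would instead be measured from start to start, which is why the choice of method matters when the task itself can take a long time.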

That looks like the whole story, but you have probably noticed a problem here:

            // Call the Open API endpoint that listens for config changes
            HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
                agent.getEncode(), timeout);

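The call to the config listener endpoint carries an HTTP response timeout, and this timeout defaults to 30 s. Once execution reaches this code, if the server holds the request, the client only unblocks and gets the result after about 30 s. So if every request is held, each long-polling task effectively checks the configs about once every 30 s.
This raises some questions. Why is the timeout so long? How exactly does the client update its configs and learn about changes? Does the client pull, or does the server push?
Answering them requires looking at how the Nacos server implements the config listener endpoint.

Server-side source analysis of the config listener endpoint

We previously analyzed the server-side Open API for service registration and discovery, so the config center's server-side Open API will be much easier to follow. Start by locating ConfigController: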

@PostMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public void listener(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        
        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
        
        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        
        // do long-polling
        // As you can see, the request is handled with long polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }

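This explains why the client sets its timeout to 30 s: the server handles the request with long polling.

P.S. What is long polling?

After the client sends a request, the server does not respond right away; it holds the request. If the server has nothing to tell the client during that time, it responds automatically once the timeout carried in the client's request has elapsed, and this round of long polling ends. Conversely, if information the client needs becomes available while the request is held, the server stops holding it at once and returns that information, which also ends this round.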
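
The hold-or-respond behavior can be sketched without any servlet machinery. This is a minimal in-memory illustration, not the Nacos implementation: `LongPollSketch`, `poll` and `publish` are hypothetical names, a plain version string stands in for the MD5, and a CompletableFuture stands in for the held HTTP response (it requires Java 9 or later for `completeOnTimeout` and `List.of`).

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of a long-polling server, for illustration only.
class LongPollSketch {

    private final Map<String, String> serverVersions = new ConcurrentHashMap<>();
    private final Queue<Waiter> waiters = new ConcurrentLinkedQueue<>();

    private static final class Waiter {
        final String key;
        final CompletableFuture<List<String>> response = new CompletableFuture<>();

        Waiter(String key) {
            this.key = key;
        }
    }

    // Answers at once if the client's version is stale; otherwise holds the request
    // until the key changes or timeoutMillis elapses (an empty list means "no change").
    synchronized CompletableFuture<List<String>> poll(String key, String clientVersion, long timeoutMillis) {
        String current = serverVersions.get(key);
        if (current != null && !current.equals(clientVersion)) {
            return CompletableFuture.completedFuture(List.of(key));
        }
        Waiter waiter = new Waiter(key);
        waiters.add(waiter);
        waiter.response.completeOnTimeout(List.of(), timeoutMillis, TimeUnit.MILLISECONDS);
        // Whichever way the request ends, drop it from the list of held requests.
        waiter.response.whenComplete((result, error) -> waiters.remove(waiter));
        return waiter.response;
    }

    // Stores the new version and immediately answers every request held for this key.
    synchronized void publish(String key, String newVersion) {
        serverVersions.put(key, newVersion);
        for (Waiter waiter : waiters) {
            if (waiter.key.equals(key)) {
                waiter.response.complete(List.of(key));
            }
        }
    }

    public static void main(String[] args) {
        LongPollSketch server = new LongPollSketch();
        server.publish("app.yaml", "v1");
        CompletableFuture<List<String>> held = server.poll("app.yaml", "v1", 30_000);
        System.out.println("held: " + !held.isDone());
        server.publish("app.yaml", "v2");
        System.out.println("changed keys: " + held.join());
    }
}
```

From the client's point of view this is still a pull, but because the response is released the moment a change is published, the latency is close to that of a push.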

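Besides long polling, you have probably also heard of long-lived and short-lived connections. Those are TCP-level concepts; see the articles below: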

https://blog.csdn.net/pmt123456/article/details/58233999?utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control
https://tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO/
https://www.jianshu.com/p/3fc3646fad80
https://segmentfault.com/a/1190000024430529  


Following the long-polling handler above into its implementation, the logic that finally runs is:

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
            int probeRequestSize) {
        
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        
        // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
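        // The timeout used on the server is the client's timeout minus 500 ms (with a floor of 10 s), so the response goes out early and network latency does not make the client time out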
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // Do nothing but set fix polling timeout.
        } else {
            long start = System.currentTimeMillis();
            // Compare the configs' MD5 values to decide whether anything changed
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            // If any config was updated, respond immediately
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            }
            // If the client sent the no-hang-up header set to true, the server does not hold the request even when nothing changed; it returns right away
            else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        
        // Must be called by http thread, or send response.
        final AsyncContext asyncContext = req.startAsync();
        
        // AsyncContext.setTimeout() is incorrect, Control by oneself
        asyncContext.setTimeout(0L);
        
        // Wrap the request to be held in a ClientLongPolling task and hand it to the thread pool
        ConfigExecutor.executeLongPolling(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
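
The hold time computed at the top of addLongPollingClient can be isolated into a tiny function. This is a minimal sketch: `ServerHoldTimeSketch` and `holdMillis` are hypothetical names, while the 10000 ms floor and the 500 ms default delay are taken from the code above.

```java
// Hypothetical helper, for illustration only.
class ServerHoldTimeSketch {

    // The server never holds a request for less than this, as in Math.max(10000, ...).
    static final long MIN_HOLD_MILLIS = 10_000L;

    // Hold time = client timeout minus the safety delay, but at least MIN_HOLD_MILLIS.
    static long holdMillis(long clientTimeoutMillis, long delayMillis) {
        return Math.max(MIN_HOLD_MILLIS, clientTimeoutMillis - delayMillis);
    }

    public static void main(String[] args) {
        System.out.println(holdMillis(30_000, 500)); // default client timeout: held for 29500 ms
        System.out.println(holdMillis(5_000, 500));  // short client timeout: the 10000 ms floor applies
    }
}
```

One consequence of the floor is that a client sending a timeout well below 10 s would give up before the server responds.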

So every held long-polling request ends up running the logic in ClientLongPolling's run method:

@Override
        public void run() {
            asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
                @Override
                public void run() {
                    try {
                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                        
                        // Delete subsciber's relations.
                        allSubs.remove(ClientLongPolling.this);
                        
                        if (isFixedPolling()) {
                            LogUtil.CLIENT_LOG
                                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                            "polling", clientMd5Map.size(), probeRequestSize);
                            List<String> changedGroups = MD5Util
                                    .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                            (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                            // The config changed
                            if (changedGroups.size() > 0) {
                                sendResponse(changedGroups);
                            } else {
                                sendResponse(null);
                            }
                        }
                        // Timed out
                        else {
                            LogUtil.CLIENT_LOG
                                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                            "polling", clientMd5Map.size(), probeRequestSize);
                            sendResponse(null);
                        }
                    } catch (Throwable t) {
                        LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                    }
                    
                }
                // Note that the run() logic above only executes after a delay of timeoutTime
            }, timeoutTime, TimeUnit.MILLISECONDS);
            
            allSubs.add(this);
        }

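Now for the interesting part. The inner run() only executes after the timeout, which means its change-detection logic only runs after 29.5 seconds. Does that mean the client is notified only after 29.5 seconds, regardless of whether a config was modified while the request was held? That is clearly not the purpose of long polling: if a config changes during the hold, the server should respond to the client immediately, even though the timeout has not been reached. In other words, the server notifies the client as soon as a config is updated, which is effectively a push. Let's see how this is implemented.

Notice the variable allSubs in the method above. Its comment in the source reads: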

    /**
     * ClientLongPolling subscibers.
     * ClientLongPolling subscribes to an event; this queue holds all of the subscribers.
     */
    final Queue<ClientLongPolling> allSubs;

In other words, allSubs represents some kind of subscription relationship.

public LongPollingService() {
        // allSubs is initialized here
        allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();
        
        ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);
        
        // Register LocalDataChangeEvent to NotifyCenter.
        NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);
        
        // Register A Subscriber to subscribe LocalDataChangeEvent.
        NotifyCenter.registerSubscriber(new Subscriber() {
            
            // Ties the LocalDataChangeEvent (a config changed) to the ClientLongPolling subscribers: whenever the event fires, the DataChangeTask below is submitted and its run() method executes
            @Override
            public void onEvent(Event event) {
                if (isFixedPolling()) {
                    // Ignore.
                } else {
                    if (event instanceof LocalDataChangeEvent) {
                        LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                        ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                    }
                }
            }
            
            @Override
            public Class<? extends Event> subscribeType() {
                return LocalDataChangeEvent.class;
            }
        });
        
    }

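DataChangeTask's run method is shown below. On receiving a config change event, it immediately completes the matching held long-polling requests and returns the changed config key.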

 @Override
        public void run() {
            try {
                ConfigCacheService.getContentBetaMd5(groupKey);
                // Iterate over every subscriber in the long-polling queue
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        // If published tag is not in the beta list, then it skipped.
                        if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                            continue;
                        }
                        
                        // If published tag is not in the tag list, then it skipped.
                        if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                            continue;
                        }
                        
                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        iter.remove(); // Delete subscribers' relationships.
                        LogUtil.CLIENT_LOG
                                .info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                                        RequestUtil
                                                .getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                                        "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                        // Respond with the changed config key
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
                LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
            }
        }

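That raises the next question: how does the publisher publish the event, and where is the config change event published? On reflection, the logic that notifies subscribers must live in the code path that modifies a config. Nacos is itself a client/server architecture, and publishing or updating a config is also done by a client calling the server's Open API, namely the official publish-config endpoint introduced at the beginning of this article. Matching on the request URL and method, we find the handler in ConfigController: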

@PostMapping
    @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
            @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "src_user", required = false) String srcUser,
            @RequestParam(value = "config_tags", required = false) String configTags,
            @RequestParam(value = "desc", required = false) String desc,
            @RequestParam(value = "use", required = false) String use,
            @RequestParam(value = "effect", required = false) String effect,
            @RequestParam(value = "type", required = false) String type,
            @RequestParam(value = "schema", required = false) String schema) throws NacosException {
        
        // Routine request-parameter handling
        final String srcIp = RequestUtil.getRemoteIp(request);
        final String requestIpApp = RequestUtil.getAppName(request);
        srcUser = RequestUtil.getSrcUserName(request);
        // Check type: the type of the config being modified; if type is missing or invalid, default to text
        if (!ConfigType.isValidType(type)) {
            type = ConfigType.getDefaultType().getType();
        }
        // check tenant
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);
        Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
        MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
        MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
        MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
        MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
        MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
        MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
        ParamUtils.checkParam(configAdvanceInfo);
        
        if (AggrWhitelist.isAggrDataId(dataId)) {
            LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
                    dataId, group);
            throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
        }
        
        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        // Regular (non-beta) publish
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
                // Notify subscribers after the update
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
                // Notify subscribers after the update
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        }
        // Beta publish
        else {
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
            // Notify subscribers after the update
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService
                .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
                        ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }
public class ConfigChangePublisher {
    
    /**
     * Notify ConfigChange.
     *
     * @param event ConfigDataChangeEvent instance.
     */
    public static void notifyConfigChange(ConfigDataChangeEvent event) {
        if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {
            return;
        }
        // Publish the event through the NotifyCenter
        NotifyCenter.publishEvent(event);
    }
    
}


    public static boolean publishEvent(final Event event) {
        try {
            return publishEvent(event.getClass(), event);
        } catch (Throwable ex) {
            LOGGER.error("There was an exception to the message publishing : {}", ex);
            return false;
        }
    }
    
    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
            return INSTANCE.sharePublisher.publish(event);
        }

        final String topic = ClassUtils.getCanonicalName(eventType);

        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            // Publish the event
            return publisher.publish(event);
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }
// The publisher offers the event to its queue; if the queue is full, it delivers the event directly
    public boolean publish(Event event) {
        checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
            receiveEvent(event);
            return true;
        }
        return true;
    }

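This is a typical publish/subscribe pattern: after a config is modified, the publisher pushes the change event onto a queue, and the subscriber runs the DataChangeTask run method shown earlier. Note that the publish handler emits ConfigDataChangeEvent, while the long-polling subscriber listens for LocalDataChangeEvent; the step that connects the two is not shown in the code above.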
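
The queue-based publisher can be sketched in a few lines. This is a minimal illustration of the pattern, not the NotifyCenter implementation: `QueuePublisherSketch` and its methods are hypothetical names. The one detail mirrored from publish(Event) above is the fallback: when the queue cannot accept the event, it is delivered on the caller's thread instead.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical queue-backed publisher, for illustration only.
class QueuePublisherSketch<E> {

    private final BlockingQueue<E> queue;
    private final List<Consumer<E>> subscribers = new CopyOnWriteArrayList<>();

    QueuePublisherSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void subscribe(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    // Starts the daemon thread that drains the queue and notifies subscribers.
    void start() {
        Thread dispatcher = new Thread(this::dispatchLoop, "sketch-event-dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    // Enqueues the event; if the queue is full, delivers it synchronously instead.
    boolean publish(E event) {
        if (!queue.offer(event)) {
            deliver(event);
        }
        return true;
    }

    private void dispatchLoop() {
        try {
            while (true) {
                deliver(queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void deliver(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueuePublisherSketch<String> publisher = new QueuePublisherSketch<>(16);
        publisher.subscribe(groupKey -> System.out.println("config changed: " + groupKey));
        publisher.start();
        publisher.publish("app.yaml+DEFAULT_GROUP");
        Thread.sleep(100); // give the daemon dispatcher time to deliver before the JVM exits
    }
}
```

Decoupling the publisher from the subscribers through a queue means the HTTP thread that handles the publish request does not normally wait for every held long-polling request to be answered.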

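To sum up, config change listening in Nacos works roughly as follows:

1. The Nacos server exposes Open API endpoints for working with the config center.
2. After startup, the Nacos client fetches its configs from the config center and caches them locally. It then schedules a task with a 10 ms fixed delay that keeps the long-polling workers running, and those workers repeatedly call the config listener endpoint. This is an active pull by the client.
3. Plain client-side polling at that rate would put too much load on the server, so the client actually uses long polling: each pull request carries a timeout of 30 s (the default).
4. Because this is long polling, the server must be able to hold requests, and it must answer a held request promptly if a config is modified in the meantime. The server therefore adds publish/subscribe support to its long-polling handling: as soon as a config change event is published (ConfigDataChangeEvent in the publish handler, LocalDataChangeEvent at the long-polling subscriber), it completes the held requests and returns the response.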