当前位置: 移动技术网 > IT编程>开发语言>Java > spring cloud alibaba源码解析:Nacos配置更新

spring cloud alibaba源码解析:Nacos配置更新

2020年07月31日  | 移动技术网IT编程  | 我要评论

配置更新代码框架:

代码位置:nacos-client-1.2.1.jar包里面ClientWorker这个类。我这里分析的版本是1.2.1。

当Nacos server上的配置更新的时候,nacos客户端就会去拉取新的配置。原理是:客户端会用定时器定时去拉取配置,找出发生变化的配置,然后更新到本地缓存中。

定时器相关代码如下:

executor.scheduleWithFixedDelay(new Runnable() {
    @Override
    public void run() {
        try {
            checkConfigInfo();
        } catch (Throwable e) {
            LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
        }
    }
}, 1L, 10L, TimeUnit.MILLISECONDS);

定时器采用scheduleWithFixedDelay来实现,定时调用checkConfigInfo函数来主动拉取配置,距离上次调用运行完后10毫秒启动下次调用。

public void checkConfigInfo() {
    // 分任务
    int listenerSize = cacheMap.get().size();
    // 向上取整为批数
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // 要判断任务是否在执行 这块需要好好想想。 任务列表现在是无序的。变化过程可能有问题
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

这个地方,根据listener的个数(配置文件个数+1,当前我测试的情况是这样的。具体代码后面再研究一下。)来分任务,每个任务(线程)默认负责3000个listener。
这里用另一个定时器执行了LongPollingRunnable任务。

executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
        t.setDaemon(true);
        return t;
    }
});

现在来看一下LongPollingRunnable的实现,LongPollingRunnable是一个Runnable,run方法如下:

@Override
public void run() {

    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.get().values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // check server config
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        LOGGER.info("get changedGroupKeys:" + changedGroupKeys);

        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(ct[0]);
                if (null != ct[1]) {
                    cache.setType(ct[1]);
                }
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                    ContentUtils.truncateContent(ct[0]), ct[1]);
            } catch (NacosException ioe) {
                String message = String.format(
                    "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                    agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();

        executorService.execute(this);

    } catch (Throwable e) {

        // If the rotation training task is abnormal, the next execution time of the task will be punished
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}

定时读取Nacos Server的配置,如果读取的配置没有发生变化,则等待30s后,返回,执行下一次循环。
如果有更新立即返回。
checkUpdateDataIds获取更新后的group key,形式是dataId+group+tenant的形式。
checkUpdateDataIds方法里面,

long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
    agent.getEncode(), readTimeoutMs);

post请求路径是:/v1/cs/configs/listener
根据阿里接口文档,知道该接口是监听配置接口。
在这里插入图片描述
getServerConfig方法根据变化的groupkey列表,获取配置内容,使用Open-api的/v1/cs/configs获取配置接口。然后更新本地缓存。

String[] ct = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
    cache.setType(ct[1]);
}

checkListenerMd5方法,更新listener的md5和content内容,然后发送refresh事件通知,来更新类中的配置字段。

NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));

执行下一次循环:

executorService.execute(this); 

出错的时候,延时taskPenaltyTime ms再执行下一次循环:

executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);

本文地址:https://blog.csdn.net/songhongjin/article/details/107662632

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网