当前位置: 移动技术网 > IT编程>开发语言>Java > Nacos配置服务原理

Nacos配置服务原理

2019年09月17日  | 移动技术网IT编程  | 我要评论
Nacos Client配置机制 spring加载远程配置 在了解NACOS客户端配置之前,我们先看看spring怎么样加载远程配置的。spring 提供了加载远程配置的扩展接口 PropertySourceLocator。下面看个简单的例子: 实现PropertySourceLocator Pro ...

nacos client配置机制

spring加载远程配置

在了解nacos客户端配置之前,我们先看看spring怎么样加载远程配置的。spring 提供了加载远程配置的扩展接口 propertysourcelocator。下面看个简单的例子:

实现propertysourcelocator

public class greizpropertysourcelocator implements propertysourcelocator {
    @override
    public propertysource<?> locate(environment environment) {
        // 自定义配置,来源可以从任何地方
        map<string, object> source = new hashmap<>();
        source.put("username", "greiz");
        source.put("userage", 18);
        return new mappropertysource(greizpropertysource.property_name, source);
    }
}

propertysourcelocator 只有一个接口,我们可以在该接口实现自定义配置的加载,比如从数据库中获取配置,或者文件中获取配置等。

springboot启动配置类

@configuration
public class greizconfigbootstrapconfiguration {
    @bean
    public greizpropertysourcelocator greizpropertysourcelocator() {
        return new greizpropertysourcelocator();
    }
}

在meta-inf/spring.factories添加启动指定加载类

org.springframework.cloud.bootstrap.bootstrapconfiguration=\
com.greiz.demo.config.greizconfigbootstrapconfiguration

使用

@component
public class greiz {
    @value("${username}")
    private string name;
    @value("${userage}")
    private integer age;
      // 省getter/setter
}

跟本地配置一样使用。

spring启动加载远程配置流程

spring启动加载流程

在spring启动preparecontext阶段会执行propertysourcelocator所有实现类加载自定义的配置,最终添加到environment中管理。

nacos-client

拉取远程配置

nacos客户端启动时加载远程配置就是用了上面的方式。下面我们根据源码看一下具体过程。nacospropertysourcelocator 实现了 propertysourcelocator,所以spring启动时会调用locate方法。

public propertysource<?> locate(environment env) {
   // 1. 创建一个跟远程打交道的对象nacosconfigservice
   configservice configservice = nacosconfigproperties.configserviceinstance();
   ... 省略代码
   // 2. 操作nacospropertysource对象,下面三个方法最终都会调用该对象build
   nacospropertysourcebuilder = new nacospropertysourcebuilder(configservice, timeout);
   // 3. 
   string name = nacosconfigproperties.getname();
   string dataidprefix = nacosconfigproperties.getprefix();
   if (stringutils.isempty(dataidprefix)) {
      dataidprefix = name;
   }
   if (stringutils.isempty(dataidprefix)) {
      dataidprefix = env.getproperty("spring.application.name");
   }
   // 从远程获取的properties会存放到该类,最终放到environment中
   compositepropertysource composite = new compositepropertysource(nacos_property_source_name);
   // 加载公共模块配置
   loadsharedconfiguration(composite);
   // 加载扩展配置
   loadextconfiguration(composite);
   // 加载独有配置
   loadapplicationconfiguration(composite, dataidprefix, nacosconfigproperties, env);
   return composite;
}

1处 - 创建 configservice 对象,是通过反射创建出 nacosconfigservice 实例。该类是nacos client 跟 nacos server 重要的对接者。后面会围绕该类细讲。

2处 - 创建 nacospropertysourcebuilder 实例,用于构建和缓存 nacospropertysource,刷新时会用到此处缓存。

3处 - 加载配置的顺序,公共配置 -> 扩展配置 -> 私有配置,如果有相同key的后面的覆盖前面的。默认的 data id 生成规则 ${spring.application.name}.properties。

加载三种配置最终都会调用 nacospropertysourcebuilder.build() 方法。

nacospropertysource build(string dataid, string group, string fileextension, boolean isrefreshable) {
   // 加载配置
   properties p = loadnacosdata(dataid, group, fileextension);
   nacospropertysource nacospropertysource = new nacospropertysource(group, dataid, propertiestomap(p), new date(), isrefreshable);
   // 缓存nacospropertysource
   nacospropertysourcerepository.collectnacospropertysources(nacospropertysource);
   return nacospropertysource;
}

加载配置后封装nacospropertysource,并缓存。

主要逻辑在 nacospropertysourcebuilder.loadnacosdata() 中。

private properties loadnacosdata(string dataid, string group, string fileextension) {
    // 获取配置
    string data = configservice.getconfig(dataid, group, timeout);
    ... 省略代码
    // .properties扩展名
    if (fileextension.equalsignorecase("properties")) {
        properties properties = new properties();
        properties.load(new stringreader(data));
        return properties;
    } else if (fileextension.equalsignorecase("yaml") || fileextension.equalsignorecase("yml"))         {// .yaml或者.yml扩展名
      yamlpropertiesfactorybean yamlfactory = new yamlpropertiesfactorybean();
      yamlfactory.setresources(new bytearrayresource(data.getbytes()));
      return yamlfactory.getobject();
     }
   return empty_properties;
}

把远程获取到的数据根据扩展名解析成统一的properties。nacos控制台配置支持properties和yaml两个扩展名。

真正获取远程配置的是 nacosconfigservice.getconfig(), 调用getconfiginner()。

private string getconfiginner(string tenant, string dataid, string group, long timeoutms) throws nacosexception {
    group = null2defaultgroup(group);
    paramutils.checkkeyparam(dataid, group);
    configresponse cr = new configresponse();
    cr.setdataid(dataid);
    cr.settenant(tenant);
    cr.setgroup(group);

    // 1. 优先使用failvoer配置
    string content = localconfiginfoprocessor.getfailover(agent.getname(), dataid, group, tenant);
    if (content != null) {
        cr.setcontent(content);
        configfilterchainmanager.dofilter(null, cr);
        content = cr.getcontent();
        return content;
    }

    try {
        // 2. 服务器获取配置
        content = worker.getserverconfig(dataid, group, tenant, timeoutms);
        cr.setcontent(content);
        configfilterchainmanager.dofilter(null, cr);
        content = cr.getcontent();
        return content;
    } catch (nacosexception ioe) {
        if (nacosexception.no_right == ioe.geterrcode()) {
            throw ioe;
        }
    }

    // 3. 当服务器挂了就拿本地快照
    content = localconfiginfoprocessor.getsnapshot(agent.getname(), dataid, group, tenant);
    cr.setcontent(content);
    configfilterchainmanager.dofilter(null, cr);
    content = cr.getcontent();
    return content;
}

1处 - 优先从failvoer获取配置,该文件是怎么样产生的,我暂时还不是很清楚,后面搞懂补充。

2处 - 从nacos服务中获取配置。

3处 - 如果2失败了就从本地快照文件获取。该文件由首次读取远程配置文件生成,并且之后轮询配置更新时如果有更新也会对应更新该文件。

访问服务接口的脏活当然需要一个客户端工作者clientworker,下面是 nacosconfigservice.getconfig() 中调用 clientworker.getserverconfig()。

public string getserverconfig(string dataid, string group, string tenant, long readtimeout)
    throws nacosexception {
    // 就是这么简单http请求获取的配置
    httpresult result = agent.httpget(constants.config_controller_path, null, params, agent.getencode(), readtimeout);
  ... 省略代码
    // 写本地文件快照
    localconfiginfoprocessor.savesnapshot(agent.getname(), dataid, group, tenant, result.content);
  ...省略代码
        return result.content;
}

看了上面获取远程配置的代码是不是想喊出f**k,怎么这么简单!!!是的,用http请求 http://ip:port/v1/cs/configs 接口,跟nacos控制台页面访问是一样的。

到此nacos client启动读取远程配置并封装到environment结束了。

长轮询获取更新

前一小节是对项目启动时nacos client加载远程配置过程分析,本节将对项目运行中配置改变了nacos client是怎么样悉知的分析。

前面提到 nacosconfigservice 是 nacos client 对接 nacos server 的桥梁,下面看一下该类在配置更新过程怎么样运作的。先看一下 nacosconfigservice 的构造方法。

public nacosconfigservice(properties properties) throws nacosexception {
    ... 省略代码
    // 初始化 namespace
    initnamespace(properties);
    // 查询服务列表变化情况
    agent = new metricshttpagent(new serverhttpagent(properties));
    agent.start();
    // 配置更新解决方案在这里面
    worker = new clientworker(agent, configfilterchainmanager, properties);
}

在构造函数中初始化 encode、namespace、httpagent 和 clientworker。

httpagent 是通过http获取服务地址列表代理类,维护这服务地址列表和客户端本地一致。

clientworker 是维护服务端配置和客户端配置一致的工作者。前面初始化获取远程配置时也是该对象。

clientworker 内部是怎么样维护客户端属性更新呢?看一下 clientworker 构造函数干了啥。

public clientworker(final httpagent agent, final configfilterchainmanager configfilterchainmanager, final properties properties) {
        ...省略代码
    executor = executors.newscheduledthreadpool(1, new threadfactory() {
        ...省略代码
    });
  
    executorservice = executors.newscheduledthreadpool(runtime.getruntime().availableprocessors(), new threadfactory() {
        ...省略代码
    });

    // 每10毫秒检查一遍配置
    executor.schedulewithfixeddelay(new runnable() {
        @override
        public void run() {
            try {
                checkconfiginfo();
            } catch (throwable e) {
                logger.error("[" + agent.getname() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1l, 10l, timeunit.milliseconds);
}

clientworker 构造函数创建了两个线程池。executor 创建了一个定时任务,每10毫秒执行一次 checkconfiginfo(); executorservice 作用是什么我们接着往下看。

public void checkconfiginfo() {
    // 分任务 向上取整为批数
    int listenersize = cachemap.get().size();
    int longingtaskcount = (int) math.ceil(listenersize / paramutil.getpertaskconfigsize());
    if (longingtaskcount > currentlongingtaskcount) {
        for (int i = (int) currentlongingtaskcount; i < longingtaskcount; i++) {
            executorservice.execute(new longpollingrunnable(i));
        }
        currentlongingtaskcount = longingtaskcount;
    }
}

以分段方式把任务拆分交给 executorservice 执行,默认3000个配置在一个任务中。executor 和 executorservice 是不是很像 netty 中的 boos 和 worker? reactor 模式,分工明确。

longpollingrunnable 是 clientworker 一个成员类,实现 runnable 接口。看一下 run() 方法。

public void run() {
    list<cachedata> cachedatas = new arraylist<cachedata>();
    list<string> ininitializingcachelist = new arraylist<string>();
    try {
        // 1. 只处理该任务中的配置并且检查failover配置
        for (cachedata cachedata : cachemap.get().values()) {
            if (cachedata.gettaskid() == taskid) {
                cachedatas.add(cachedata);
                try {
                    checklocalconfig(cachedata);
                    if (cachedata.isuselocalconfiginfo()) {
                        cachedata.checklistenermd5();
                    }
                } catch (exception e) {
                    logger.error("get local config info error", e);
                }
            }
        }
                // 2. 把客户端的md5值跟服务端的md5比较,把不一样的配置以 "example.properties+default_group"方式返回
        list<string> changedgroupkeys = checkupdatedataids(cachedatas, ininitializingcachelist);
       // 3. 把有更新的配置重新从服务端拉取配置内容
        for (string groupkey : changedgroupkeys) {
            string[] key = groupkey.parsekey(groupkey);
            string dataid = key[0];
            string group = key[1];
            string tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                string content = getserverconfig(dataid, group, tenant, 3000l);
                cachedata cache = cachemap.get().get(groupkey.getkeytenant(dataid, group, tenant));
                // 修改客户端本地值并且重新计算该对象的md5值
                cache.setcontent(content);
            } catch (nacosexception ioe) {
                ...省略代码
            }
        }
        for (cachedata cachedata : cachedatas) {
            if (!cachedata.isinitializing() || ininitializingcachelist.contains(groupkey.getkeytenant(cachedata.dataid, cachedata.group, cachedata.tenant))) {
                // 4. 根据md5值检查是否更新,如果更新通知listener
                cachedata.checklistenermd5();
                cachedata.setinitializing(false);
            }
        }
        ininitializingcachelist.clear();
        // 5. 又把this放进线程池中,形成一个长轮询检查客户端和服务端配置一致性
        executorservice.execute(this);
    } catch (throwable e) {
        executorservice.schedule(this, taskpenaltytime, timeunit.milliseconds);
    }
}

1处 - 筛选属于该任务的配置,并检查 failover 配置。

2处 - 把配置以"dataid group md5 tenant\r\n"拼接后当做参数请求服务器 http://ip:port/v1/cs/configs/listener 接口。服务器返回有更新的配置,以 "example.properties+default_group"方式返回

3处 - 根据2处返回的列表遍历请求服务器 http://ip:port/v1/cs/configs 接口,获取最新配置。然后更新cachedata content值并更新md5值。

4处 - 把 cachedata 新的md5值跟之前的做比较,如果不一样就通知监听者更新值。下一节会跟进去详解。

5处 - 把该 runnable 对象重新放入线程池,形成一个长轮询。

本节分析了 nacos client 配置是怎么样保持跟服务器接近实时同步的。通过长轮询+http短连接方式。

刷新值

在开始本节之前,我们先看一下上面多次出现的一个类 cachedata 结构。

public class cachedata {
    private final string name;
    private final configfilterchainmanager configfilterchainmanager;
    public final string dataid;
    public final string group;
    public final string tenant;
    // 监听列表
    private final copyonwritearraylist<managerlistenerwrap> listeners;
    // 内容md5值
    private volatile string md5;
    // 是否使用本地配置
    private volatile boolean isuselocalconfig = false;
    // 本地版本号
    private volatile long localconfiglastmodified;
    private volatile string content;
    // 长轮询中分段任务id
    private int taskid;
    private volatile boolean isinitializing = true;
  
        ...省略代码
}

根据名字可以得知, cachedata 是配置数据缓存中的对象。listeners 属性比较有意思,在 bo 中拥有一个监听列表,当该对象md5改变时会通过遍历 listeners 通知监听者们。

前一节从服务端获取到有更新的配置之后会检查md5,调用 cachedata.checklistenermd5()方法:

void checklistenermd5() {
   for (managerlistenerwrap wrap : listeners) {
        if (!md5.equals(wrap.lastcallmd5)) {
            safenotifylistener(dataid, group, content, md5, wrap);
        }
    }
}
class managerlistenerwrap {
    final listener listener;
    string lastcallmd5 = cachedata.getmd5string(null);
        ... 省略代码
}

managerlistenerwrap 的 lastcallmd5 是旧配置的md5值,如果 cachedata 的md5和 managerlistenerwrap 的lastcallmd5 值不一样,说明配置有更新。需要通知未更新的监听者。

private void safenotifylistener(final string dataid, final string group, final string content, final string md5, final managerlistenerwrap listenerwrap) {
    final listener listener = listenerwrap.listener;
    runnable job = new runnable() {
        @override
        public void run() {
            ... 省略代码
                // 调用监听者的方法
                listener.receiveconfiginfo(contenttmp);
                listenerwrap.lastcallmd5 = md5;
            ... 省略代码
        }
    };
    try {
        if (null != listener.getexecutor()) {
            listener.getexecutor().execute(job);
        } else {
            job.run();
        }
    } catch (throwable t) {
    }
}

调用了监听者的 receiveconfiginfo() 方法,然后修改 managerlistenerwrap 的lastcallmd5 值。

本节到这里分析了从服务端获取更新配置后通知配置监听者。但是监听者是什么时候注册的呢?接下来我们继续分析监听者注册到 cachedata 过程。

nacoscontextrefresher 实现了applicationlistener 。在容器准备后会调用 onapplicationevent() 方法,最终调用 registernacoslistener() 方法。

private void registernacoslistener(final string group, final string dataid) {
   listener listener = listenermap.computeifabsent(dataid, i -> new listener() {
     // 通知监听者调用的就是这个方法啦 
     @override
      public void receiveconfiginfo(string configinfo) {
         refreshcountincrement();
         string md5 = "";
         if (!stringutils.isempty(configinfo)) {
            try {
               messagedigest md = messagedigest.getinstance("md5");
               md5 = new biginteger(1, md.digest(configinfo.getbytes("utf-8"))).tostring(16);
            }
            catch (nosuchalgorithmexception | unsupportedencodingexception e) {
               log.warn("[nacos] unable to get md5 for dataid: " + dataid, e);
            }
         }
         refreshhistory.add(dataid, md5);
         // spring的刷新事件通知,刷新监听者会被执行
         applicationcontext.publishevent(new refreshevent(this, null, "refresh nacos config"));
      }
      @override
      public executor getexecutor() {
         return null;
      }
   });
  // 注册本监听者
  configservice.addlistener(dataid, group, listener);
  ...省略代码
}

通过 nacosconfigservice.addlistener()注册监听者。

nacosconfigservice.addlistener():

public void addlistener(string dataid, string group, listener listener) throws nacosexception {
    worker.addtenantlisteners(dataid, group, arrays.aslist(listener));
}

还是交给了 clientworker

clientworker.addtenantlisteners()

public void addtenantlisteners(string dataid, string group, list<? extends listener> listeners) throws nacosexception {
    group = null2defaultgroup(group);
    string tenant = agent.gettenant();
    cachedata cache = addcachedataifabsent(dataid, group, tenant);
    for (listener listener : listeners) {
        cache.addlistener(listener);
    }
}

clientworker 把监听者交给了 cachedata 完成了注册。

汇总系统运行中更新配置的流程:

  1. 启动时把本地更新 listener 注册到 cachedata。
  2. clientworker 长轮询同步服务端的更新配置。
  3. 2中获取到更新后的配置,重置 cachedata 内容。
  4. cachedata 回调1中注册上来的 listener.receiveconfiginfo()
  5. listener 最终通知spring刷新事件,完成context刷新属性值。

总结

nacos config client 和 nacos config server 采用定时长轮询http请求访问配置更新,这样设计 nacos config server 和 config client 结构简单。server 也没有长连接模式client过多的压力。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网