当前位置: 移动技术网 > IT编程>开发语言>Java > sentinel 滑动窗口统计机制

sentinel 滑动窗口统计机制

2019年06月30日  | 移动技术网IT编程  | 我要评论

sentinel的滑动窗口统计机制就是根据当前时间,获取对应的时间窗口,并更新该时间窗口中的各项统计指标(pass/block/rt等),这些指标被用来进行后续判断,比如限流、降级等;随着时间的推移,当前时间点对应的时间窗口是变化的,这时会涉及到时间窗口的初始化、复用等。可以说,sentinel上的功能所用到的数据几乎都是滑动窗口统计机制来维护和更新的。

 

sentinel 处理流程是基于slot链(processorslotchain)来完成的,比如限流、熔断等,其中重要的一个slot就是statisticslot,它是做各种数据统计的,而限流/熔断的数据判断来源就是statisticslot,statisticslot的各种数据统计都是基于滑动窗口来完成的,因此本文就重点分析statisticslot的滑动窗口统计机制。

 

sentinel 的slot链(processorslotchain)是责任链模式的体现,那slotchain是在哪创建的呢?是在 ctsph.lookprocesschain()方法中创建的,并且该方法会根据当前请求的资源先去一个静态的hashmap中获取,如果获取不到才会创建,创建后会保存到hashmap中。这就意味着,同一个资源会全局共享一个slotchain。默认生成processorslotchain逻辑为:

 1 // defaultslotchainbuilder
 2 public processorslotchain build() {
 3    processorslotchain chain = new defaultprocessorslotchain();
 4    chain.addlast(new nodeselectorslot());
 5    chain.addlast(new clusterbuilderslot());
 6    chain.addlast(new logslot());
 7    chain.addlast(new statisticslot());
 8    chain.addlast(new systemslot());
 9    chain.addlast(new authorityslot());
10    chain.addlast(new flowslot());
11    chain.addlast(new degradeslot());
12 
13    return chain;
14 }

 

整个处理过程从第一个slot往后一直传递到最后一个的,当到达statisticslot时,开始统计各项指标,统计的结果又会被后续的slot所采用,作为各种规则校验的依据。各种指标如下:

public enum metricevent {
   pass, // normal pass.
   block, // normal block.
   exception, // 异常统计
   success,
   rt, // rt统计
   occupied_pass
}

 

statisticslot.entry流程

处理流程走到statisticslot时,首先触发后续slot.entry方法,然后统计各项指标,后续slot中数据判断来源就是这里统计的各项指标。statisticslot.entry 逻辑如下:

 1 @override
 2 public void entry(context context, resourcewrapper resourcewrapper, defaultnodenode, int count, object... args) throws throwable {
 3    try {
 4        // 触发下一个slot的entry方法
 5        fireentry(context, resourcewrapper, node, count, args);
 6        // 如果能通过slotchain中后面的slot的entry方法,说明没有被限流或降级
 7        // 统计信息
 8        node.increasethreadnum();
 9        node.addpassrequest();
10        // 省略部分代码
11   } catch (blockexception e) {
12        context.getcurentry().seterror(e);
13        // add block count.
14        node.increaseblockedqps();
15        // 省略部分代码
16        throw e;
17   } catch (throwable e) {
18        context.getcurentry().seterror(e);
19        // should not happen
20        node.increaseexceptionqps();
21        // 省略部分代码
22        throw e;
23   }
24 }

由以上代码可知,statisticslot主要就做了3件事:

  1. 触发后续slot的entry方法,进行规则校验

  2. 校验通过则更新node实时指标数据

  3. 校验不通过则更新node异常指标数据

注意:由于后续的fireentry操作和更新本次统计信息是两个操作,不是原子的,会造成限流不准的小问题,比如设置的flowrule count为20,并发情况下可能稍大于20,不过针对大部分场景来说,这点偏差是可以容忍的,毕竟我们要的是限流效果,而不是必须精确的限流操作。

 

更新node实时指标数据

我们可以看到 node.addpassrequest() 这段代码是在fireentry执行之后执行的,这意味着,当前请求通过了sentinel的流控等规则,此时需要将当次请求记录下来,也就是执行 node.addpassrequest() 这行代码,具体的代码如下所示:

1 // defaultnode
2 public void addpassrequest() {
3    super.addpassrequest();
4    this.clusternode.addpassrequest();
5 }

这里的node是一个 defaultnode 实例,这里特别补充一个 defaultnode 和 clusternode 的区别:

  • defaultnode:保存着某个resource在某个context中的实时指标,每个defaultnode都指向一个clusternode。

  • clusternode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个clusternode,不管他在哪个context中。

 

上面代码不管是 defaultnode 还是 clusternode ,走的都是statisticnode 对象的 addpassrequest 方法:

1 private transient volatile metric rollingcounterinsecond = new arraymetric(2, 1000);
2 private transient metric rollingcounterinminute = new arraymetric(60, 60 * 1000);
3 
4 public void addpassrequest(int count) {
5    rollingcounterinsecond.addpass(count); // 对每秒指标统计
6    rollingcounterinminute.addpass(count); // 每分钟指标统计
7 }

 

每一个通过的指标(pass)都是调用metric 的接口进行操作的,并且是通过 arraymetric 这种实现类,代码如下:

public arraymetric(int windowlength, int interval) {
   this.data = new windowleaparray(windowlength, interval);
}

public void addpass(int count) {
   // 获取当前时间窗口
   windowwrap<metricbucket> wrap = data.currentwindow();
   wrap.value().addpass(count);
}

 

首先通过 currentwindow() 获取当前时间窗口,然后更新当前时间窗口对应的统计指标,以下代码重点关注几个判断逻辑:

 1 // leaparray
 2 public windowwrap<t> currentwindow() {
 3    return currentwindow(timeutil.currenttimemillis());
 4 }
 5 // timeutil
 6 public static long currenttimemillis() {
 7    // currenttimemillis是由一个tick线程每个1ms更新一次,具体逻辑在timeutil类中
 8    return currenttimemillis;
 9 }
10 // leaparray
11 public windowwrap<t> currentwindow(long timemillis) {
12    // 计算当前时间点落在滑动窗口的下标
13    int idx = calculatetimeidx(timemillis);
14    // calculate current bucket start time.
15    long windowstart = calculatewindowstart(timemillis);
16 
17    // 获取当前时间点对应的windowwrap,array为atomicreferencearray
18    while (true) {
19        windowwrap<t> old = array.get(idx);
20        if (old == null) {
21            // 1.为空表示当前时间窗口为初始化过,创建windowwrap并cas设置到array中
22            windowwrap<t> window = new windowwrap<t>(windowlengthinms,windowstart, newemptybucket());
23            if (array.compareandset(idx, null, window)) {
24                return window;
25           } else {
26                thread.yield();
27           }
28       } else if (windowstart == old.windowstart()) {
29            // 2.获取的时间窗口正好对应当前时间,直接返回
30            return old;
31       } else if (windowstart > old.windowstart()) {
32            // 3.获取的时间窗口为老的,进行reset操作复用
33            if (updatelock.trylock()) {
34                try {
35                    return resetwindowto(old, windowstart);
36               } finally {
37                    updatelock.unlock();
38               }
39           } else {
40                thread.yield();
41           }
42       } else if (windowstart < old.windowstart()) {
43            // 4.时间回拨了,正常情况下不会走到这里
44            return new windowwrap<t>(windowlengthinms, windowstart,newemptybucket());
45       }
46   }
47 }

 

获取当前时间窗口对应的windowwrap之后,就可以进行更新操作了。

// wrap.value().addpass(count);
public void addpass(int n) {
   add(metricevent.pass, n);
}
// metricbucket
public metricbucket add(metricevent event, long n) {
   // 对应metricevent枚举中值
   counters[event.ordinal()].add(n);
   return this;
}

到这里为止,整个指标统计流程就完成了,下面重点看下滑动窗口机制。

 

滑动窗口机制

时间窗口是用windowwrap对象表示的,其属性如下:

private final long windowlengthinms;  // 时间窗口的长度
private long windowstart; // 时间窗口开始时间
private t value; // metricbucket对象,保存各个指标数据

 

sentinel时间基准由tick线程来做,每1ms更新一次时间基准,逻辑如下:

currenttimemillis = system.currenttimemillis();
thread daemon = new thread(new runnable() {
   @override
   public void run() {
       while (true) {
           currenttimemillis = system.currenttimemillis();
           try {
               timeunit.milliseconds.sleep(1);
          } catch (throwable e) {
          }
      }
  }
});
daemon.setdaemon(true);
daemon.setname("sentinel-time-tick-thread");
daemon.start();

 

sentinel默认有每秒和每分钟的滑动窗口,对应的leaparray类型,它们的初始化逻辑是:

protected int windowlengthinms; // 单个滑动窗口时间值
protected int samplecount; // 滑动窗口个数
protected int intervalinms; // 周期值(相当于所有滑动窗口时间值之和)

public leaparray(int samplecount, int intervalinms) {
   this.windowlengthinms = intervalinms / samplecount;
   this.intervalinms = intervalinms;
   this.samplecount = samplecount;

   this.array = new atomicreferencearray<windowwrap<t>>(samplecount);
}

针对每秒滑动窗口,windowlengthinms=500,samplecount=2,intervalinms=1000,针对每分钟滑动窗口,windowlengthinms=1000,samplecount=60,intervalinms=60000,对应代码:

private transient volatile metric rollingcounterinsecond = new arraymetric(2, 1000);
private transient metric rollingcounterinminute = new arraymetric(60, 60 * 1000);

currenttimemillis时间基准(tick线程)每1ms更新一次,通过currentwindow(timemillis)方法获取当前时间点对应的windowwrap对象,然后更新对应的各种指标,用于做限流、降级时使用。注意,当前时间基准对应的事件窗口初始化时lazy模式,并且会复用的。

 

sentinel 底层采用高性能的滑动窗口数据结构 leaparray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。最后以一张图结束吧:

 

 

 往期精选 

觉得文章不错,对你有所启发和帮助,希望能转发给更多的小伙伴。如果有问题,请关注下面公众号,发送问题给我,多谢。
欢迎小伙伴关注【topcoder】阅读更多精彩好文。

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网