当前位置: 移动技术网 > IT编程>开发语言>Java > 浅谈Spring Cloud Ribbon的原理

浅谈Spring Cloud Ribbon的原理

2019年07月19日  | 移动技术网IT编程  | 我要评论

贵妇人王曼媛之死,学园默示录07,mild851

ribbon是netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法,将netflix的中间层服务连接在一起。ribbon客户端组件提供一系列完善的配置项如连接超时,重试等。简单的说,就是在配置文件中列出load balancer(简称lb)后面所有的机器,ribbon会自动的帮助你基于某种规则(如简单轮询,随即连接等)去连接这些机器。我们也很容易使用ribbon实现自定义的负载均衡算法。

说起负载均衡一般都会想到服务端的负载均衡,常用产品包括lbs硬件或云服务、nginx等,都是耳熟能详的产品。

而spring cloud提供了让服务调用端具备负载均衡能力的ribbon,通过和eureka的紧密结合,不用在服务集群内再架设负载均衡服务,很大程度简化了服务集群内的架构。

具体也不想多写虚的介绍,反正哪里都能看得到相关的介绍。

直接开撸代码,通过代码来看ribbon是如何实现的。

配置

详解:

1.ribbonautoconfiguration配置生成ribbonloadbalancerclient实例。

代码位置:

spring-cloud-netflix-core-1.3.5.release.jar

org.springframework.cloud.netflix.ribbon

ribbonautoconfiguration.class

@configuration
@conditionalonclass({ iclient.class, resttemplate.class, asyncresttemplate.class, ribbon.class})
@ribbonclients
@autoconfigureafter(name = "org.springframework.cloud.netflix.eureka.eurekaclientautoconfiguration")
@autoconfigurebefore({loadbalancerautoconfiguration.class, asyncloadbalancerautoconfiguration.class})
@enableconfigurationproperties(ribboneagerloadproperties.class)
public class ribbonautoconfiguration {

 // 略

 @bean
 @conditionalonmissingbean(loadbalancerclient.class)
 public loadbalancerclient loadbalancerclient() {
  return new ribbonloadbalancerclient(springclientfactory());
 }
  // 略
}

先看配置条件项,ribbonautoconfiguration配置必须在loadbalancerautoconfiguration配置前执行,因为在loadbalancerautoconfiguration配置中会使用ribbonloadbalancerclient实例。

ribbonloadbalancerclient继承自loadbalancerclient接口,是负载均衡客户端,也是负载均衡策略的调用方。

2.loadbalancerinterceptorconfig配置生成:

1).负载均衡拦截器loadbalancerinterceptor实例

包含:

loadbalancerclient实现类的ribbonloadbalancerclient实例

负载均衡的请求创建工厂loadbalancerrequestfactory:实例

2).resttemplate自定义的resttemplatecustomizer实例

代码位置:

spring-cloud-commons-1.2.4.release.jar

org.springframework.cloud.client.loadbalancer

loadbalancerautoconfiguration.class

@configuration
@conditionalonclass(resttemplate.class)
@conditionalonbean(loadbalancerclient.class)
@enableconfigurationproperties(loadbalancerretryproperties.class)
public class loadbalancerautoconfiguration {
 // 略
 @bean
 @conditionalonmissingbean
 public loadbalancerrequestfactory loadbalancerrequestfactory(
   loadbalancerclient loadbalancerclient) {
  return new loadbalancerrequestfactory(loadbalancerclient, transformers);
 }

 @configuration
 @conditionalonmissingclass("org.springframework.retry.support.retrytemplate")
 static class loadbalancerinterceptorconfig {
  @bean
  public loadbalancerinterceptor ribboninterceptor(
    loadbalancerclient loadbalancerclient,
    loadbalancerrequestfactory requestfactory) {
   return new loadbalancerinterceptor(loadbalancerclient, requestfactory);
  }

  @bean
  @conditionalonmissingbean
  public resttemplatecustomizer resttemplatecustomizer(
    final loadbalancerinterceptor loadbalancerinterceptor) {
   return new resttemplatecustomizer() {
    @override
    public void customize(resttemplate resttemplate) {
     list<clienthttprequestinterceptor> list = new arraylist<>(
       resttemplate.getinterceptors());
     list.add(loadbalancerinterceptor);
     resttemplate.setinterceptors(list);
    }
   };
  }
 }
 // 略
}

先看配置条件项:

要求在项目环境中必须要有resttemplate类。

要求必须要有loadbalancerclient接口的实现类的实例,也就是上一步生成的ribbonloadbalancerclient。

3.通过上面一步创建的resttemplatecustomizer配置所有resttemplate实例,就是将负载均衡拦截器设置给resttemplate实例。

@configuration
@conditionalonclass(resttemplate.class)
@conditionalonbean(loadbalancerclient.class)
@enableconfigurationproperties(loadbalancerretryproperties.class)
public class loadbalancerautoconfiguration {
 // 略

 @bean
 public smartinitializingsingleton loadbalancedresttemplateinitializer(
   final list<resttemplatecustomizer> customizers) {
  return new smartinitializingsingleton() {
   @override
   public void aftersingletonsinstantiated() {
    for (resttemplate resttemplate : loadbalancerautoconfiguration.this.resttemplates) {
     for (resttemplatecustomizer customizer : customizers) {
      customizer.customize(resttemplate);
     }
    }
   }
  };
 }

 // 略
 @configuration
 @conditionalonmissingclass("org.springframework.retry.support.retrytemplate")
 static class loadbalancerinterceptorconfig {
  @bean
  public loadbalancerinterceptor ribboninterceptor(
    loadbalancerclient loadbalancerclient,
    loadbalancerrequestfactory requestfactory) {
   return new loadbalancerinterceptor(loadbalancerclient, requestfactory);
  }

  @bean
  @conditionalonmissingbean
  public resttemplatecustomizer resttemplatecustomizer(
    final loadbalancerinterceptor loadbalancerinterceptor) {
   return new resttemplatecustomizer() {
    @override
    public void customize(resttemplate resttemplate) {
     list<clienthttprequestinterceptor> list = new arraylist<>(
       resttemplate.getinterceptors());
     list.add(loadbalancerinterceptor);
     resttemplate.setinterceptors(list);
    }
   };
  }
 }
 // 略
}

resttemplate.setinterceptors(list)这个地方就是注入负载均衡拦截器的地方loadbalancerinterceptor。

从这个地方实际上也可以猜出来,resttemplate可以通过注入的拦截器来构建相应的请求实现负载均衡。

也能看出来可以自定义拦截器实现其他目的。

4.ribbonclientconfiguration配置生成zoneawareloadbalancer实例

代码位置:

spring-cloud-netflix-core-1.3.5.release.jar

org.springframework.cloud.netflix.ribbon

ribbonclientconfiguration.class

@suppresswarnings("deprecation")
@configuration
@enableconfigurationproperties
//order is important here, last should be the default, first should be optional
// see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@import({okhttpribbonconfiguration.class, restclientribbonconfiguration.class, httpclientribbonconfiguration.class})
public class ribbonclientconfiguration {
 // 略
 @bean
 @conditionalonmissingbean
 public iloadbalancer ribbonloadbalancer(iclientconfig config,
   serverlist<server> serverlist, serverlistfilter<server> serverlistfilter,
   irule rule, iping ping, serverlistupdater serverlistupdater) {
  if (this.propertiesfactory.isset(iloadbalancer.class, name)) {
   return this.propertiesfactory.get(iloadbalancer.class, config, name);
  }
  return new zoneawareloadbalancer<>(config, rule, ping, serverlist,
    serverlistfilter, serverlistupdater);
 }

 // 略
}

zoneawareloadbalancer继承自iloadbalancer接口,该接口有一个方法:

 /**
  * choose a server from load balancer.
  * 
  * @param key an object that the load balancer may use to determine which server to return. null if 
  *   the load balancer does not use this parameter.
  * @return server chosen
  */
 public server chooseserver(object key);

zoneawareloadbalancer就是一个具体的负载均衡实现类,也是默认的负载均衡类,通过对chooseserver方法的实现选取某个服务实例。

拦截&请求

1.使用resttemplate进行get、post等各种请求,都是通过doexecute方法实现

代码位置:
spring-web-4.3.12.release.jar

org.springframework.web.client

resttemplate.class

public class resttemplate extends interceptinghttpaccessor implements restoperations {

 // 略

 protected <t> t doexecute(uri url, httpmethod method, requestcallback requestcallback,
   responseextractor<t> responseextractor) throws restclientexception {

  assert.notnull(url, "'url' must not be null");
  assert.notnull(method, "'method' must not be null");
  clienthttpresponse response = null;
  try {
   clienthttprequest request = createrequest(url, method);
   if (requestcallback != null) {
    requestcallback.dowithrequest(request);
   }
   response = request.execute();
   handleresponse(url, method, response);
   if (responseextractor != null) {
    return responseextractor.extractdata(response);
   }
   else {
    return null;
   }
  }
  catch (ioexception ex) {
   string resource = url.tostring();
   string query = url.getrawquery();
   resource = (query != null ? resource.substring(0, resource.indexof('?')) : resource);
   throw new resourceaccessexception("i/o error on " + method.name() +
     " request for \"" + resource + "\": " + ex.getmessage(), ex);
  }
  finally {
   if (response != null) {
    response.close();
   }
  }
 }

 // 略

}

支持的各种http请求方法最终都是调用doexecute方法,该方法内调用创建方法创建请求实例,并执行请求得到响应对象。

2.生成请求实例创建工厂

上一步代码中,调用createrequest方法创建请求实例,这个方法是定义在父类中。

先整理出主要的继承关系:

createrequest方法实际是定义在httpaccessor抽象类中。

public abstract class httpaccessor {
 private clienthttprequestfactory requestfactory = new simpleclienthttprequestfactory();
 public void setrequestfactory(clienthttprequestfactory requestfactory) {
  assert.notnull(requestfactory, "clienthttprequestfactory must not be null");
  this.requestfactory = requestfactory;
 }
 public clienthttprequestfactory getrequestfactory() {
  return this.requestfactory;
 }
 protected clienthttprequest createrequest(uri url, httpmethod method) throws ioexception {
  clienthttprequest request = getrequestfactory().createrequest(url, method);
  if (logger.isdebugenabled()) {
   logger.debug("created " + method.name() + " request for \"" + url + "\"");
  }
  return request;
 }
}

在createrequest方法中调用getrequestfactory方法获得请求实例创建工厂,实际上getrequestfactory并不是当前httpaccessor类中定义的,而是在子类interceptinghttpaccessor中定义的。

public abstract class interceptinghttpaccessor extends httpaccessor {

 private list<clienthttprequestinterceptor> interceptors = new arraylist<clienthttprequestinterceptor>();

 public void setinterceptors(list<clienthttprequestinterceptor> interceptors) {
  this.interceptors = interceptors;
 }

 public list<clienthttprequestinterceptor> getinterceptors() {
  return interceptors;
 }

 @override
 public clienthttprequestfactory getrequestfactory() {
  clienthttprequestfactory delegate = super.getrequestfactory();
  if (!collectionutils.isempty(getinterceptors())) {
   return new interceptingclienthttprequestfactory(delegate, getinterceptors());
  }
  else {
   return delegate;
  }
 }
}

在这里做了个小动作,首先还是通过httpaccessor类创建并获得simpleclienthttprequestfactory工厂,这个工厂主要就是在没有拦截器的时候创建基本请求实例。

其次,在有拦截器注入的情况下,创建interceptingclienthttprequestfactory工厂,该工厂就是创建带拦截器的请求实例,因为注入了负载均衡拦截器,所以这里就从interceptingclienthttprequestfactory工厂创建。

3.通过工厂创建请求实例

创建实例就看工厂的createrequest方法。

public class interceptingclienthttprequestfactory extends abstractclienthttprequestfactorywrapper {

 private final list<clienthttprequestinterceptor> interceptors;

 public interceptingclienthttprequestfactory(clienthttprequestfactory requestfactory,
   list<clienthttprequestinterceptor> interceptors) {

  super(requestfactory);
  this.interceptors = (interceptors != null ? interceptors : collections.<clienthttprequestinterceptor>emptylist());
 }


 @override
 protected clienthttprequest createrequest(uri uri, httpmethod httpmethod, clienthttprequestfactory requestfactory) {
  return new interceptingclienthttprequest(requestfactory, this.interceptors, uri, httpmethod);
 }

}

就是new了个interceptingclienthttprequest实例,并且把拦截器、基本请求实例创建工厂注进去。

4.请求实例调用配置阶段注入的负载均衡拦截器的拦截方法intercept

可从第1步看出,创建完请求实例后,通过执行请求实例的execute方法执行请求。

clienthttprequest request = createrequest(url, method);
if (requestcallback != null) {
 requestcallback.dowithrequest(request);
}
response = request.execute();

实际请求实例是interceptingclienthttprequest,execute实际是在它的父类中。

类定义位置:

spring-web-4.3.12.release.jar

org.springframework.http.client

interceptingclienthttprequest.class

看一下它们的继承关系。

在execute方法中实际调用了子类实现的executeinternal方法。

public abstract class abstractclienthttprequest implements clienthttprequest {

 private final httpheaders headers = new httpheaders();

 private boolean executed = false;

 @override
 public final httpheaders getheaders() {
  return (this.executed ? httpheaders.readonlyhttpheaders(this.headers) : this.headers);
 }

 @override
 public final outputstream getbody() throws ioexception {
  assertnotexecuted();
  return getbodyinternal(this.headers);
 }

 @override
 public final clienthttpresponse execute() throws ioexception {
  assertnotexecuted();
  clienthttpresponse result = executeinternal(this.headers);
  this.executed = true;
  return result;
 }

 protected void assertnotexecuted() {
  assert.state(!this.executed, "clienthttprequest already executed");
 }

 protected abstract outputstream getbodyinternal(httpheaders headers) throws ioexception;

 protected abstract clienthttpresponse executeinternal(httpheaders headers) throws ioexception;

}

其实就是interceptingclienthttprequest类的executeinternal方法,其中,又调用了一个执行器interceptingrequestexecution的execute,通关判断如果有拦截器注入进来过,就调用拦截器的intercept方法。

这里的拦截器实际上就是在配置阶段注入进resttemplate实例的负载均衡拦截器loadbalancerinterceptor实例,可参考上面配置阶段的第2步。

class interceptingclienthttprequest extends abstractbufferingclienthttprequest {

 // 略

 @override
 protected final clienthttpresponse executeinternal(httpheaders headers, byte[] bufferedoutput) throws ioexception {
  interceptingrequestexecution requestexecution = new interceptingrequestexecution();
  return requestexecution.execute(this, bufferedoutput);
 }


 private class interceptingrequestexecution implements clienthttprequestexecution {

  private final iterator<clienthttprequestinterceptor> iterator;

  public interceptingrequestexecution() {
   this.iterator = interceptors.iterator();
  }

  @override
  public clienthttpresponse execute(httprequest request, byte[] body) throws ioexception {
   if (this.iterator.hasnext()) {
    clienthttprequestinterceptor nextinterceptor = this.iterator.next();
    return nextinterceptor.intercept(request, body, this);
   }
   else {
    clienthttprequest delegate = requestfactory.createrequest(request.geturi(), request.getmethod());
    for (map.entry<string, list<string>> entry : request.getheaders().entryset()) {
     list<string> values = entry.getvalue();
     for (string value : values) {
      delegate.getheaders().add(entry.getkey(), value);
     }
    }
    if (body.length > 0) {
     streamutils.copy(body, delegate.getbody());
    }
    return delegate.execute();
   }
  }
 }

}

5.负载均衡拦截器调用负载均衡客户端

在负载均衡拦截器loadbalancerinterceptor类的intercept方法中,又调用了负载均衡客户端loadbalancerclient实现类的execute方法。

public class loadbalancerinterceptor implements clienthttprequestinterceptor {

 private loadbalancerclient loadbalancer;
 private loadbalancerrequestfactory requestfactory;

 public loadbalancerinterceptor(loadbalancerclient loadbalancer, loadbalancerrequestfactory requestfactory) {
  this.loadbalancer = loadbalancer;
  this.requestfactory = requestfactory;
 }

 public loadbalancerinterceptor(loadbalancerclient loadbalancer) {
  // for backwards compatibility
  this(loadbalancer, new loadbalancerrequestfactory(loadbalancer));
 }

 @override
 public clienthttpresponse intercept(final httprequest request, final byte[] body,
   final clienthttprequestexecution execution) throws ioexception {
  final uri originaluri = request.geturi();
  string servicename = originaluri.gethost();
  assert.state(servicename != null, "request uri does not contain a valid hostname: " + originaluri);
  return this.loadbalancer.execute(servicename, requestfactory.createrequest(request, body, execution));
 }
}

在配置阶段的第1步,可以看到实现类是ribbonloadbalancerclient。

6.负载均衡客户端调用负载均衡策略选取目标服务实例并发起请求

在ribbonloadbalancerclient的第一个execute方法以及getserver方法中可以看到,实际上是通过iloadbalancer的负载均衡器实现类作的chooseserver方法选取一个服务,交给接下来的请求对象发起一个请求。

这里的负载均衡实现类默认是zoneawareloadbalancer区域感知负载均衡器实例,其内部通过均衡策略选择一个服务。

zoneawareloadbalancer的创建可以参考配置阶段的第4步。

public class ribbonloadbalancerclient implements loadbalancerclient {
 @override
 public <t> t execute(string serviceid, loadbalancerrequest<t> request) throws ioexception {
  iloadbalancer loadbalancer = getloadbalancer(serviceid);
  server server = getserver(loadbalancer);
  if (server == null) {
   throw new illegalstateexception("no instances available for " + serviceid);
  }
  ribbonserver ribbonserver = new ribbonserver(serviceid, server, issecure(server,
    serviceid), serverintrospector(serviceid).getmetadata(server));

  return execute(serviceid, ribbonserver, request);
 }

 @override
 public <t> t execute(string serviceid, serviceinstance serviceinstance, loadbalancerrequest<t> request) throws ioexception {
  server server = null;
  if(serviceinstance instanceof ribbonserver) {
   server = ((ribbonserver)serviceinstance).getserver();
  }
  if (server == null) {
   throw new illegalstateexception("no instances available for " + serviceid);
  }

  ribbonloadbalancercontext context = this.clientfactory
    .getloadbalancercontext(serviceid);
  ribbonstatsrecorder statsrecorder = new ribbonstatsrecorder(context, server);

  try {
   t returnval = request.apply(serviceinstance);
   statsrecorder.recordstats(returnval);
   return returnval;
  }
  // catch ioexception and rethrow so resttemplate behaves correctly
  catch (ioexception ex) {
   statsrecorder.recordstats(ex);
   throw ex;
  }
  catch (exception ex) {
   statsrecorder.recordstats(ex);
   reflectionutils.rethrowruntimeexception(ex);
  }
  return null;
 }
  
 // 略 

 protected server getserver(iloadbalancer loadbalancer) {
  if (loadbalancer == null) {
   return null;
  }
  return loadbalancer.chooseserver("default"); // todo: better handling of key
 }

 protected iloadbalancer getloadbalancer(string serviceid) {
  return this.clientfactory.getloadbalancer(serviceid);
 }

 public static class ribbonserver implements serviceinstance {
  private final string serviceid;
  private final server server;
  private final boolean secure;
  private map<string, string> metadata;

  public ribbonserver(string serviceid, server server) {
   this(serviceid, server, false, collections.<string, string> emptymap());
  }

  public ribbonserver(string serviceid, server server, boolean secure,
    map<string, string> metadata) {
   this.serviceid = serviceid;
   this.server = server;
   this.secure = secure;
   this.metadata = metadata;
  }

  // 略
 }

}

代码撸完,总结下。

普通使用resttemplate请求其他服务时,内部使用的就是常规的http请求实例发送请求。

为resttemplate增加了@loanbalanced 注解后,实际上通过配置,为resttemplate注入负载均衡拦截器,让负载均衡器选择根据其对应的策略选择合适的服务后,再发送请求。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网