
Custom filters, dynamic routing, and dynamic load balancing with Spring Boot + Zuul

September 14, 2018 | 移动技术网 IT编程

Reference:

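Zuul is Netflix's open-source microservice gateway. Its core is a chain of filters, which make it easy to implement access authentication, rate limiting, routing, load balancing, circuit breaking, and similar features.

Because the existing projects must not be modified at all (zero code intrusion), this article does not register the Zuul gateway with a Eureka server. Instead, Zuul is combined with Spring Boot as a standalone project that forwards requests, so the project does not follow the usual Spring Cloud architecture built around service discovery.

Building the Zuul gateway project
First, create a new Spring Boot project, add the Zuul dependency, and enable the @EnableZuulProxy annotation.
pom.xml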

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zuul</artifactId>
    <version>1.4.4.RELEASE</version>
</dependency>

application.properties

server.port=8090
eureka.client.enabled=false
zuul.ribbon.eager-load.enabled=true

zuul.SendErrorFilter.post.disable=true

Since dynamic routing is used later, there is no need to configure route mappings in application.properties.

SpringBootZuulApplication.java

package com.syher.zuul;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.syher.zuul.core.zuul.router.PropertiesRouter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.netflix.zuul.EnableZuulProxy;
import org.springframework.cloud.netflix.zuul.RoutesRefreshedEvent;
import org.springframework.cloud.netflix.zuul.filters.RouteLocator;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.ComponentScan;

import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author braska
 * @date 2018/06/25.
 **/
@EnableAutoConfiguration
@EnableZuulProxy
@ComponentScan(basePackages = {
        "com.syher.zuul.core",
        "com.syher.zuul.service"
})
public class SpringBootZuulApplication implements CommandLineRunner {
    @Autowired
    ApplicationEventPublisher publisher;
    @Autowired
    RouteLocator routeLocator;

    private ScheduledExecutorService executor;
    private long lastModified = 0L;
    // True until the first modification check has run.
    private boolean instance = true;

    public static void main(String[] args) {
        SpringApplication.run(SpringBootZuulApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        executor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("properties read.").build()
        );
        // Check the route file once per second on a dedicated thread.
        executor.scheduleWithFixedDelay(this::publish, 0, 1, TimeUnit.SECONDS);
    }

    private void publish() {
        if (isPropertiesModified()) {
            // Triggers refresh() on the refreshable route locator.
            publisher.publishEvent(new RoutesRefreshedEvent(routeLocator));
        }
    }

    private boolean isPropertiesModified() {
        File file = new File(this.getClass().getClassLoader().getResource(PropertiesRouter.PROPERTIES_FILE).getPath());
        if (instance) {
            // First check after startup: record the baseline timestamp so that
            // the next check does not report a spurious change.
            instance = false;
            lastModified = file.lastModified();
            return false;
        }
        if (file.lastModified() > lastModified) {
            lastModified = file.lastModified();
            return true;
        }
        return false;
    }
}

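1. Custom filters

Writing a custom Zuul filter is fairly simple, so let's cover filters first.
Zuul filters come in four types: pre, route, post, and error. I won't describe each of them in detail since plenty of material is available online. This article focuses on filtering before routing, that is, the pre type.
To write a custom filter, extend ZuulFilter and specify the filter type, the filter order, whether the filter should run, and the filtering logic itself.

To make the filters easy to extend, the template method pattern is used here.
AbstractZuulFilter.java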

package com.syher.zuul.core.zuul.filter;

import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import com.syher.zuul.core.zuul.ContantValue;

/**
 * @author braska
 * @date 2018/06/29.
 **/
public abstract class AbstractZuulFilter extends ZuulFilter {

    /**
     * Context of the request being processed. Zuul filters are singletons, so this
     * field is shared by concurrent requests; prefer RequestContext.getCurrentContext().
     */
    protected RequestContext context;

    @Override
    public boolean shouldFilter() {
        RequestContext ctx = RequestContext.getCurrentContext();
        // Skip this filter once an earlier filter has called fail().
        return (boolean) (ctx.getOrDefault(ContantValue.NEXT_FILTER, true));
    }

    @Override
    public Object run() {
        context = RequestContext.getCurrentContext();
        return doRun();
    }

    /** Template method: the actual filtering logic of a concrete filter. */
    public abstract Object doRun();

    public Object fail(Integer code, String message) {
        // Use the thread-local context so concurrent requests cannot interfere.
        RequestContext ctx = RequestContext.getCurrentContext();
        ctx.set(ContantValue.NEXT_FILTER, false);
        // Do not forward this request to the backend service.
        ctx.setSendZuulResponse(false);
        ctx.getResponse().setContentType("text/html;charset=utf-8");
        ctx.setResponseStatusCode(code);
        ctx.setResponseBody(String.format("{\"result\":\"%s!\"}", message));
        return null;
    }

    public Object success() {
        RequestContext.getCurrentContext().set(ContantValue.NEXT_FILTER, true);
        return null;
    }
}
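
The way the NEXT_FILTER flag short-circuits later filters can be seen without Zuul on the classpath. The following is only a library-free model, not Zuul's actual filter runner: the `SketchFilter` interface, the `"nextFilter"` key, and the filter names and orders are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Library-free model of a pre-filter chain; none of these types are Zuul classes. */
class FilterChainSketch {

    static final String NEXT_FILTER = "nextFilter"; // hypothetical flag key

    /** Stand-in for a filter: a name, an order, and a body that succeeds or fails. */
    interface SketchFilter {
        String name();
        int order();
        boolean run(Map<String, Object> ctx); // true models success(), false models fail(...)
    }

    static SketchFilter filter(final String name, final int order, final boolean passes) {
        return new SketchFilter() {
            public String name() { return name; }
            public int order() { return order; }
            public boolean run(Map<String, Object> ctx) { return passes; }
        };
    }

    /** Runs filters in ascending order and returns the names of those that executed. */
    static List<String> execute(List<SketchFilter> filters) {
        Map<String, Object> ctx = new HashMap<>();
        List<SketchFilter> sorted = new ArrayList<>(filters);
        sorted.sort(Comparator.comparingInt(SketchFilter::order));
        List<String> executed = new ArrayList<>();
        for (SketchFilter f : sorted) {
            // Models shouldFilter(): skip once an earlier filter cleared the flag.
            if (!Boolean.TRUE.equals(ctx.getOrDefault(NEXT_FILTER, Boolean.TRUE))) {
                continue;
            }
            executed.add(f.name());
            ctx.put(NEXT_FILTER, f.run(ctx));
        }
        return executed;
    }

    public static void main(String[] args) {
        List<SketchFilter> filters = Arrays.asList(
                filter("userRight", 30, true),
                filter("rateLimiter", 10, true),
                filter("tokenAccess", 20, false));
        // tokenAccess fails, so userRight never runs.
        System.out.println(execute(filters)); // prints [rateLimiter, tokenAccess]
    }
}
```

The point of the design is that a failing filter does not need to know which filters come after it; it only clears one flag in the shared request context.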

Next, define an abstract class for pre filters that extends AbstractZuulFilter and fixes the filter type to pre. Every pre filter can then extend this abstract class.
AbstractPreZuulFilter.java

package com.syher.zuul.core.zuul.filter.pre;

import com.syher.zuul.core.zuul.FilterType;
import com.syher.zuul.core.zuul.filter.AbstractZuulFilter;

/**
 * @author braska
 * @date 2018/06/29.
 **/
public abstract class AbstractPreZuulFilter extends AbstractZuulFilter {
    @Override
    public String filterType() {
        // Zuul expects the lowercase type name "pre".
        return FilterType.pre.name();
    }
}

Then write a concrete filter, for example one that does rate limiting.
RateLimiterFilter.java

package com.syher.zuul.core.zuul.filter.pre;

import com.google.common.util.concurrent.RateLimiter;
import com.netflix.zuul.context.RequestContext;
import com.syher.zuul.core.zuul.FilterOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;

/**
 * @author braska
 * @date 2018/06/29.
 **/
public class RateLimiterFilter extends AbstractPreZuulFilter {

    private static final Logger logger = LoggerFactory.getLogger(RateLimiterFilter.class);

    /**
     * Allows at most 50 requests per second.
     */
    private final RateLimiter rateLimiter = RateLimiter.create(50);

    @Override
    public int filterOrder() {
        return FilterOrder.RATE_LIMITER_ORDER;
    }

    @Override
    public Object doRun() {
        // Read the thread-local context directly: filters are singletons shared by all requests.
        HttpServletRequest request = RequestContext.getCurrentContext().getRequest();
        String url = request.getRequestURI();
        if (rateLimiter.tryAcquire()) {
            return success();
        } else {
            logger.info("rate limit:{}", url);
            // String.format needs %s rather than the SLF4J-style {} placeholder;
            // 429 (Too Many Requests) describes rate limiting better than 401.
            return fail(429, String.format("rate limit:%s", url));
        }
    }
}
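
For readers who want to see what a non-blocking `tryAcquire()` decision involves, here is a minimal token-bucket sketch. It is not Guava's RateLimiter implementation, whose internals differ; the class name, the one-second burst capacity, and the injectable clock are assumptions made for this illustration.

```java
import java.util.function.LongSupplier;

/** Minimal token-bucket sketch; not Guava's RateLimiter implementation. */
class TokenBucketSketch {
    private final double permitsPerSecond;
    private final double capacity;
    private final LongSupplier nanoClock; // injectable so the behavior can be tested
    private double tokens;
    private long lastRefillNanos;

    TokenBucketSketch(double permitsPerSecond, LongSupplier nanoClock) {
        this.permitsPerSecond = permitsPerSecond;
        this.capacity = permitsPerSecond; // assumption: allow at most one second of burst
        this.nanoClock = nanoClock;
        this.tokens = capacity;           // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Takes one token if one is available; never blocks. */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        double elapsedSeconds = (now - lastRefillNanos) / 1_000_000_000.0;
        // Refill in proportion to elapsed time, capped at the bucket capacity.
        tokens = Math.min(capacity, tokens + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production code the clock would typically be `System::nanoTime`, for example `new TokenBucketSketch(50, System::nanoTime)`.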

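Filters of other types work the same way: create a separate abstract class, for example AbstractPostZuulFilter, that specifies the filter type, and have each concrete post filter extend it.

Finally, register the filter as a Spring bean.
ZuulConfigure.java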

package com.syher.zuul.core.config;

import com.netflix.loadbalancer.IRule;
import com.netflix.zuul.ZuulFilter;
import com.syher.zuul.core.ribbon.ServerLoadBalancerRule;
import com.syher.zuul.core.zuul.filter.pre.RateLimiterFilter;
import com.syher.zuul.core.zuul.router.PropertiesRouter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author braska
 * @date 2018/07/05.
 **/
@Configuration
public class ZuulConfigure {

    @Autowired
    ZuulProperties zuulProperties;
    @Autowired
    ServerProperties server;

    /**
     * Dynamic routing.
     * @return the properties-backed route locator
     */
    @Bean
    public PropertiesRouter propertiesRouter() {
        return new PropertiesRouter(this.server.getServletPrefix(), this.zuulProperties);
    }

    /**
     * Dynamic load balancing.
     * @return the custom Ribbon rule
     */
    @Bean
    public IRule loadBalance() {
        return new ServerLoadBalancerRule();
    }

    /**
     * Custom filter.
     * @return the rate-limiting pre filter
     */
    @Bean
    public ZuulFilter rateLimiterFilter() {
        return new RateLimiterFilter();
    }
}

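2. Dynamic routing

Next comes dynamic routing. Dynamic routing needs route configuration that can be persisted and refreshed at runtime.
Zuul's default route locator is SimpleRouteLocator, which cannot refresh dynamically. DiscoveryClientRouteLocator can refresh, but it requires the existing projects to register their services with Eureka, which violates the zero-intrusion requirement, so it is ruled out. The remaining option is a custom route locator that implements the RefreshableRouteLocator interface.

Part of the code is shown below:
AbstractDynamicRouter.java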

package com.syher.zuul.core.zuul.router;

import com.syher.zuul.core.zuul.entity.BasicRoute;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.cloud.netflix.zuul.filters.RefreshableRouteLocator;
import org.springframework.cloud.netflix.zuul.filters.SimpleRouteLocator;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * @author braska
 * @date 2018/07/02.
 **/
public abstract class AbstractDynamicRouter extends SimpleRouteLocator implements RefreshableRouteLocator {

    private static final Logger logger = LoggerFactory.getLogger(AbstractDynamicRouter.class);

    public AbstractDynamicRouter(String servletPath, ZuulProperties properties) {
        super(servletPath, properties);
    }

    @Override
    public void refresh() {
        // Reloads the routes by calling locateRoutes() again.
        doRefresh();
    }

    @Override
    protected Map<String, ZuulProperties.ZuulRoute> locateRoutes() {
        LinkedHashMap<String, ZuulProperties.ZuulRoute> routes = new LinkedHashMap<>();
        // Start with the routes configured statically in application.properties.
        routes.putAll(super.locateRoutes());

        List<BasicRoute> results = readRoutes();

        for (BasicRoute result : results) {
            if (StringUtils.isEmpty(result.getPath())) {
                continue;
            }
            ZuulProperties.ZuulRoute zuulRoute = new ZuulProperties.ZuulRoute();
            try {
                BeanUtils.copyProperties(result, zuulRoute);
            } catch (Exception e) {
                logger.error("=============load zuul route info with error==============", e);
                // Do not register a half-copied route.
                continue;
            }
            routes.put(zuulRoute.getPath(), zuulRoute);
        }
        return routes;
    }

    /**
     * Reads the route definitions from the backing store.
     * @return the routes to register
     */
    protected abstract List<BasicRoute> readRoutes();
}

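Because I'm lazy and don't want to set up a pile of database configuration for every demo, much of the data in this article, such as the route information and the load-balancing strategies, is either kept in a text file or constructed directly in Java code.
In this demo the route information is read from a properties file. All that is needed is to extend AbstractDynamicRouter.
PropertiesRouter.java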

package com.syher.zuul.core.zuul.router;

import com.google.common.collect.Lists;
import com.syher.zuul.common.Context;
import com.syher.zuul.core.zuul.entity.BasicRoute;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.netflix.zuul.filters.ZuulProperties;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * @author braska
 * @date 2018/07/02.
 **/
public class PropertiesRouter extends AbstractDynamicRouter {

    private static final Logger logger = LoggerFactory.getLogger(PropertiesRouter.class);
    public static final String PROPERTIES_FILE = "router.properties";
    private static final String ZUUL_ROUTER_PREFIX = "zuul.routes";

    public PropertiesRouter(String servletPath, ZuulProperties properties) {
        super(servletPath, properties);
    }

    @Override
    protected List<BasicRoute> readRoutes() {
        List<BasicRoute> list = Lists.newArrayListWithExpectedSize(3);
        // try-with-resources closes the stream once the file has been loaded.
        try (InputStream in = this.getClass().getClassLoader().getResourceAsStream(PROPERTIES_FILE)) {
            if (in == null) {
                logger.error("{} not found on the classpath", PROPERTIES_FILE);
                return list;
            }
            Properties prop = new Properties();
            prop.load(in);

            Context context = new Context(new HashMap<>((Map) prop));
            Map<String, String> data = context.getSubProperties(ZUUL_ROUTER_PREFIX);
            // Keys look like "<id>.<field>"; collect the distinct route ids.
            List<String> ids = data.keySet().stream().map(s -> s.substring(0, s.indexOf("."))).distinct().collect(Collectors.toList());
            for (String id : ids) {
                Map<String, String> router = context.getSubProperties(String.join(".", ZUUL_ROUTER_PREFIX, id));

                String path = router.get("path");
                if (StringUtils.isBlank(path)) {
                    continue; // a route without a path cannot be matched
                }
                path = path.startsWith("/") ? path : "/" + path;

                String serviceId = router.getOrDefault("serviceId", null);
                String url = router.getOrDefault("url", null);

                BasicRoute basicRoute = new BasicRoute();
                basicRoute.setId(id);
                basicRoute.setPath(path);
                basicRoute.setUrl(url);
                // With neither url nor serviceId configured, fall back to the route id.
                basicRoute.setServiceId((StringUtils.isBlank(url) && StringUtils.isBlank(serviceId)) ? id : serviceId);
                basicRoute.setRetryable(Boolean.parseBoolean(router.getOrDefault("retry-able", "false")));
                basicRoute.setStripPrefix(Boolean.parseBoolean(router.getOrDefault("strip-prefix", "false")));
                list.add(basicRoute);
            }
        } catch (IOException e) {
            logger.error("Failed to read {}", PROPERTIES_FILE, e);
        }
        return list;
    }
}

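Since the routes must refresh in near real time, something has to watch the properties file periodically. That is why the startup class SpringBootZuulApplication schedules a task that checks once per second whether the properties file has been modified (this should answer any earlier questions about that code). Once the file has been modified, the application publishes a RoutesRefreshedEvent, which triggers the route locator's refresh method.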

public void publish() {
    if (isPropertiesModified()) {
        publisher.publishEvent(new RoutesRefreshedEvent(routeLocator));
    }
}
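
The modification check behind this kind of polling can be isolated into a small helper. The sketch below uses only the JDK; the class name `FileChangeDetector` is hypothetical and is not part of the original project. It records the file's timestamp at construction time, so the first poll reports no change.

```java
import java.io.File;

/** Sketch of timestamp-based change detection for a single file. */
class FileChangeDetector {
    private final File file;
    private long lastSeen;

    FileChangeDetector(File file) {
        this.file = file;
        // Baseline: the state at construction time counts as "unchanged".
        this.lastSeen = file.lastModified();
    }

    /** Returns true exactly once for each observed change of the timestamp. */
    synchronized boolean hasChanged() {
        long current = file.lastModified();
        if (current != lastSeen) {
            lastSeen = current;
            return true;
        }
        return false;
    }
}
```

A scheduled task could call `hasChanged()` once per second and publish the refresh event only when it returns true. The JDK's `java.nio.file.WatchService` is an alternative that avoids polling, at the cost of more setup.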

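Of course, if the routes are read from a database or another store such as Redis, no timer is needed; simply publish the event whenever a route is added, deleted, or updated.

Finally, remember to register PropertiesRouter as a Spring bean (it is configured in the ZuulConfigure class).

router.properties: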

zuul.routes.dashboard.path=/**
zuul.routes.dashboard.strip-prefix=true

## A url must be specified when dynamic load balancing is not used
##zuul.routes.dashboard.url=http://localhost:9000/
## After Zuul is deployed, route mappings can be added here and take effect without a restart
##zuul.routes.baidu.path=/**
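
To make the key layout concrete, the sketch below groups `zuul.routes.<id>.<field>` entries by route id using only `java.util.Properties`. It is an illustration of the grouping idea, not the project's own Context helper, whose code is not shown in this article; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Groups zuul.routes.<id>.<field>=value entries by route id. */
class RoutePropertiesSketch {
    static final String PREFIX = "zuul.routes.";

    static Map<String, Map<String, String>> groupById(Properties props) {
        Map<String, Map<String, String>> routes = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue; // not a route entry
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            if (dot <= 0 || dot == rest.length() - 1) {
                continue; // both an id and a field name are required
            }
            String id = rest.substring(0, dot);
            String field = rest.substring(dot + 1);
            routes.computeIfAbsent(id, k -> new TreeMap<>()).put(field, props.getProperty(key));
        }
        return routes;
    }

    /** Parses properties text; lines starting with '#' are comments and are ignored. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = "zuul.routes.dashboard.path=/**\n"
                + "zuul.routes.dashboard.strip-prefix=true\n"
                + "##zuul.routes.baidu.path=/**\n";
        // prints {dashboard={path=/**, strip-prefix=true}}
        System.out.println(groupById(parse(text)));
    }
}
```

Because the commented-out `baidu` entry is ignored by the parser, uncommenting it in the file is all it takes for a new route id to appear on the next refresh.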

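3. Dynamic load balancing

Load balancing is also fairly simple; the more involved part is writing the balancing algorithm.
Dynamic load balancing involves two main steps:
1. Look up the load-balancing strategy (round robin, random, IP hash, and so on) by the host and port configured for the gateway project. The strategy would normally come from a database; in this demo the data is constructed directly in Java.
2. Use that strategy, together with the weight assigned to each server, to select a suitable server.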
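
As an illustration of step 2 for the random strategy, the sketch below picks a server with probability proportional to its weight. The project's RandomLoadBalancer is not shown in this article, so this is not its code; the class name, the string server names, and the assumption of non-negative weights are all made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Weighted random selection over named servers; weights are assumed non-negative. */
class WeightedRandomSketch {

    static int totalWeight(Map<String, Integer> weights) {
        int total = 0;
        for (int w : weights.values()) {
            total += w;
        }
        return total;
    }

    /** Maps a roll in [0, totalWeight) to the server that owns that slice of the range. */
    static String pick(Map<String, Integer> weights, int roll) {
        int total = totalWeight(weights);
        if (total <= 0) {
            return null; // no server has a positive weight
        }
        if (roll < 0 || roll >= total) {
            throw new IllegalArgumentException("roll out of range: " + roll);
        }
        int remaining = roll;
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            if (remaining < e.getValue()) {
                return e.getKey();
            }
            remaining -= e.getValue();
        }
        return null; // unreachable while weights are non-negative
    }

    static String pickRandom(Map<String, Integer> weights, Random random) {
        int total = totalWeight(weights);
        return total <= 0 ? null : pick(weights, random.nextInt(total));
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("10.0.0.1:9000", 3); // hypothetical servers
        weights.put("10.0.0.2:9000", 1);
        System.out.println(pickRandom(weights, new Random()));
    }
}
```

Separating `pick` from the random draw keeps the selection logic deterministic, which makes it straightforward to unit test.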

To implement dynamic load balancing, define a custom rule class that extends AbstractLoadBalancerRule.
First, the selection of the load-balancing strategy:
ServerLoadBalancerRule.java

package com.syher.zuul.core.ribbon;

import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import com.syher.zuul.common.util.SystemUtil;
import com.syher.zuul.core.ribbon.balancer.LoadBalancer;
import com.syher.zuul.core.ribbon.balancer.RandomLoadBalancer;
import com.syher.zuul.core.ribbon.balancer.RoundLoadBalancer;
import com.syher.zuul.entity.GatewayAddress;
import com.syher.zuul.service.GatewayService;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

/**
 * @author braska
 * @date 2018/07/05.
 **/
public class ServerLoadBalancerRule extends AbstractLoadBalancerRule {

    private static final Logger logger = LoggerFactory.getLogger(ServerLoadBalancerRule.class);

    @Value("${server.host:127.0.0.1}")
    private String host;
    @Value("${server.port:8080}")
    private Integer port;

    @Autowired
    private GatewayService gatewayService;

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
    }

    @Override
    public Server choose(Object key) {
        return getServer(getLoadBalancer(), key);
    }

    private Server getServer(ILoadBalancer loadBalancer, Object key) {
        if (StringUtils.isBlank(host)) {
            host = SystemUtil.ipList().get(0);
        }

        GatewayAddress address = gatewayService.getByHostAndPort(host, port);
        if (address == null) { // This could instead fall back to a default strategy.
            logger.error(String.format("A gateway configuration is required for the server[%s:%s].", host, String.valueOf(port)));
            return null;
        }

        LoadBalancer balancer = LoadBalancerFactory.build(address.getFkStrategyId());
        if (balancer == null) {
            // Unknown strategy: avoid a NullPointerException below.
            logger.error("Unsupported load-balancing strategy [{}].", address.getFkStrategyId());
            return null;
        }

        return balancer.chooseServer(loadBalancer);
    }

    static class LoadBalancerFactory {

        public static LoadBalancer build(String strategy) {
            GatewayAddress.StrategyType type = GatewayAddress.StrategyType.of(strategy);
            switch (type) {
                case ROUND:
                    return new RoundLoadBalancer();
                case RANDOM:
                    return new RandomLoadBalancer();
                default:
                    return null;
            }
        }
    }
}

Next is the interface for the load-balancing algorithms.
LoadBalancer.java

package com.syher.zuul.core.ribbon.balancer;

import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;

/**
 * @author braska
 * @date 2018/07/06.
 **/
public interface LoadBalancer {

    /**
     * Chooses a server for the current request.
     * @param loadBalancer the Ribbon load balancer
     * @return the chosen server, or null if none is available
     */
    Server chooseServer(ILoadBalancer loadBalancer);
}

Define an abstract class that implements the LoadBalancer interface.
AbstractLoadBalancer.java

package com.syher.zuul.core.ribbon.balancer;

import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import com.syher.zuul.core.SpringContext;
import com.syher.zuul.service.ServerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author braska
 * @date 2018/07/06.
 **/
public abstract class AbstractLoadBalancer implements LoadBalancer {
    private static final Logger logger = LoggerFactory.getLogger(AbstractLoadBalancer.class);
    protected ServerService serverService;

    @Override
    public Server chooseServer(ILoadBalancer loadBalancer) {
        // Balancers are created with new, so the service is looked up from the Spring context.
        this.serverService = SpringContext.getBean(ServerService.class);
        Server server = choose(loadBalancer);
        if (server != null) {
            logger.info(String.format("the server[%s:%s] has been selected.", server.getHost(), server.getPort()));
        } else {
            logger.error("could not find any server.");
        }
        return server;
    }

    /** Template method: the actual selection algorithm. */
    public abstract Server choose(ILoadBalancer loadBalancer);
}

The round-robin algorithm
RoundLoadBalancer.java

package com.syher.zuul.core.ribbon.balancer;

import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import com.syher.zuul.common.Constant;
import com.syher.zuul.core.GlobalCache;
import com.syher.zuul.core.ribbon.LoadBalancerRuleUtil;
import com.syher.zuul.entity.ServerAddress;

import java.util.List;

/**
 * Weighted round robin.
 * The first call picks the server with the highest weight; later calls step the
 * current weight down to find the next suitable server.
 * @author braska
 * @date 2018/07/06.
 **/
public class RoundLoadBalancer extends AbstractLoadBalancer {

    private Integer currentServer;
    private Integer currentWeight;
    private Integer maxWeight;
    private Integer gcdWeight;

    @Override
    public Server choose(ILoadBalancer loadBalancer) {
        List<ServerAddress> addressList = serverService.getAvailableServer();
        if (addressList != null && !addressList.isEmpty()) {
            maxWeight = LoadBalancerRuleUtil.getMaxWeightForServers(addressList);
            gcdWeight = LoadBalancerRuleUtil.getGCDForServers(addressList);
            // The position survives between calls in the global cache,
            // because a new balancer instance is created for every request.
            currentServer = Integer.parseInt(GlobalCache.instance().getOrDefault(Constant.CURRENT_SERVER_KEY, -1).toString());
            currentWeight = Integer.parseInt(GlobalCache.instance().getOrDefault(Constant.CURRENT_WEIGHT_KEY, 0).toString());

            Integer serverCount = addressList.size();

            if (1 == serverCount) {
                return new Server(addressList.get(0).getHost(), addressList.get(0).getPort());
            } else {
                while (true) {
                    currentServer = (currentServer + 1) % serverCount;
                    if (currentServer == 0) {
                        // One full pass is done: lower the weight threshold.
                        currentWeight = currentWeight - gcdWeight;
                        if (currentWeight <= 0) {
                            currentWeight = maxWeight;
                            if (currentWeight == 0) {
                                // Every server has weight 0, so none can be chosen.
                                GlobalCache.instance().put(Constant.CURRENT_SERVER_KEY, currentServer);
                                GlobalCache.instance().put(Constant.CURRENT_WEIGHT_KEY, currentWeight);
                                Thread.yield();
                                return null;
                            }
                        }
                    }

                    ServerAddress address = addressList.get(currentServer);
                    if (address.getWeight() >= currentWeight) {
                        GlobalCache.instance().put(Constant.CURRENT_SERVER_KEY, currentServer);
                        GlobalCache.instance().put(Constant.CURRENT_WEIGHT_KEY, currentWeight);
                        return new Server(address.getHost(), address.getPort());
                    }
                }
            }

        }
        return null;
    }
}
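
The weighted round-robin loop depends on two helper values, the maximum weight and the greatest common divisor of all weights, whose implementation (LoadBalancerRuleUtil) is not shown in this article. The following self-contained sketch computes both and runs the same stepping logic over a plain array of weights, so the resulting sequence can be inspected. The class name and the example weights are assumptions for illustration, and weights are assumed to be non-negative.

```java
import java.util.ArrayList;
import java.util.List;

/** Self-contained weighted round robin over server indexes; weights must be non-negative. */
class WeightedRoundRobinSketch {
    private final int[] weights;
    private final int maxWeight;
    private final int gcdWeight;
    private int currentServer = -1;
    private int currentWeight = 0;

    WeightedRoundRobinSketch(int... weights) {
        this.weights = weights.clone();
        int max = 0;
        int gcd = 0;
        for (int w : weights) {
            max = Math.max(max, w);
            gcd = gcd(gcd, w);
        }
        this.maxWeight = max;
        this.gcdWeight = gcd;
    }

    /** Euclid's algorithm; gcd(0, w) is w. */
    static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    /** Returns the index of the next server, or -1 if no server has a positive weight. */
    synchronized int next() {
        if (weights.length == 0 || maxWeight == 0) {
            return -1;
        }
        while (true) {
            currentServer = (currentServer + 1) % weights.length;
            if (currentServer == 0) {
                // A full pass is complete: lower the threshold, wrapping back to the maximum.
                currentWeight -= gcdWeight;
                if (currentWeight <= 0) {
                    currentWeight = maxWeight;
                }
            }
            if (weights[currentServer] >= currentWeight) {
                return currentServer;
            }
        }
    }

    public static void main(String[] args) {
        WeightedRoundRobinSketch rr = new WeightedRoundRobinSketch(4, 2, 1);
        List<Integer> picks = new ArrayList<>();
        for (int i = 0; i < 7; i++) {
            picks.add(rr.next());
        }
        System.out.println(picks); // prints [0, 0, 0, 1, 0, 1, 2]
    }
}
```

Over one full cycle of seven picks, the servers with weights 4, 2, and 1 are chosen four times, twice, and once respectively, which is the proportion the weights ask for.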

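Finally, register ServerLoadBalancerRule as a Spring bean.

With that, custom filters, dynamic routing, and dynamic load balancing with Spring Boot + Zuul are all complete.
Source code: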
