
Go microservice framework Kratos study notes 6 (Kratos service discovery: discovery)




Besides the warden direct-connection approach covered last time, Kratos offers another service discovery SDK:

discovery, which for now can be thought of simply as an HTTP service.

Its simplest discovery flow looks roughly like this:

1. The service registers its appid with discovery.
2. The client queries discovery by appid to get the service's addr.

Of course it is far from being that simple; there is a lot more built in, such as service self-discovery and load balancing.

In this post we only look at the simplest service discovery demo.

First, a walk through discovery's HTTP API.

HTTP API

// innerRouter init local router api path.
func innerRouter(e *bm.Engine) {
    group := e.Group("/discovery")
    {
        group.POST("/register", register)
        group.POST("/renew", renew)
        group.POST("/cancel", cancel)
        group.GET("/fetch/all", initProtect, fetchAll)
        group.GET("/fetch", initProtect, fetch)
        group.GET("/fetchs", initProtect, fetchs)
        group.GET("/poll", initProtect, poll)
        group.GET("/polls", initProtect, polls)
        // manager
        group.POST("/set", set)
        group.GET("/nodes", initProtect, nodes)
    }
}

These are the endpoints registered on discovery's bm (blademaster) engine. I then gave them a quick try with Postman.

register: service registration

fetch: fetch the instances of an appid

fetchs: fetch the instances of multiple appids

polls: fetch the instances of multiple appids (long polling)

nodes: fetch the discovery nodes

renew: heartbeat

POST http://host/discovery/renew

curl 'http://127.0.0.1:7171/discovery/renew' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"

***** Success *****
{
    "code":0,
    "message":""
}
**** Failure ****
{
    "code":-400,
    "message":"-400"
}

cancel: take an instance offline

POST http://host/discovery/cancel

curl 'http://127.0.0.1:7171/discovery/cancel' -d "zone=sh1&env=test&appid=provider&hostname=myhostname"

***** Success *****
{
    "code":0,
    "message":""
}
**** Failure ****
{
    "code":-400,
    "message":"-400"
}
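
register and fetch can be exercised the same way. Below is a minimal Go sketch of my own (not from the kratos/discovery docs) that registers an instance over the HTTP API and then fetches it back; the register parameters mirror the ones visible in the register access log later in this post, while the fetch query parameters and the JSON envelope are assumptions based on the renew/cancel responses above.

package main

import (
    "fmt"
    "io"
    "net/http"
    "net/url"
)

func main() {
    base := "http://127.0.0.1:7171/discovery"

    // Register an instance. The parameter names match the ones that show up
    // in the /discovery/register access log further below.
    reg := url.Values{}
    reg.Set("zone", "sh1")
    reg.Set("env", "test")
    reg.Set("appid", "provider")
    reg.Set("hostname", "myhostname")
    reg.Set("addrs", "grpc://127.0.0.1:9000")
    reg.Set("status", "1") // 1 = instance may receive traffic (assumption)
    resp, err := http.PostForm(base+"/register", reg)
    if err != nil {
        panic(err)
    }
    body, _ := io.ReadAll(resp.Body)
    resp.Body.Close()
    fmt.Println("register:", string(body))

    // Fetch the instances registered under the same appid.
    q := url.Values{}
    q.Set("zone", "sh1")
    q.Set("env", "test")
    q.Set("appid", "provider")
    q.Set("status", "1")
    resp, err = http.Get(base + "/fetch?" + q.Encode())
    if err != nil {
        panic(err)
    }
    body, _ = io.ReadAll(resp.Body)
    resp.Body.Close()
    fmt.Println("fetch:", string(body))
}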

Application discovery logic

The official application discovery flow (a rough client-side sketch follows the list):

1. Pick an available node and add the application's appid to the poll appid list.
2. If the polls request returns an error, switch to another node; the switching logic is the same as when self-discovery fails.
3. If polls returns -304, the appid has no changes; issue a new poll and keep watching for changes.
4. The polls endpoint returns the instances list for the appid, which completes service discovery; pick whatever load-balancing algorithm you need to schedule across the nodes.
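
A simplified sketch of that flow against the HTTP API (my own illustration; the JSON envelope with code/data and the node-switching behaviour are assumptions, and the real implementation is the kratos naming/discovery code shown near the end of this post):

package example

import (
    "encoding/json"
    "net/http"
    "net/url"
    "strconv"
    "time"
)

// pollLoop long-polls /discovery/polls for one appid and hands every new
// address list to onUpdate. Error handling and node switching are reduced
// to a simple retry here.
func pollLoop(node, env, hostname, appid string, onUpdate func([]string)) {
    latest := int64(0)
    for {
        q := url.Values{}
        q.Set("appid", appid)
        q.Set("env", env)
        q.Set("hostname", hostname)
        q.Set("latest_timestamp", strconv.FormatInt(latest, 10))
        resp, err := http.Get("http://" + node + "/discovery/polls?" + q.Encode())
        if err != nil {
            // The real client switches to another discovery node here.
            time.Sleep(time.Second)
            continue
        }
        var body struct {
            Code int `json:"code"`
            Data map[string]struct {
                Instances map[string][]struct {
                    Addrs []string `json:"addrs"`
                } `json:"instances"`
                LatestTimestamp int64 `json:"latest_timestamp"`
            } `json:"data"`
        }
        err = json.NewDecoder(resp.Body).Decode(&body)
        resp.Body.Close()
        if err != nil || body.Code == -304 {
            // -304: nothing changed within the long-poll window, poll again.
            continue
        }
        app, ok := body.Data[appid]
        if !ok {
            continue
        }
        latest = app.LatestTimestamp
        var addrs []string
        for _, zone := range app.Instances {
            for _, ins := range zone {
                addrs = append(addrs, ins.Addrs...)
            }
        }
        // Discovery is complete: schedule across addrs with whichever
        // load-balancing algorithm you need.
        onUpdate(addrs)
    }
}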

Service registration

Service registration demo

Spin up a demo service and register it with discovery.

In the main function's service-registration part, add registration code similar to the following.

    ip := "127.0.0.1"
    port := "9000"
    hn, _ := os.hostname()
    dis := discovery.new(nil)
    ins := &naming.instance{
        zone:     env.zone,
        env:      env.deployenv,
        appid:    "demo.service",
        hostname: hn,
        addrs: []string{
            "grpc://" + ip + ":" + port,
        },
    }

    cancel, err := dis.register(context.background(), ins)
    if err != nil {
        panic(err)
    }

    defer cancel()

It panics because it cannot find any nodes; these are our discovery node addresses, which can be supplied through an environment variable.

i:\vsproject\kratos-note\kratos-note\warden\discovery\server>kratos run
info 01/04-19:32:28.198 i:/vsproject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
panic: invalid discovery config nodes:[] region:region01 zone:zone01 deployenv:dev host:desktop-nuekd5o

After configuring the discovery nodes, registration succeeds:

i:\vsproject\kratos-note\kratos-note\warden\discovery\server>set discovery_nodes=127.0.0.1:7171

i:\vsproject\kratos-note\kratos-note\warden\discovery\server>kratos run
info 01/04-19:40:25.426 i:/vsproject/kratos-note/kratos-note/warden/discovery/server/cmd/main.go:23 abc start
2020/01/04 19:40:25 start watch filepath: i:\vsproject\kratos-note\kratos-note\warden\discovery\server\configs
info 01/04-19:40:25.497 i:/vsproject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 0.0.0.0:8000
[warden] config is deprecated, argument will be ignored. please use -grpc flag or grpc env to configure warden server.
info 01/04-19:40:25.500 i:/vsproject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/net/rpc/warden/server.go:329 warden: start grpc listen addr: [::]:9000
info 01/04-19:40:25.501 i:/vsproject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:248 disocvery: addwatch(infra.discovery) already watch(false)
info 01/04-19:40:25.514 i:/vsproject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:631 discovery: successfully polls(http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=desktop-nuekd5o&latest_timestamp=0) instances ({"infra.discovery":{"instances":{"sh001":[{"region":"sh","zone":"sh001","env":"dev","appid":"infra.discovery","hostname":"test1","addrs":["http://127.0.0.1:7171"],"version":"","latest_timestamp":1578122538945305700,"metadata":null,"status":1}]},"latest_timestamp":1578122538945305700,"scheduler":null}})
info 01/04-19:40:25.527 i:/vsproject/go/pkg/mod/github.com/bilibili/kratos@v0.3.2-0.20191224125553-6e1180f53a8e/pkg/naming/discovery/discovery.go:414 discovery: register client.get(http://127.0.0.1:7171/discovery/register) env(dev) appid(demo.service) addrs([grpc://127.0.0.1:9000]) success

Service registration logic

Now let's follow the logs through the flow.

The registration logic is basically register -> renew -> cancel: register the instance, keep sending heartbeats, and finally cancel the registration when shutting down.
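
Sketched against the raw HTTP API (again my own illustration, not the kratos SDK internals), that lifecycle looks roughly like this; the 30-second renew interval is inferred from the timestamps in the log below:

package example

import (
    "context"
    "net/http"
    "net/url"
    "time"
)

// registerLifecycle registers an instance, renews it periodically and takes
// it offline with cancel when ctx is done.
func registerLifecycle(ctx context.Context, node string, params url.Values) error {
    post := func(path string) error {
        resp, err := http.PostForm("http://"+node+"/discovery/"+path, params)
        if err != nil {
            return err
        }
        return resp.Body.Close()
    }
    if err := post("register"); err != nil {
        return err
    }
    ticker := time.NewTicker(30 * time.Second) // renew interval (assumption)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            if err := post("renew"); err != nil {
                // A real client would retry and eventually re-register.
                return err
            }
        case <-ctx.Done():
            return post("cancel")
        }
    }
}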

Here is an excerpt of a local service registration log.

The steps were roughly:

1. Start the discovery service.
2. Start demo.server, which registers the demo.server appid.
3. Wait a short while for heartbeats, then shut demo.server down.

Looking at the whole log, the sequence is roughly:

1. Line 0: start the discovery service.
2. Lines 2/3/4: service initialization.
3. Line 5: polls long-polling for infra.discovery (service self-discovery).
4. Lines 6/7: a new connection and a service registration; this is our demo.server registering itself.
5. Line 9: polls long-polling for infra.discovery again.
6. Line 10: renew heartbeat.
7. Line 12: finally I killed the registered service, and a cancel request shows up.

The logs match the expected logic pretty closely. Next, service discovery.

0:discovery -conf discovery-example.toml -log.v=0
1:
2:info 01/10-10:31:19.575 c:/server/src/go/src/discovery/discovery/syncup.go:160 discovery
3:changed nodes:[127.0.0.1:7171] zones:map[]
4:info 01/10-10:31:19.575 c:/server/src/go/pkg/mod/github.com/bilibili/kratos@v0.1.0/pkg/net/http/blademaster/server.go:98 blademaster: start http listen addr: 127.0.0.1:7171
info 01/10-10:31:19.575 c:/server/src/go/src/discovery/registry/registry.go:219 polls from(test1) new connection(1)

5:info 01/10-10:31:31.796 http-access-log ts=0 method=get ip=127.0.0.1 traceid= user=no_user params=appid=infra.discovery&env=dev&hostname=desktop-9nfhkd0&latest_timestamp=0 msg=0 stack=<nil> err= timeout_quota=39.98 path=/discovery/polls ret=0

6:info 01/10-10:31:31.798 c:/server/src/go/src/discovery/registry/registry.go:219 polls from(desktop-9nfhkd0) new connection(1)

7:info 01/10-10:31:31.799 http-access-log method=post user=no_user path=/discovery/register err= ts=0 params=addrs=grpc%3a%2f%2f127.0.0.1%3a9000&appid=demo.service&env=dev&hostname=desktop-9nfhkd0&metadata=&region=region01&status=1&version=&zone=zone01 stack=<nil> ret=0 timeout_quota=39.98 ip=127.0.0.1 msg=0 traceid=

8:info 01/10-10:32:01.799 c:/server/src/go/src/discovery/registry/registry.go:370 delconns from(desktop-9nfhkd0) delete(1)

9:error 01/10-10:32:01.799 http-access-log method=get ip=127.0.0.1 err=-304 timeout_quota=39.98 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=desktop-9nfhkd0&latest_timestamp=1578623479566211700 ret=-304 msg=-304 stack=-304 ts=30.0011342 traceid=

10:info 01/10-10:32:01.799 http-access-log msg=0 err= timeout_quota=39.98 method=post ip=127.0.0.1 user=no_user ret=0 path=/discovery/renew traceid= params=appid=demo.service&env=dev&hostname=desktop-9nfhkd0&region=region01&zone=zone01 stack=<nil> ts=0

11:info 01/10-10:32:01.800 c:/server/src/go/src/discovery/registry/registry.go:219 polls from(desktop-9nfhkd0) new connection(1)

12:info 01/10-10:32:08.499 http-access-log timeout_quota=39.98 path=/discovery/cancel ret=0 stack=<nil> ip=127.0.0.1 msg=0 traceid= ts=0 method=post user=no_user err= params=appid=demo.service&env=dev&hostname=desktop-9nfhkd0&region=region01&zone=zone01

Service discovery

Likewise, configure the discovery nodes first: set discovery_nodes=127.0.0.1:7171

Change NewClient() to the following:

package dao

import (
    "context"

    "github.com/bilibili/kratos/pkg/naming/discovery"
    "github.com/bilibili/kratos/pkg/net/rpc/warden"
    "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"

    "google.golang.org/grpc"
)

// AppID your appid, ensure unique.
const AppID = "demo.service" // NOTE: example

func init() {
    // NOTE: this is the line that tells warden to use discovery for service discovery.
    // NOTE: resolver.Register takes effect globally, so it is recommended to run this
    //       during process initialization.
    // NOTE: never Register more than one different middleware in the same process!!!
    // NOTE: when starting the application, the discovery nodes can be specified with a
    //       flag (-discovery.nodes) or an environment variable (DISCOVERY_NODES).
    resolver.Register(discovery.Builder())
}

// NewClient new member grpc client.
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (DemoClient, error) {
    client := warden.NewClient(cfg, opts...)
    conn, err := client.Dial(context.Background(), "discovery://default/"+AppID)
    if err != nil {
        return nil, err
    }
    // Note: replace this with your own client constructor.
    // NewDemoClient is generated under the "api" directory and corresponds to the
    // service name defined in your proto file; use the correct method name.
    return NewDemoClient(conn), nil
}

Embed it into the dao struct as well, and test it by calling the SayHello endpoint just like with the warden direct approach last time.

// dao dao.
type dao struct {
    db         *sql.DB
    redis      *redis.Redis
    mc         *memcache.Memcache
    demoClient demoapi.DemoClient
    cache      *fanout.Fanout
    demoExpire int32
}

// New new a dao and return.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, err error) {
    var cfg struct {
        DemoExpire xtime.Duration
    }
    if err = paladin.Get("application.toml").UnmarshalTOML(&cfg); err != nil {
        return
    }

    grpcCfg := &warden.ClientConfig{
        Dial:              xtime.Duration(time.Second * 10),
        Timeout:           xtime.Duration(time.Millisecond * 250),
        Subset:            50,
        KeepAliveInterval: xtime.Duration(time.Second * 60),
        KeepAliveTimeout:  xtime.Duration(time.Second * 20),
    }
    // paladin.Get("grpc.toml").UnmarshalTOML(grpcCfg)
    var grpcClient demoapi.DemoClient
    grpcClient, err = NewClient(grpcCfg)

    d = &dao{
        db:         db,
        redis:      r,
        mc:         mc,
        demoClient: grpcClient,
        cache:      fanout.New("cache"),
        demoExpire: int32(time.Duration(cfg.DemoExpire) / time.Second),
    }
    return
}

Test call

The steps:

1. Start the discovery service.
2. Start demo.server, which registers itself as the demo.server service.
3. Start demo.client.
4. Finally, call demo.client's SayHello HTTP endpoint, which in turn calls demo.server's gRPC SayHello endpoint (a rough sketch of that call follows).
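
Step 4 boils down to something like the following method on the dao shown above (a sketch in the same dao package; SayHello, HelloReq and the generated demoapi package come from the kratos demo proto template and should be replaced with your own service):

// SayHello calls demo.server's gRPC SayHello through the discovery-backed
// client embedded in the dao.
func (d *dao) SayHello(ctx context.Context, name string) error {
    _, err := d.demoClient.SayHello(ctx, &demoapi.HelloReq{Name: name})
    return err
}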


context deadline exceeded

I noticed that occasionally, when doing service discovery, the client fails to come up: context deadline exceeded.

Because I put the NewClient call inside the dao, a timeout makes demo.client panic straight away.

The client log shows:
warden client: dial discovery://default/demo.service?subset=50 error context deadline exceeded!panic: context deadline exceeded

client : host:127.0.0.1:7171, url:http://127.0.0.1:7171/discovery/polls?appid=infra.discovery&env=dev&hostname=desktop-9nfhkd0&latest_timestamp=1578902420717217500
The call to discovery's polls endpoint timed out. The gRPC dial deadline I configured is 10s, while the official discovery docs say that during server-node self-discovery, if the server's node instances have not changed, the interface blocks for up to 30s before returning -304 (poll/polls are long-polling endpoints).
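
One thing to try (my assumption, not an official recommendation) is simply to give the dial a budget larger than the 30s long-poll window in the warden client config used above:

    grpcCfg := &warden.ClientConfig{
        // If the first resolution can block behind a 30s polls call, a 10s
        // dial deadline may expire exactly as in the log above, so give it
        // more headroom (assumption).
        Dial:    xtime.Duration(time.Second * 40),
        Timeout: xtime.Duration(time.Millisecond * 250),
    }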

I won't go into service self-discovery in detail here; this post only cares about the application discovery logic. If you're interested, take a look at the discovery project.

info 01/10-15:22:34.436 http-access-log method=get path=/discovery/polls user=no_user params=appid=infra.discovery&env=dev&hostname=clii&latest_timestamp=0 stack=<nil> err= timeout_quota=39.98 ts=0 msg=0 traceid= ip=127.0.0.1 ret=0
info 01/10-15:22:34.438 c:/server/src/go/src/discovery/registry/registry.go:222 polls from(clii) new connection(1)
info 01/10-15:22:34.440 c:/server/src/go/src/discovery/registry/registry.go:228 polls from(clii) reuse connection(2)
info 01/10-15:22:44.219 c:/server/src/go/src/discovery/registry/registry.go:373 delconns from(desktop-9nfhkd0) delete(1)
error 01/10-15:22:44.219 http-access-log path=/discovery/polls ret=-304 msg=-304 timeout_quota=39.98 ip=127.0.0.1 params=appid=infra.discovery&env=dev&hostname=desktop-9nfhkd0&latest_timestamp=1578637331623587200 user=no_user ts=39.9808023 err=-304 traceid= method=get stack=-304
info 01/10-15:22:44.221 c:/server/src/go/src/discovery/registry/registry.go:222 polls from(desktop-9nfhkd0) new connection(1)
info 01/10-15:22:44.525 http-access-log ts=0 method=post ip=127.0.0.1 user=no_user stack=<nil> path=/discovery/renew err= traceid= ret=0 msg=0 timeout_quota=39.98 params=appid=demo.service&env=dev&hostname=desktop-9nfhkd0&region=region01&zone=zone01
info 01/10-15:23:04.438 c:/server/src/go/src/discovery/registry/registry.go:370 delconns from(clii) count decr(2)
error 01/10-15:23:04.438 http-access-log msg=-304 ts=30.0002154 method=get err=-304 stack=-304 timeout_quota=39.98 ip=127.0.0.1 user=no_user path=/discovery/polls params=appid=infra.discovery&env=dev&hostname=clii&latest_timestamp=1578637331623587200 ret=-304 traceid=
info 01/10-15:23:04.440 c:/server/src/go/src/discovery/registry/registry.go:373 delconns from(clii) delete(1)
error 01/10-15:23:04.440 http-access-log ts=30.0013758 traceid= user=no_user path=/discovery/polls ret=-304 err=-304 method=get ip=127.0.0.1 params=appid=infra.discovery&appid=demo.service&env=dev&hostname=clii&latest_timestamp=1578637331623587200&latest_timestamp=0 msg=-304 stack=-304 timeout_quota=39.98

Putting this together with the discovery log:
at 15:22:34 the client sends the dial,
around 15:22:45 the client panics,
and only at 15:23:04 does discovery reply with a -304 (instance info unchanged).


This happens because client.Dial() wraps gRPC's official name-resolution machinery; what ultimately runs is the official gRPC service discovery flow as implemented inside kratos warden.

Below is a quick look at that layer. It is quite convoluted and I haven't fully figured it out, so this is only a skim; I'll write it up in detail if I get to work with it more.

A quick look at the official gRPC service discovery logic

// NewClient new grpc client.
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (demoapi.DemoClient, error) {
    client := warden.NewClient(cfg, opts...)
    cc, err := client.Dial(context.Background(), fmt.Sprintf("discovery://default/%s", AppID))
    if err != nil {
        return nil, err
    }
    return demoapi.NewDemoClient(cc), nil
}

client.Dial() actually goes through roughly this flow:

client.Dial() -> gRPC's DialContext() -> parse the target's scheme (here "discovery") and look up the corresponding resolver builder

    if cc.dopts.resolverBuilder == nil {
        // Only try to parse target when resolver builder is not already set.
        cc.parsedTarget = parseTarget(cc.target)
        grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
        cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
        if cc.dopts.resolverBuilder == nil {
            // If resolver builder is still nil, the parsed target's scheme is
            // not registered. Fallback to default resolver and set Endpoint to
            // the original target.
            grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
            cc.parsedTarget = resolver.Target{
                Scheme:   resolver.GetDefaultScheme(),
                Endpoint: target,
            }
            cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
        }
    } else {
        cc.parsedTarget = resolver.Target{Endpoint: target}
    }

A successful DialContext() produces the ClientConn struct -> the ClientConn's resolverWrapper is initialized -> which calls Build():

    defer ccr.resolverMu.Unlock()

    ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)

// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
    ctx    context.Context
    cancel context.CancelFunc

    target       string
    parsedTarget resolver.Target
    authority    string
    dopts        dialOptions
    csMgr        *connectivityStateManager

    balancerBuildOpts balancer.BuildOptions
    blockingpicker    *pickerWrapper

    mu              sync.RWMutex
    resolverWrapper *ccResolverWrapper
    sc              *ServiceConfig
    conns           map[*addrConn]struct{}
    // Keepalive parameter can be updated if a GoAway is received.
    mkp             keepalive.ClientParameters
    curBalancerName string
    balancerWrapper *ccBalancerWrapper
    retryThrottler  atomic.Value

    firstResolveEvent *grpcsync.Event

    channelzID int64 // channelz unique identification number
    czData     *channelzData
}

The user's Builder implementation then calls UpdateState -> the ClientConn's updateResolverState -> address initialization and the rest of the official gRPC logic:

// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
    // Build creates a new resolver for the given target.
    //
    // gRPC dial calls Build synchronously, and fails if the returned error is
    // not nil.
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    // Scheme returns the scheme supported by this resolver.
    // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
    Scheme() string
}

// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
    // UpdateState updates the state of the ClientConn appropriately.
    UpdateState(State)
    // ReportError notifies the ClientConn that the Resolver encountered an
    // error.  The ClientConn will notify the load balancer and begin calling
    // ResolveNow on the Resolver with exponential backoff.
    ReportError(error)
    // NewAddress is called by resolver to notify ClientConn a new list
    // of resolved addresses.
    // The address list should be the complete list of resolved addresses.
    //
    // Deprecated: Use UpdateState instead.
    NewAddress(addresses []Address)
    // NewServiceConfig is called by resolver to notify ClientConn a new
    // service config. The service config should be provided as a json string.
    //
    // Deprecated: Use UpdateState instead.
    NewServiceConfig(serviceConfig string)
    // ParseServiceConfig parses the provided service config and returns an
    // object that provides the parsed config.
    ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}

kratos discovery

warden wraps the entire gRPC service discovery implementation; the code lives in pkg/naming/naming.go and warden/resolver/resolver.go.

naming.go defines the Instance struct describing a business instance, the Registry interface used for service registration, and the Resolver interface used for service discovery.

// Resolver resolve naming service.
type Resolver interface {
    Fetch(context.Context) (*InstancesInfo, bool)
    Watch() <-chan struct{}
    Close() error
}

// Registry Register an instance and renew automatically.
type Registry interface {
    Register(ctx context.Context, ins *Instance) (cancel context.CancelFunc, err error)
    Close() error
}

// InstancesInfo instance info.
type InstancesInfo struct {
    Instances map[string][]*Instance `json:"instances"`
    LastTs    int64                  `json:"latest_timestamp"`
    Scheduler *Scheduler             `json:"scheduler"`
}

resolver.go implements the official gRPC resolver.Builder and resolver.Resolver interfaces, and also exposes the naming.Builder and naming.Resolver interfaces defined in naming.go.

// Resolver resolve naming service.
type Resolver interface {
    Fetch(context.Context) (*InstancesInfo, bool)
    Watch() <-chan struct{}
    Close() error
}

// Builder resolver builder.
type Builder interface {
    Build(id string) Resolver
    Scheme() string
}

kratos wraps gRPC's Build so that you only need to pass the appid of the target service: once gRPC makes the call, warden/resolver/resolver.go looks up the naming.Builder implementation matching Scheme() and calls its Build with the id. The naming.Resolver implementation can then use that appid to query instance information from the service discovery middleware (here the discovery service) via Fetch. Besides the plain Fetch there is also a Watch method, used to listen for node changes in the middleware so instance information can be refreshed in real time.
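
To make that wiring a bit more concrete, here is a stripped-down sketch of what such a bridge between naming.Resolver and the gRPC resolver interfaces can look like (my own illustration; the real code is warden/resolver/resolver.go and does considerably more, e.g. scheme filtering, subsetting and metadata handling):

package example

import (
    "context"
    "strings"

    "github.com/bilibili/kratos/pkg/naming"
    "google.golang.org/grpc/resolver"
)

// nrBuilder adapts a naming.Builder (e.g. discovery) to gRPC's resolver.Builder.
type nrBuilder struct {
    nb naming.Builder
}

// Build is called by gRPC once it has matched the target's scheme; the
// endpoint part of "discovery://default/<appid>" is the appid.
func (b *nrBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
    r := &nrResolver{nr: b.nb.Build(target.Endpoint), cc: cc, quit: make(chan struct{})}
    go r.watch()
    return r, nil
}

func (b *nrBuilder) Scheme() string { return b.nb.Scheme() }

type nrResolver struct {
    nr   naming.Resolver
    cc   resolver.ClientConn
    quit chan struct{}
}

// watch waits for change notifications from the naming resolver, fetches the
// latest instances and pushes the addresses into gRPC via UpdateState.
func (r *nrResolver) watch() {
    for {
        select {
        case <-r.quit:
            return
        case <-r.nr.Watch():
        }
        info, ok := r.nr.Fetch(context.Background())
        if !ok {
            continue
        }
        var addrs []resolver.Address
        for _, inss := range info.Instances {
            for _, ins := range inss {
                for _, a := range ins.Addrs {
                    // The real code keeps only grpc:// addresses; simplified here.
                    addrs = append(addrs, resolver.Address{Addr: strings.TrimPrefix(a, "grpc://")})
                }
            }
        }
        r.cc.UpdateState(resolver.State{Addresses: addrs})
    }
}

func (r *nrResolver) ResolveNow(resolver.ResolveNowOptions) {}

func (r *nrResolver) Close() {
    close(r.quit)
    r.nr.Close()
}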

naming/discovery implements the registration and discovery logic with discovery as the middleware; you can roughly see in it where the polls requests to the discovery service are made.

// Build disovery resovler builder.
func (d *Discovery) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
    r := &Resolve{
        id:    appid,
        d:     d,
        event: make(chan struct{}, 1),
        opt:   new(naming.BuildOptions),
    }
    for _, opt := range opts {
        opt.Apply(r.opt)
    }
    d.mutex.Lock()
    app, ok := d.apps[appid]
    if !ok {
        app = &appInfo{
            resolver: make(map[*Resolve]struct{}),
        }
        d.apps[appid] = app
        cancel := d.cancelPolls
        if cancel != nil {
            cancel()
        }
    }
    app.resolver[r] = struct{}{}
    d.mutex.Unlock()
    if ok {
        select {
        case r.event <- struct{}{}:
        default:
        }
    }
    log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok)
    d.once.Do(func() {
        go d.serverproc()
    })
    return r
}

func (d *Discovery) serverproc() {
    var (
        retry  int
        ctx    context.Context
        cancel context.CancelFunc
    )
    ticker := time.NewTicker(time.Minute * 30)
    defer ticker.Stop()
    for {
        if ctx == nil {
            ctx, cancel = context.WithCancel(d.ctx)
            d.mutex.Lock()
            d.cancelPolls = cancel
            d.mutex.Unlock()
        }
        select {
        case <-d.ctx.Done():
            return
        case <-ticker.C:
            d.switchNode()
        default:
        }
        apps, err := d.polls(ctx)
        if err != nil {
            d.switchNode()
            if ctx.Err() == context.Canceled {
                ctx = nil
                continue
            }
            time.Sleep(time.Second)
            retry++
            continue
        }
        retry = 0
        d.broadcast(apps)
    }
}
