当前位置: 移动技术网 > IT编程>软件设计>架构 > dubbo+zipkin调用链监控(二)

dubbo+zipkin调用链监控(二)

2018年10月08日  | 移动技术网IT编程  | 我要评论

去年的时候写过,最近看到zipkin2配合brave实现起来会比我之前的实现要简单很多,因为brave将很多交互的内容都封装起来了,不需要自己去写具体的实现,比如如何去构建span,如何去上报数据。

收集器抽象

由于zipkin支持http以及kafka两种方式上报数据,所以在配置上需要做下抽象。

abstractzipkincollectorconfiguration

主要是针对下面两种收集方式的一些配置上的定义,最核心的是sender接口的定义,http与kafka是两类完全不同的实现。

public abstract sender getsender();

其次是协助性的构造函数,主要是配合构建收集器所需要的一些参数。

  • zipkinurl

如果是http收集,那么对应的是zipkin api域名,如果是kafka,对应的是kafka集群的地址

  • topic

仅在收集方式为kafka是有效,http时传空值即可。

public abstractzipkincollectorconfiguration(string servicename,string zipkinurl,string topic){
    this.zipkinurl=zipkinurl;
    this.servicename=servicename;
    this.topic=topic;
    this.tracing=this.tracing();
}

配置上报方式,这里统一采用异常上传,并且配置上报的超时时间。

protected asyncreporter<span> spanreporter() {
    return asyncreporter
            .builder(getsender())
            .closetimeout(500, timeunit.milliseconds)
            .build(spanbytesencoder.json_v2);
}

下面这两方法,是配合应用构建span使用的。

注意那个sampler()方法,默认是什么也不做的意思,我们要想看到数据就需要配置成sampler.always_sample,这样才能真正将数据上报到zipkin服务器。

protected tracing tracing() {
    this.tracing= tracing
            .newbuilder()
            .localservicename(this.servicename)
            .sampler(sampler.always_sample)
            .spanreporter(spanreporter())
            .build();
    return this.tracing;
}

protected tracing gettracing(){
    return this.tracing;
}

httpzipkincollectorconfiguration

主要是实现getsender方法,可以借用okhttpsender这个对象来快速构建,api版本采用v2。

public class httpzipkincollectorconfiguration extends abstractzipkincollectorconfiguration {
    public httpzipkincollectorconfiguration(string servicename,string zipkinurl) {
        super(servicename,zipkinurl,null);
    }

    @override
    public sender getsender() {
        return okhttpsender.create(super.getzipkinurl()+"/api/v2/spans");
    }
}

okhttpsender这个类需要引用这个包

<dependency>
    <groupid>io.zipkin.reporter2</groupid>
    <artifactid>zipkin-sender-okhttp3</artifactid>
    <version>${zipkin-reporter2.version}</version>
</dependency>

kafkazipkincollectorconfiguration

同样也是实现getsender方法

public class kafkazipkincollectorconfiguration extends abstractzipkincollectorconfiguration {
    public kafkazipkincollectorconfiguration(string servicename,string zipkinurl,string topic) {
        super(servicename,zipkinurl,topic);
    }

    @override
    public sender getsender() {

        return kafkasender
                .newbuilder()
                .bootstrapservers(super.getzipkinurl())
                .topic(super.gettopic())
                .encoding(encoding.json)
                .build();
    }
}

kafkasender这个类需要引用这个包:

<dependency>
    <groupid>io.zipkin.reporter2</groupid>
    <artifactid>zipkin-sender-kafka11</artifactid>
    <version>${zipkin-reporter2.version}</version>
</dependency>

收集器工厂

由于上面创建了两个收集器配置类,使用时只能是其中之一,所以实际运行的实例需要根据配置来动态生成。zipkincollectorconfigurationfactory就是负责生成收集器实例的。

private final abstractzipkincollectorconfiguration zipkincollectorconfiguration;

@autowired
public zipkincollectorconfigurationfactory(traceconfig traceconfig){
    if(objects.equal("kafka", traceconfig.getzipkinsendtype())){
        zipkincollectorconfiguration=new kafkazipkincollectorconfiguration(
                traceconfig.getapplicationname(),
                traceconfig.getzipkinurl(),
                traceconfig.getzipkinkafkatopic());
    }
    else {
        zipkincollectorconfiguration = new httpzipkincollectorconfiguration(
                traceconfig.getapplicationname(),
                traceconfig.getzipkinurl());
    }
}

通过构建函数将我们的配置类traceconfig注入进来,然后根据发送方式来构建实例。另外提供一个辅助函数:

public tracing gettracing(){
    return this.zipkincollectorconfiguration.gettracing();
}

过滤器

在dubbo的过滤器中实现数据上传的功能逻辑相对简单,一般都在invoke方法执行前记录数据,然后方法执行完成后再次记录数据。这个逻辑不变,有变化的是数据上报的实现,上一个版本是通过发http请求实现需要编码,现在可以直接借用brave所提供的span来帮助我们完成,有两重要的方法:

  • finish

方法源码如下,在完成的时候会填写上完成的时间并上报数据,这一般应用于同步调用场景。

public void finish(tracecontext context, long finishtimestamp) {
    mutablespan span = this.spanmap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish(long.valueof(finishtimestamp));
            this.reporter.report(span.tospan());
        }
    }
}
  • flush 与上面finish方法的不同点在于,在报数据时没有完成时间,这应该是适用于一些异步调用但不关心结果的场景,比如dubbo所提供的oneway方式调用。
public void flush(tracecontext context) {
    mutablespan span = this.spanmap.remove(context);
    if(span != null && !this.noop.get()) {
        synchronized(span) {
            span.finish((long)null);
            this.reporter.report(span.tospan());
        }
    }
}

消费者

做为消费方,有一个核心功能就是将traceid以及spanid传递到服务提供方,这里还是通过dubbo提供的附加参数方式实现。

@override
public result invoke(invoker<?> invoker, invocation invocation) throws rpcexception {
    if(!rpctracecontext.gettraceconfig().isenabled()){
        return invoker.invoke(invocation);
    }

    zipkincollectorconfigurationfactory zipkincollectorconfigurationfactory=
            springcontextutils.getapplicationcontext().getbean(zipkincollectorconfigurationfactory.class);
    tracer tracer= zipkincollectorconfigurationfactory.gettracing().tracer();

    if(null==rpctracecontext.gettraceid()){
        rpctracecontext.start();
        rpctracecontext.settraceid(idutils.get());
        rpctracecontext.setparentid(null);
        rpctracecontext.setspanid(idutils.get());
    }
    else {
        rpctracecontext.setparentid(rpctracecontext.getspanid());
        rpctracecontext.setspanid(idutils.get());
    }
    tracecontext tracecontext= tracecontext.newbuilder()
            .traceid(rpctracecontext.gettraceid())
            .parentid(rpctracecontext.getparentid())
            .spanid(rpctracecontext.getspanid())
            .sampled(true)
            .build();

    span span=tracer.tospan(tracecontext).start();

    invocation.getattachments().put(rpctracecontext.trace_id_key, string.valueof(span.context().traceid()));
    invocation.getattachments().put(rpctracecontext.span_id_key, string.valueof(span.context().spanid()));

    result result = invoker.invoke(invocation);

    span.finish();

    return result;
}

提供者

@override
    public result invoke(invoker<?> invoker, invocation invocation) throws rpcexception {
        if(!rpctracecontext.gettraceconfig().isenabled()){
            return invoker.invoke(invocation);
        }

        map<string, string> attaches = invocation.getattachments();
        if (!attaches.containskey(rpctracecontext.trace_id_key)){
            return invoker.invoke(invocation);
        }

        long traceid = long.valueof(attaches.get(rpctracecontext.trace_id_key));
        long spanid = long.valueof(attaches.get(rpctracecontext.span_id_key));

        attaches.remove(rpctracecontext.trace_id_key);
        attaches.remove(rpctracecontext.span_id_key);
        rpctracecontext.start();
        rpctracecontext.settraceid(traceid);
        rpctracecontext.setparentid(spanid);
        rpctracecontext.setspanid(idutils.get());

        zipkincollectorconfigurationfactory zipkincollectorconfigurationfactory=
                springcontextutils.getapplicationcontext().getbean(zipkincollectorconfigurationfactory.class);
        tracer tracer= zipkincollectorconfigurationfactory.gettracing().tracer();

        tracecontext tracecontext= tracecontext.newbuilder()
                .traceid(rpctracecontext.gettraceid())
                .parentid(rpctracecontext.getparentid())
                .spanid(rpctracecontext.getspanid())
                .sampled(true)
                .build();
        span span = tracer.tospan(tracecontext).start();

        result result = invoker.invoke(invocation);

        span.finish();

        return result;

    }

异常流程

上面无论是消费者的过滤器还是服务提供者的过滤器,均未考虑服务在调用invoker.invoke时出错的场景,如果出错,后面的span.finish方法将不会按预期执行,也就记录不了信息。所以需要针对此问题做优化:可以在finally块中执行finish方法。

try {
    result = invoker.invoke(invocation);
}
finally {
    span.finish();
}

消费者在调用服务时,异步调用问题

上面过滤器中调用span.finish都是基于同步模式,而由于dubbo除了同步调用外还提供了两种调用方式

  • 异步调用 通过callback机制的异步

  • oneway

只发起请求并不等待结果的异步调用,无callback一说

针对上面两类异步再加上同步调用,我们要想准确记录服务真正的时间,需要在消费方的过滤器中做如下处理:

创建一个用于回调的处理类,它的主要目的是为了在回调成功时记录时间,这里无论是成功还是失败。

private class asyncspancallback implements responsecallback{

    private span span;

    public asyncspancallback(span span){
        this.span=span;
    }

    @override
    public void done(object o) {
        span.finish();
    }

    @override
    public void caught(throwable throwable) {
        span.finish();
    }
}

再在调用invoke方法时,如果是oneway方式,则调用flush方法结果,如果是同步则直接调用finish方法,如果是异步则在回调时调用finish方法。

result result = null;
boolean isoneway = rpcutils.isoneway(invoker.geturl(), invocation);
try {
    result = invoker.invoke(invocation);
}
finally {
    if(isoneway) {
        span.flush();
    }
    else if(!isasync) {
        span.finish();
    }
}

待完善问题

过滤器中生成span的方式应该有更好的方法,还没有对brave做过多研究,后续想办法再优化下。另外我测试的场景是consumer调用provider,provider内部再调用provider2,我测试时发现第三步调用传递的parentid好像有点小问题,后续需要再确认下。

代码下载

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网