阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

分布式跟踪系统Zipkin详解

477次阅读
没有评论

共计 20763 个字符,预计需要花费 52 分钟才能阅读完成。

zipkin

分布式跟踪系统 Zipkin 详解

zipkin为分布式链路调用监控系统,聚合各业务系统调用延迟数据,达到链路调用监控跟踪。

architecture

分布式跟踪系统 Zipkin 详解
如图,在复杂的调用链路中假设存在一条调用链路响应缓慢,如何定位其中延迟高的服务呢?

  • 日志:通过分析调用链路上的每个服务日志得到结果
  • zipkin:使用 zipkinweb UI可以一眼看出延迟高的服务

分布式跟踪系统 Zipkin 详解

如图所示,各业务系统在彼此调用时,将特定的跟踪消息传递至 zipkin,zipkin 在收集到跟踪信息后将其聚合处理、存储、展示等,用户可通过web UI 方便
获得网络延迟、调用链路、系统依赖等等。

zipkin

zipkin主要涉及四个组件 collector storage search web UI

  • Collector接收各 service 传输的数据
  • Cassandra作为 Storage 的一种,也可以是 mysql 等,默认存储在内存中,配置 cassandra 可以参考这里
  • Query负责查询 Storage 中存储的数据, 提供简单的 JSON API 获取数据,主要提供给 web UI 使用
  • Web 提供简单的 web 界面

install

执行如下命令下载 jar 包

wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'

其为一个spring boot 工程,直接运行 jar

nohup java -jar zipkin.jar & 

访问 http://ip:9411
分布式跟踪系统 Zipkin 详解

terminology

使用 zipkin 涉及几个概念

  • Span: 基本工作单元,一次链路调用 (可以是 RPC,DB 等没有特定的限制) 创建一个 span,通过一个 64 位 ID 标识它,
    span 通过还有其他的数据,例如描述信息,时间戳,key-value 对的 (Annotation)tag 信息,parent-id 等, 其中 parent-id
    可以表示 span 调用链路来源,通俗的理解 span 就是一次请求信息

  • Trace: 类似于树结构的 Span 集合,表示一条调用链路,存在唯一标识

  • Annotation: 注解, 用来记录请求特定事件相关信息(例如时间),通常包含四个注解信息

    cs – Client Start, 表示客户端发起请求

    sr – Server Receive, 表示服务端收到请求

    ss – Server Send, 表示服务端完成处理,并将结果发送给客户端

    cr – Client Received, 表示客户端获取到服务端返回信息

  • BinaryAnnotation: 提供一些额外信息,一般已 key-value 对出现

概念说完,来看下完整的调用链路
分布式跟踪系统 Zipkin 详解

上图表示一请求链路,一条链路通过 Trace Id 唯一标识,Span标识发起的请求信息,各 span 通过parent id 关联起来,如图
分布式跟踪系统 Zipkin 详解

整个链路的依赖关系如下:
分布式跟踪系统 Zipkin 详解

完成链路调用的记录后,如何来计算调用的延迟呢,这就需要利用 Annotation 信息

分布式跟踪系统 Zipkin 详解

sr-cs 得到请求发出延迟

ss-sr 得到服务端处理延迟

cr-cs 得到真个链路完成延迟

brave

作为各调用链路,只需要负责将指定格式的数据发送给 zipkin 即可,利用 brave 可快捷完成操作。

首先导入 jar 包pom.xml

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.6.RELEASE</version>
    </parent>



    <!-- https://mvnrepository.com/artifact/io.zipkin.brave/brave-core -->
    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-core</artifactId>
            <version>3.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.zipkin.brave/brave-http -->
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-http</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-spancollector-http</artifactId>
            <version>3.9.0</version>
        </dependency>
        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-web-servlet-filter</artifactId>
            <version>3.9.0</version>
        </dependency>

        <dependency>
            <groupId>io.zipkin.brave</groupId>
            <artifactId>brave-okhttp</artifactId>
            <version>3.9.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.1</version>
        </dependency>

    </dependencies>

利用 spring boot 创建工程

Application.java

package com.lkl.zipkin;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 *
 * Created by liaokailin on 16/7/27.
 */
@SpringBootApplication
public class Application {


    public static void main(String[] args) {SpringApplication app = new SpringApplication(Application.class);
        app.run(args);


    }
}

建立 controller 对外提供服务

HomeController.java

RestController
@RequestMapping("/")
public class HomeController {

    @Autowired
    private OkHttpClient client;

    private  Random random = new Random();

    @RequestMapping("start")
    public String start() throws InterruptedException, IOException {int sleep= random.nextInt(100);
        TimeUnit.MILLISECONDS.sleep(sleep);
        Request request = new Request.Builder().url("http://localhost:9090/foo").get().build();
        Response response = client.newCall(request).execute();
        return "[service1 sleep" + sleep+"ms]" + response.body().toString();
    }

HomeController中利用 OkHttpClient 调用发起 http 请求。在每次发起请求时则需要通过 brave 记录 Span 信息,并异步传递给 zipkin
作为被调用方 (服务端) 也同样需要完成以上操作.

ZipkinConfig.java


package com.lkl.zipkin.config;

import com.github.kristofa.brave.Brave;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.SpanCollector;
import com.github.kristofa.brave.http.DefaultSpanNameProvider;
import com.github.kristofa.brave.http.HttpSpanCollector;
import com.github.kristofa.brave.okhttp.BraveOkHttpRequestResponseInterceptor;
import com.github.kristofa.brave.servlet.BraveServletFilter;
import okhttp3.OkHttpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by liaokailin on 16/7/27.
 */
@Configuration
public class ZipkinConfig {

    @Autowired
    private ZipkinProperties properties;


    @Bean
    public SpanCollector spanCollector() {HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().connectTimeout(properties.getConnectTimeout()).readTimeout(properties.getReadTimeout())
                .compressionEnabled(properties.isCompressionEnabled()).flushInterval(properties.getFlushInterval()).build();
        return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
    }


    @Bean
    public Brave brave(SpanCollector spanCollector){Brave.Builder builder = new Brave.Builder(properties.getServiceName());  // 指定 state
        builder.spanCollector(spanCollector);
        builder.traceSampler(Sampler.ALWAYS_SAMPLE);
        Brave brave = builder.build();
        return brave;
    }

    @Bean
    public BraveServletFilter braveServletFilter(Brave brave){BraveServletFilter filter = new BraveServletFilter(brave.serverRequestInterceptor(),brave.serverResponseInterceptor(),new DefaultSpanNameProvider());
        return filter;
    }

    @Bean
    public OkHttpClient okHttpClient(Brave brave){OkHttpClient client = new OkHttpClient.Builder()
                .addInterceptor(new BraveOkHttpRequestResponseInterceptor(brave.clientRequestInterceptor(), brave.clientResponseInterceptor(), new DefaultSpanNameProvider()))
                .build();
        return client;
    }
}
  • SpanCollector 配置收集器

  • Brave 各工具类的封装, 其中 builder.traceSampler(Sampler.ALWAYS_SAMPLE) 设置采样比率,0- 1 之间的百分比

  • BraveServletFilter 作为拦截器,需要 serverRequestInterceptor,serverResponseInterceptor 分别完成srss操作

  • OkHttpClient 添加拦截器,需要 clientRequestInterceptor,clientResponseInterceptor 分别完成cscr操作, 该功能由
    brave 中的 brave-okhttp 模块提供,同样的道理如果需要记录数据库的延迟只要在数据库操作前后完成 cscr即可,当然 brave 提供其封装。

以上还缺少一个配置信息ZipkinProperties.java

package com.lkl.zipkin.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Created by liaokailin on 16/7/28.
 */
@Configuration
@ConfigurationProperties(prefix = "com.zipkin")
public class ZipkinProperties {

    private String serviceName;

    private String url;

    private int connectTimeout;

    private int readTimeout;

    private int flushInterval;

    private boolean compressionEnabled;

    public String getUrl() {return url;
    }

    public void setUrl(String url) {this.url = url;
    }

    public int getConnectTimeout() {return connectTimeout;
    }

    public void setConnectTimeout(int connectTimeout) {this.connectTimeout = connectTimeout;
    }

    public int getReadTimeout() {return readTimeout;
    }

    public void setReadTimeout(int readTimeout) {this.readTimeout = readTimeout;
    }

    public int getFlushInterval() {return flushInterval;
    }

    public void setFlushInterval(int flushInterval) {this.flushInterval = flushInterval;
    }

    public boolean isCompressionEnabled() {return compressionEnabled;
    }

    public void setCompressionEnabled(boolean compressionEnabled) {this.compressionEnabled = compressionEnabled;
    }

    public String getServiceName() {return serviceName;
    }

    public void setServiceName(String serviceName) {this.serviceName = serviceName;
    }
}

则可以在配置文件 application.properties 中配置相关信息

com.zipkin.serviceName=service1
com.zipkin.url=http://110.173.14.57:9411
com.zipkin.connectTimeout=6000
com.zipkin.readTimeout=6000
com.zipkin.flushInterval=1
com.zipkin.compressionEnabled=true
server.port=8080

那么其中的 service1 即完成,同样的道理,修改配置文件 (调整com.zipkin.serviceName, 以及server.port) 以及 controller 对应的方法构造若干服务

service1 中访问 http://localhost:8080/start 需要访问 http://localhost:9090/foo, 则构造server2 提供该方法

server2配置

com.zipkin.serviceName=service2
com.zipkin.url=http://110.173.14.57:9411
com.zipkin.connectTimeout=6000
com.zipkin.readTimeout=6000
com.zipkin.flushInterval=1
com.zipkin.compressionEnabled=true


server.port=9090

controller方法

    @RequestMapping("foo")
    public String foo() throws InterruptedException, IOException {Random random = new Random();
        int sleep= random.nextInt(100);
        TimeUnit.MILLISECONDS.sleep(sleep);
        Request request = new Request.Builder().url("http://localhost:9091/bar").get().build();  //service3
        Response response = client.newCall(request).execute();
        String result = response.body().string();
        request = new Request.Builder().url("http://localhost:9092/tar").get().build();  //service4
        response = client.newCall(request).execute();
       result += response.body().string();
        return "[service2 sleep" + sleep+"ms]" + result;
    }

server2 中调用 server3server4中的方法

方法分别为

 @RequestMapping("bar")
    public String bar() throws InterruptedException, IOException {//service3 method
        Random random = new Random();
        int sleep= random.nextInt(100);
        TimeUnit.MILLISECONDS.sleep(sleep);
        return "[service3 sleep" + sleep+"ms]";
    }

    @RequestMapping("tar")
    public String tar() throws InterruptedException, IOException {//service4 method
        Random random = new Random();
        int sleep= random.nextInt(1000);
        TimeUnit.MILLISECONDS.sleep(sleep);
        return "[service4 sleep" + sleep+"ms]";
    }

将工程修改后编译成 jar 形式

执行


nohup java -jar server4.jar &
nohup java -jar server3.jar &
nohup java -jar server2.jar &
nohup java -jar server1.jar &

访问 http://localhost:8080/start 后查看 zipkinweb UI

分布式跟踪系统 Zipkin 详解

点击条目可以查看具体的延迟信息

分布式跟踪系统 Zipkin 详解

服务之间的依赖为
分布式跟踪系统 Zipkin 详解

brave 源码

以上完成了基本的操作,下面将从源码角度来看下 brave 的实现

首先从 SpanCollector 来入手


 @Bean
    public SpanCollector spanCollector() {HttpSpanCollector.Config config = HttpSpanCollector.Config.builder().connectTimeout(properties.getConnectTimeout()).readTimeout(properties.getReadTimeout())
                .compressionEnabled(properties.isCompressionEnabled()).flushInterval(properties.getFlushInterval()).build();
        return HttpSpanCollector.create(properties.getUrl(), config, new EmptySpanCollectorMetricsHandler());
    }

从名称上看 HttpSpanCollector 是基于 httpspan收集器, 因此超时配置是必须的,默认给出的超时时间较长,flushInterval表示 span 的传递
间隔,实际为定时任务执行的间隔时间. 在 HttpSpanCollector 中覆写了父类方法sendSpans


@Override
  protected void sendSpans(byte[] json) throws IOException {// intentionally not closing the connection, so as to use keep-alives
    HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    connection.setConnectTimeout(config.connectTimeout());
    connection.setReadTimeout(config.readTimeout());
    connection.setRequestMethod("POST");
    connection.addRequestProperty("Content-Type", "application/json");
    if (config.compressionEnabled()) {connection.addRequestProperty("Content-Encoding", "gzip");
      ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
      try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {compressor.write(json);
      }
      json = gzipped.toByteArray();}
    connection.setDoOutput(true);
    connection.setFixedLengthStreamingMode(json.length);
    connection.getOutputStream().write(json);

    try (InputStream in = connection.getInputStream()) {while (in.read() != -1) ; // skip
    } catch (IOException e) {try (InputStream err = connection.getErrorStream()) {if (err != null) {// possible, if the connection was dropped
          while (err.read() != -1) ; // skip
        }
      }
      throw e;
    }
  }
}

可以看出最终 span 信息是通过 HttpURLConnection 实现的,同样道理就可以推理 bravebrave-spring-resttemplate-interceptors模块的实现,
只是换了一种 http 封装。

Brave

 @Bean
    public Brave brave(SpanCollector spanCollector){Brave.Builder builder = new Brave.Builder(properties.getServiceName());  // 指定 state
        builder.spanCollector(spanCollector);
        builder.traceSampler(Sampler.ALWAYS_SAMPLE);
        Brave brave = builder.build();
        return brave;
    }

Brave类包装了各种工具类

public Brave build() {return new Brave(this);
        }

创建一个Brave


private Brave(Builder builder) {serverTracer = ServerTracer.builder()
                .randomGenerator(builder.random)
                .spanCollector(builder.spanCollector)
                .state(builder.state)
                .traceSampler(builder.sampler).build();

        clientTracer = ClientTracer.builder()
                .randomGenerator(builder.random)
                .spanCollector(builder.spanCollector)
                .state(builder.state)
                .traceSampler(builder.sampler).build();

        localTracer = LocalTracer.builder()
                .randomGenerator(builder.random)
                .spanCollector(builder.spanCollector)
                .spanAndEndpoint(SpanAndEndpoint.LocalSpanAndEndpoint.create(builder.state))
                .traceSampler(builder.sampler).build();

        serverRequestInterceptor = new ServerRequestInterceptor(serverTracer);
        serverResponseInterceptor = new ServerResponseInterceptor(serverTracer);
        clientRequestInterceptor = new ClientRequestInterceptor(clientTracer);
        clientResponseInterceptor = new ClientResponseInterceptor(clientTracer);
        serverSpanAnnotationSubmitter = AnnotationSubmitter.create(SpanAndEndpoint.ServerSpanAndEndpoint.create(builder.state));
        serverSpanThreadBinder = new ServerSpanThreadBinder(builder.state);
        clientSpanThreadBinder = new ClientSpanThreadBinder(builder.state);
    }

封装了 *Tracer,*Interceptor,*Binder

其中 serverTracer当服务作为 服务端 时处理 span 信息,clientTracer当服务作为 客户端 时处理 span 信息

Filter

BraveServletFilterhttp 模块提供的拦截器功能,传递 serverRequestInterceptor,serverResponseInterceptor,spanNameProvider 等参数
其中 spanNameProvider 表示如何处理 span 的名称,默认使用 method 名称,spring boot中申明的filter bean 默认拦截所有请求

@Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException {String alreadyFilteredAttributeName = getAlreadyFilteredAttributeName();
        boolean hasAlreadyFilteredAttribute = request.getAttribute(alreadyFilteredAttributeName) != null;

        if (hasAlreadyFilteredAttribute) {// Proceed without invoking this filter...
            filterChain.doFilter(request, response);
        } else {final StatusExposingServletResponse statusExposingServletResponse = new StatusExposingServletResponse((HttpServletResponse) response);
            requestInterceptor.handle(new HttpServerRequestAdapter(new ServletHttpServerRequest((HttpServletRequest) request), spanNameProvider));

            try {filterChain.doFilter(request, statusExposingServletResponse);
            } finally {responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() {@Override
                    public int getHttpStatusCode() {return statusExposingServletResponse.getStatus();}
                }));
            }
        }
    }

首先来看 requestInterceptor.handle 方法,


 public void handle(ServerRequestAdapter adapter) {serverTracer.clearCurrentSpan();
        final TraceData traceData = adapter.getTraceData();

        Boolean sample = traceData.getSample();
        if (sample != null && Boolean.FALSE.equals(sample)) {serverTracer.setStateNoTracing();
            LOGGER.fine("Received indication that we should NOT trace.");
        } else {if (traceData.getSpanId() != null) {LOGGER.fine("Received span information as part of request.");
                SpanId spanId = traceData.getSpanId();
                serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId,
                        spanId.nullableParentId(), adapter.getSpanName());
            } else {LOGGER.fine("Received no span state.");
                serverTracer.setStateUnknown(adapter.getSpanName());
            }
            serverTracer.setServerReceived();
            for(KeyValueAnnotation annotation : adapter.requestAnnotations())
            {serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
            }
        }
    }

其中 serverTracer.clearCurrentSpan() 清除当前线程上的 span 信息,调用 ThreadLocalServerClientAndLocalSpanState 中的


  @Override
    public void setCurrentServerSpan(final ServerSpan span) {if (span == null) {currentServerSpan.remove();
        } else {currentServerSpan.set(span);
        }
    }

currentServerSpanThreadLocal 对象

private final static ThreadLocal<ServerSpan> currentServerSpan = new ThreadLocal<ServerSpan>() {

回到 ServerRequestInterceptor#handle() 方法中final TraceData traceData = adapter.getTraceData()

 @Override
    public TraceData getTraceData() {final String sampled = serverRequest.getHttpHeaderValue(BraveHttpHeaders.Sampled.getName());
        if (sampled != null) {if (sampled.equals("0") || sampled.toLowerCase().equals("false")) {return TraceData.builder().sample(false).build();} else {final String parentSpanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.ParentSpanId.getName());
                final String traceId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.TraceId.getName());
                final String spanId = serverRequest.getHttpHeaderValue(BraveHttpHeaders.SpanId.getName());

                if (traceId != null && spanId != null) {SpanId span = getSpanId(traceId, spanId, parentSpanId);
                    return TraceData.builder().sample(true).spanId(span).build();}
            }
        }
        return TraceData.builder().build();
    }

其中 SpanId span = getSpanId(traceId, spanId, parentSpanId) 将构造一个SpanId 对象

 private SpanId getSpanId(String traceId, String spanId, String parentSpanId) {return SpanId.builder()
            .traceId(convertToLong(traceId))
            .spanId(convertToLong(spanId))
            .parentId(parentSpanId == null ? null : convertToLong(parentSpanId)).build();}

traceId,spanId,parentId 关联起来,其中设置 parentId 方法为


public Builder parentId(@Nullable Long parentId) {if (parentId == null) {this.flags |= FLAG_IS_ROOT;
      } else {this.flags &= ~FLAG_IS_ROOT;
      }
      this.parentId = parentId;
      return this;
    }

如果 parentId 为空为根节点,则执行 this.flags |= FLAG_IS_ROOT , 因此后续在判断节点是否为根节点时,只需要执行(flags & FLAG_IS_ROOT) == FLAG_IS_ROOT 即可.

构造完 SpanId 后看

    serverTracer.setStateCurrentTrace(spanId.traceId, spanId.spanId,
                        spanId.nullableParentId(), adapter.getSpanName());

设置当前Span

 public void setStateCurrentTrace(long traceId, long spanId, @Nullable Long parentSpanId, @Nullable String name) {checkNotBlank(name, "Null or blank span name");
        spanAndEndpoint().state().setCurrentServerSpan(ServerSpan.create(traceId, spanId, parentSpanId, name));
    }

ServerSpan.create创建 Span 信息


 static ServerSpan create(long traceId, long spanId, @Nullable Long parentSpanId, String name) {Span span = new Span();
        span.setTrace_id(traceId);
        span.setId(spanId);
        if (parentSpanId != null) {span.setParent_id(parentSpanId);
        }
        span.setName(name);
        return create(span, true);
    }

构造了一个包含 Span 信息的 AutoValue_ServerSpan 对象

通过 setCurrentServerSpan 设置到当前线程上

继续看 serverTracer.setServerReceived() 方法

public void setServerReceived() {submitStartAnnotation(zipkinCoreConstants.SERVER_RECV);
    }

为当前请求设置了server received event


void submitStartAnnotation(String annotationName) {Span span = spanAndEndpoint().span();
        if (span != null) {
            Annotation annotation = Annotation.create(currentTimeMicroseconds(),
                annotationName,
                spanAndEndpoint().endpoint()
            );
            synchronized (span) {span.setTimestamp(annotation.timestamp);
                span.addToAnnotations(annotation);
            }
        }
    }

在这里为 Span 信息设置了 Annotation 信息, 后续的

 for(KeyValueAnnotation annotation : adapter.requestAnnotations())
            {serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
            }

设置了 BinaryAnnotation 信息,adapter.requestAnnotations()在构造 HttpServerRequestAdapter 时已完成

 @Override
    public Collection<KeyValueAnnotation> requestAnnotations() {
        KeyValueAnnotation uriAnnotation = KeyValueAnnotation.create(TraceKeys.HTTP_URL, serverRequest.getUri().toString());
        return Collections.singleton(uriAnnotation);
    }

以上将 Span 信息 (包括 sr) 存储在当前线程中,接下来继续看 BraveServletFilter#doFilter 方法的 finally 部分


 responseInterceptor.handle(new HttpServerResponseAdapter(new HttpResponse() {@Override  // 获取 http 状态码
                    public int getHttpStatusCode() {return statusExposingServletResponse.getStatus();}
                }));

handle方法

 public void handle(ServerResponseAdapter adapter) {// We can submit this in any case. When server state is not set or
        // we should not trace this request nothing will happen.
        LOGGER.fine("Sending server send.");
        try {for(KeyValueAnnotation annotation : adapter.responseAnnotations())
            {serverTracer.submitBinaryAnnotation(annotation.getKey(), annotation.getValue());
            }
            serverTracer.setServerSend();} finally {serverTracer.clearCurrentSpan();
        }
    }

首先配置 BinaryAnnotation 信息,然后执行 serverTracer.setServerSend, 在finally 中清除当前线程中的 Span 信息 (不管前面是否清楚成功, 最终都将执行该不走),ThreadLocal 中的数据要做到有始有终

serverTracer.setServerSend()

public void setServerSend() {if (submitEndAnnotation(zipkinCoreConstants.SERVER_SEND, spanCollector())) {spanAndEndpoint().state().setCurrentServerSpan(null);
        }
    }

终于看到 spanCollector 收集器了,说明下面将看是收集 Span 信息, 这里为 ss 注解


boolean submitEndAnnotation(String annotationName, SpanCollector spanCollector) {Span span = spanAndEndpoint().span();
        if (span == null) {return false;
        }
        Annotation annotation = Annotation.create(currentTimeMicroseconds(),
            annotationName,
            spanAndEndpoint().endpoint()
        );
        span.addToAnnotations(annotation);
        if (span.getTimestamp() != null) {span.setDuration(annotation.timestamp - span.getTimestamp());
        }
        spanCollector.collect(span);
        return true;
    }

首先获取当前线程中的 Span 信息,然后处理注解信息,通过 annotation.timestamp - span.getTimestamp() 计算延迟,
调用 spanCollector.collect(span) 进行收集 Span 信息,那么 Span 信息是同步收集的吗?肯定不是的,接着看

分布式跟踪系统 Zipkin 详解

调用 spanCollector.collect(span) 则执行 FlushingSpanCollector 中的 collect 方法


@Override
  public void collect(Span span) {metrics.incrementAcceptedSpans(1);
    if (!pending.offer(span)) {metrics.incrementDroppedSpans(1);
    }
  }

首先进行的是 metrics 统计信息,可以自定义该 SpanCollectorMetricsHandler 信息收集各指标信息, 利用如 grafana 等展示信息

pending.offer(span)span 信息存储在 BlockingQueue 中,然后通过定时任务去取出阻塞队列中的值,偷偷摸摸的上传 span 信息

定时任务利用了 Flusher 类来执行,在构造 FlushingSpanCollector 时构造了 Flusher


 static final class Flusher implements Runnable {final Flushable flushable;
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    Flusher(Flushable flushable, int flushInterval) {this.flushable = flushable;
      this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
    }

    @Override
    public void run() {try {flushable.flush();
      } catch (IOException ignored) {}}
  }

创建了一个核心线程数为 1 的线程池,每间隔 flushInterval 秒执行一次 Span 信息上传,执行 flush 方法

@Override
  public void flush() {if (pending.isEmpty()) return;
    List<Span> drained = new ArrayList<Span>(pending.size());
    pending.drainTo(drained);
    if (drained.isEmpty()) return;

    int spanCount = drained.size();
    try {reportSpans(drained);
    } catch (IOException e) {metrics.incrementDroppedSpans(spanCount);
    } catch (RuntimeException e) {metrics.incrementDroppedSpans(spanCount);
    }
  }

首先将阻塞队列中的值全部取出存如集合中,最后调用 reportSpans(List<Span> drained) 抽象方法,该方法在 AbstractSpanCollector 得到覆写

@Override
  protected void reportSpans(List<Span> drained) throws IOException {byte[] encoded = codec.writeSpans(drained);
    sendSpans(encoded);
  }

转换成字节流后调用 sendSpans 抽象方法发送 Span 信息,此时就回到一开始说的 HttpSpanCollector 通过 HttpURLConnection 实现的 sendSpans 方法。

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-12/149460.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计20763字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7984119
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare也瘫了连监控都挂,根因藏在哪?

支付宝、淘宝、闲鱼又双叕崩了,Cloudflare 也瘫了连监控都挂,根因藏在哪? 最近两天的互联网堪称“故障...
240 元左右!五盘位 NAS主机,7 代U硬解4K稳如狗,拓展性碾压同价位

240 元左右!五盘位 NAS主机,7 代U硬解4K稳如狗,拓展性碾压同价位

  240 元左右!五盘位 NAS 主机,7 代 U 硬解 4K 稳如狗,拓展性碾压同价位 在 NA...
150元打造低成本NAS小钢炮,捡一块3865U工控板

150元打造低成本NAS小钢炮,捡一块3865U工控板

150 元打造低成本 NAS 小钢炮,捡一块 3865U 工控板 一块二手的熊猫 B3 工控板 3865U,搭...
开源MoneyPrinterTurbo 利用AI大模型,一键生成高清短视频!

开源MoneyPrinterTurbo 利用AI大模型,一键生成高清短视频!

  开源 MoneyPrinterTurbo 利用 AI 大模型,一键生成高清短视频! 在短视频内容...
星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定!

星哥带你玩飞牛 NAS-9:全能网盘搜索工具 13 种云盘一键搞定! 前言 作为 NAS 玩家,你是否总被这些...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换,告别多工具切换

12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换,告别多工具切换

12.2K Star 爆火!开源免费的 FileConverter:右键一键搞定音视频 / 图片 / 文档转换...
手把手教你,购买云服务器并且安装宝塔面板

手把手教你,购买云服务器并且安装宝塔面板

手把手教你,购买云服务器并且安装宝塔面板 前言 大家好,我是星哥。星哥发现很多新手刚接触服务器时,都会被“选购...
每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年 0.99 刀,拿下你的第一个顶级域名,详细注册使用 前言 作为长期折腾云服务、域名建站的老玩家,星哥一直...
安装并使用谷歌AI编程工具Antigravity(亲测有效)

安装并使用谷歌AI编程工具Antigravity(亲测有效)

  安装并使用谷歌 AI 编程工具 Antigravity(亲测有效) 引言 Antigravity...
告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

  告别 Notion 焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁” 引言 在数字笔记工...