Spring Cloud Zipkin Design

1. Trace Data Storage

In Spring Cloud, the system's trace data is buffered inside HttpZipkinSpanReporter, which holds spans in a LinkedBlockingQueue with a capacity of 1000. Note that the reporter does not use the queue's blocking methods: it calls the non-blocking offer, and when the queue is full the span is simply dropped. From a thread-safety perspective, LinkedBlockingQueue's offer acquires a lock (internally the queue uses separate put and take locks), so there is some contention under high concurrency; since each call only inserts a single element, the impact should be small in practice. A lock-free ConcurrentLinkedQueue could be used as an optimization, though it is unbounded, so the capacity limit would then have to be enforced separately.

    private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
    @Override
    public void report(Span span) {
        this.spanMetricReporter.incrementAcceptedSpans(1);
        // non-blocking insert; drop the span when the queue is full
        if (!this.pending.offer(span)) {
            this.spanMetricReporter.incrementDroppedSpans(1);
        }
    }
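The ConcurrentLinkedQueue optimization mentioned above could look like the following sketch. This is an assumption, not code from Sleuth: since ConcurrentLinkedQueue is unbounded, a separate AtomicInteger is used here to enforce the 1000-element cap that LinkedBlockingQueue provides for free, while keeping the same drop-on-full semantics.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a lock-free, bounded, drop-on-full queue built on
// ConcurrentLinkedQueue. The AtomicInteger tracks the size because
// ConcurrentLinkedQueue itself has no capacity limit (and its size() is O(n)).
public class DropOnFullQueue<E> {
    private final Queue<E> pending = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final int capacity;

    public DropOnFullQueue(int capacity) {
        this.capacity = capacity;
    }

    /** Returns false (dropping the element) when the queue is full. */
    public boolean offer(E e) {
        if (size.incrementAndGet() > capacity) {
            size.decrementAndGet(); // roll back the reservation
            return false;
        }
        pending.add(e);
        return true;
    }

    /** Returns the oldest element, or null when the queue is empty. */
    public E poll() {
        E e = pending.poll();
        if (e != null) {
            size.decrementAndGet();
        }
        return e;
    }
}
```

The trade-off is that offer becomes CAS-based instead of lock-based, which tends to scale better when many request threads report spans concurrently.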

2. Trace Data Reporting

Spring Cloud defines a Flusher that reports buffered data once per second by default, sending it to Zipkin as JSON over HTTP. For statistical data some loss is acceptable, but when the network is unstable or a large amount of resources is tied up, important trace data can be lost, making it impossible to analyze the real root cause.

    static final class Flusher implements Runnable {
        final Flushable flushable;
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        Flusher(Flushable flushable, int flushInterval) {
            this.flushable = flushable;
            this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
        }

        @Override
        public void run() {
            try {
                this.flushable.flush();
            }
            catch (IOException ignored) {
            }
        }
    }

    void postSpans(byte[] json) throws IOException {
        // intentionally not closing the connection, so as to use keep-alives
        HttpURLConnection connection = (HttpURLConnection) new URL(this.url).openConnection();
        connection.setRequestMethod("POST");
        connection.addRequestProperty("Content-Type", "application/json");
        if (this.compressionEnabled) {
            connection.addRequestProperty("Content-Encoding", "gzip");
            ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
            try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {
                compressor.write(json);
            }
            json = gzipped.toByteArray();
        }
        connection.setDoOutput(true);
        connection.setFixedLengthStreamingMode(json.length);
        connection.getOutputStream().write(json);

        try (InputStream in = connection.getInputStream()) {
            while (in.read() != -1); // skip
        }
        catch (IOException e) {
            try (InputStream err = connection.getErrorStream()) {
                if (err != null) { // possible, if the connection was dropped
                    while (err.read() != -1); // skip
                }
            }
            throw e;
        }
    }
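The flush() that the Flusher invokes ties the two halves together: it drains whatever has accumulated in the pending queue, serializes the batch, and hands the bytes to postSpans. A minimal sketch of that wiring, assuming a simplified queue of pre-encoded span strings (toJson here is a stand-in for the real JSON encoder, not the library's actual code):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of how flush() could connect the pending queue to
// postSpans: drain everything buffered so far, encode it as one JSON array,
// and POST the whole batch in a single request.
class FlushSketch {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>(1000);

    boolean bufferSpan(String encodedSpan) {
        return pending.offer(encodedSpan); // same drop-on-full policy as the reporter
    }

    void flush() throws IOException {
        List<String> drained = new ArrayList<>(pending.size());
        pending.drainTo(drained);          // non-blocking bulk removal
        if (drained.isEmpty()) {
            return;                        // nothing to report this interval
        }
        byte[] json = toJson(drained).getBytes(StandardCharsets.UTF_8);
        // postSpans(json);                // would POST the batch to Zipkin
    }

    // Stand-in encoder: joins already-encoded spans into a JSON array.
    String toJson(List<String> encodedSpans) {
        return "[" + String.join(",", encodedSpans) + "]";
    }
}
```

Batching the whole queue into one request per interval is what keeps the reporting overhead low: at most one HTTP round trip per second, regardless of how many spans were recorded.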
