Spring Cloud Zipkins设计
1. Trace资源存储
针对Spring Cloud来说,整个系统的Trace信息是存储在HttpZipkinSpanReporter
中,通过LinkedBlockingQueue存储系统的数据,设置数量为1000,但是并不是使用的阻塞方法。采用的是如果容器满了,就丢弃该数据的方法。从线程安全方面来说,LinkedBlockingQueue的offer采用了单机锁机制,所以大并发下会有一定影响,只不过仅仅是插入一条数据,理论上来说影响不会太大。可以使用ConcurrentLinkedQueue进行优化
private final BlockingQueue<Span> pending = new LinkedBlockingQueue<>(1000);
@Override
public void report(Span span) {
this.spanMetricReporter.incrementAcceptedSpans(1);
//插入数据
if (!this.pending.offer(span)) {
this.spanMetricReporter.incrementDroppedSpans(1);
}
}
2. Trace资源的上报
Spring cloud定义了一个Flusher,默认是一秒进行一次上报数据,通过http json的方式传输数据到ZipKin中,针对于统计数据来说其实是允许丢失的,但是在网络不正常的情况,或者大量资源被占用了,会导致重要的Trace数据丢失,无法分析真正的原因。
static final class Flusher implements Runnable {
final Flushable flushable;
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Flusher(Flushable flushable, int flushInterval) {
this.flushable = flushable;
this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
}
@Override
public void run() {
try {
this.flushable.flush();
}
catch (IOException ignored) {
}
}
}
void postSpans(byte[] json) throws IOException {
// intentionally not closing the connection, so as to use keep-alives
HttpURLConnection connection = (HttpURLConnection) new URL(this.url).openConnection();
connection.setRequestMethod("POST");
connection.addRequestProperty("Content-Type", "application/json");
if (this.compressionEnabled) {
connection.addRequestProperty("Content-Encoding", "gzip");
ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {
compressor.write(json);
}
json = gzipped.toByteArray();
}
connection.setDoOutput(true);
connection.setFixedLengthStreamingMode(json.length);
connection.getOutputStream().write(json);
try (InputStream in = connection.getInputStream()) {
while (in.read() != -1); // skip
}
catch (IOException e) {
try (InputStream err = connection.getErrorStream()) {
if (err != null) { // possible, if the connection was dropped
while (err.read() != -1); // skip
}
}
throw e;
}
}