ExecSource 源码分析
/**
 * Starts the exec source.
 *
 * Creates a single-thread executor, builds the {@code ExecRunnable} for the
 * configured command, submits it to the executor, and only then starts the
 * source counter and delegates to {@code super.start()}.
 */
@Override
public void start() {
logger.info("Exec source starting with command:{}", command);
// Create the single-thread executor that will host the command runner.
executor = Executors.newSingleThreadExecutor();
// Build the runnable that executes the command, handing it the channel
// processor, the counter and the restart/batching settings of this source.
runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);
// Submit the runnable; the returned future is kept in runnerFuture.
// FIXME: Use a callback-like executor / future to signal us upon failure.
runnerFuture = executor.submit(runner);
/*
* NB: This comes at the end rather than the beginning of the method because
* it sets our state to running. We want to make sure the executor is alive
* and well first.
*/
// Start the source counter before marking the source as started.
sourceCounter.start();
super.start();
logger.debug("Exec source started");
}
具体执行命令的核心逻辑
/**
 * Runs the configured command and turns every line of its standard output
 * into an event, pushing events downstream in batches.
 *
 * <p>One pass of the do/while loop: create a single-thread scheduled executor
 * for timed flushes, launch the process (through the shell when one is set,
 * otherwise by splitting the command on whitespace), start a daemon
 * {@code StderrReader} on the process's error stream, then read stdout line by
 * line until end of stream. The pass is repeated for as long as
 * {@code restart} is true, sleeping {@code restartThrottle} ms in between.
 *
 * <p>The event list is touched both by this thread and by the scheduled flush
 * task, so every access to it is wrapped in {@code synchronized (eventList)}.
 */
@Override
public void run() {
do {
// Placeholder reported if no exit code is obtained; overwritten in finally.
String exitCode = "unknown";
BufferedReader reader = null;
String line = null;
// Batch buffer; also used as the monitor guarding its own access.
final List<Event> eventList = new ArrayList<Event>();
// A new scheduled executor is created on every pass of the loop; the thread
// name embeds the id of the thread running this method.
// NOTE(review): it is not shut down in this method - presumably kill() does
// that; confirm.
timedFlushService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(
"timedFlushExecService" +
Thread.currentThread().getId() + "-%d").build());
try {
// A shell is configured: run the command through it.
if(shell != null) {
// formulateShellCommand is defined elsewhere; it supplies the argument
// array built from the shell and the command.
String[] commandArgs = formulateShellCommand(shell, command);
// Launch the process with that argument array.
process = Runtime.getRuntime().exec(commandArgs);
} else {
// No shell: split the command on runs of whitespace and start it directly.
String[] commandArgs = command.split("\\s+");
process = new ProcessBuilder(commandArgs).start();
}
// Read the process's standard output using the configured charset.
reader = new BufferedReader(
new InputStreamReader(process.getInputStream(), charset));
// Consume the process's error stream on a separate, named daemon thread.
// NOTE(review): logStderr presumably controls whether stderr lines are
// logged - confirm in StderrReader.
// StderrLogger dies as soon as the input stream is invalid
StderrReader stderrReader = new StderrReader(new BufferedReader(
new InputStreamReader(process.getErrorStream(), charset)), logStderr);
stderrReader.setName("StderrReader-[" + command + "]");
stderrReader.setDaemon(true);
stderrReader.start();
// Schedule the periodic flush task: first run after batchTimeout ms, then
// batchTimeout ms after each run completes.
future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
synchronized (eventList) {
// Flush only when there is something buffered and timeout() reports true.
// NOTE(review): timeout() is defined elsewhere; presumably it compares the
// time since lastPushToChannel with batchTimeout - confirm.
if(!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
// Exceptions are logged and not rethrown, so the periodic task is not
// cancelled by a failed flush.
logger.error("Exception occured when processing event batch", e);
if(e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
},
batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
// readLine() blocks until the process emits a full line and returns null at
// end of stream, which ends the loop.
while ((line = reader.readLine()) != null) {
synchronized (eventList) {
sourceCounter.incrementEventReceivedCount();
// One event per output line, body encoded with the configured charset.
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
// Flush when the batch reaches bufferCount or timeout() reports true.
if(eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}
// End of stream: flush whatever is still buffered.
synchronized (eventList) {
if(!eventList.isEmpty()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Failed while running command: " + command, e);
// Restore the interrupt flag if the failure was an interruption.
if(e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException ex) {
logger.error("Failed to close reader for exec source", ex);
}
}
// kill() is defined elsewhere; its return value is recorded as the exit code.
// NOTE(review): presumably it terminates the child process - confirm.
exitCode = String.valueOf(kill());
}
// With restart enabled, wait restartThrottle ms before the loop runs the
// command again; otherwise just log how the command exited.
if(restart) {
logger.info("Restarting in {}ms, exit code {}", restartThrottle,
exitCode);
try {
Thread.sleep(restartThrottle);
} catch (InterruptedException e) {
// The interrupt flag is restored, but the loop condition below only checks
// restart, so an interrupt alone does not end the loop.
Thread.currentThread().interrupt();
}
} else {
logger.info("Command [" + command + "] exited with " + exitCode);
}
} while(restart);
}
把数据推送到 Channel 中。channelProcessor 已经在 HttpSource 的分析中介绍过,这里不再过多介绍。
/**
 * Hands the buffered events to the channel processor as one batch, then
 * records the accepted count, empties the buffer and stamps the push time.
 *
 * If {@code processEventBatch} throws, none of the following steps run, so
 * the list is left uncleared and the counter and timestamp are untouched.
 *
 * @param eventList the buffered events; cleared in place after the batch has
 *                  been processed
 */
private void flushEventBatch(List<Event> eventList) {
  channelProcessor.processEventBatch(eventList);

  // The size is read after processing and before clearing, as the count of
  // events accepted in this batch.
  final int acceptedInBatch = eventList.size();
  sourceCounter.addToEventAcceptedCount(acceptedInBatch);

  eventList.clear();

  // Record when the last batch was pushed to the channel.
  lastPushToChannel = systemClock.currentTimeMillis();
}