HTTPSource
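1. Initialization
HTTPSource is one of Flume's many Source implementations. When an agent is configured to use HTTPSource, it starts an embedded Jetty-based HTTP server to receive Events. Its start() method looks roughly like this: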
@Override
public void start() {
  Preconditions.checkState(srv == null,
      "Running HTTP Server found in source: " + getName()
      + " before I started one."
      + "Will not attempt to start.");
  srv = new Server();

  // Connector Array
  Connector[] connectors = new Connector[1];

  // Use an SSL connector if HTTPS is enabled, otherwise a plain one
  if (sslEnabled) {
    SslSocketConnector sslSocketConnector = new HTTPSourceSocketConnector(excludedProtocols);
    sslSocketConnector.setKeystore(keyStorePath);
    sslSocketConnector.setKeyPassword(keyStorePassword);
    sslSocketConnector.setReuseAddress(true);
    connectors[0] = sslSocketConnector;
  } else {
    SelectChannelConnector connector = new SelectChannelConnector();
    connector.setReuseAddress(true);
    connectors[0] = connector;
  }

  connectors[0].setHost(host);
  connectors[0].setPort(port);
  srv.setConnectors(connectors);
  try {
    // Register FlumeHTTPServlet at "/" so that it receives the HTTP requests
    org.mortbay.jetty.servlet.Context root =
        new org.mortbay.jetty.servlet.Context(
            srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
    root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
    HTTPServerConstraintUtil.enforceConstraints(root);
    // Start the Jetty server
    srv.start();
    Preconditions.checkArgument(srv.getHandler().equals(root));
  } catch (Exception ex) {
    LOG.error("Error while starting HTTPSource. Exception follows.", ex);
    Throwables.propagate(ex);
  }
  Preconditions.checkArgument(srv.isRunning());
  sourceCounter.start();
  super.start();
}
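To get a feel for what "an embedded HTTP server that accepts POSTs" means without pulling in Jetty, here is a minimal sketch that swaps in the JDK's built-in com.sun.net.httpserver.HttpServer. This is not what Flume uses. The class name EmbeddedServerSketch and its two helpers are made up for illustration, SSL is left out, and the handler only drains the body and answers with a status code.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the Jetty setup in HTTPSource.start().
class EmbeddedServerSketch {

  // Binds to host:port (port 0 lets the OS pick a free port) and starts serving.
  static HttpServer start(String host, int port) {
    try {
      HttpServer server = HttpServer.create(new InetSocketAddress(host, port), 0);
      // Plays the role of root.addServlet(..., "/"): one handler for every path.
      server.createContext("/", exchange -> {
        try {
          exchange.getRequestBody().readAllBytes(); // drain the request body
          int status = "POST".equals(exchange.getRequestMethod()) ? 200 : 405;
          exchange.sendResponseHeaders(status, -1); // -1: no response body
        } finally {
          exchange.close();
        }
      });
      server.start();
      return server;
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }

  // Sends a POST to the local server and returns the HTTP status code.
  static int post(int port, String body) {
    try {
      HttpURLConnection conn = (HttpURLConnection)
          URI.create("http://127.0.0.1:" + port + "/").toURL().openConnection();
      conn.setRequestMethod("POST");
      conn.setDoOutput(true);
      try (OutputStream out = conn.getOutputStream()) {
        out.write(body.getBytes(StandardCharsets.UTF_8));
      }
      return conn.getResponseCode();
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }

  public static void main(String[] args) {
    HttpServer server = start("127.0.0.1", 0);
    try {
      System.out.println("status = " + post(server.getAddress().getPort(), "[]"));
    } finally {
      server.stop(0);
    }
  }
}
```

The shape is the same as in start(): create the server, bind a connector to host and port, register a single handler at "/", then start it.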
2. Request handling
All of HTTPSource's request handling lives in FlumeHTTPServlet:
private class FlumeHTTPServlet extends HttpServlet {

  @Override
  public void doPost(HttpServletRequest request, HttpServletResponse response)
      throws IOException {
    // Start with an empty event list
    List<Event> events = Collections.emptyList();
    try {
      // Parse the request into events. Two data formats are currently supported:
      // binary and JSON. The configured handler does the parsing; the default is
      // org.apache.flume.source.http.JSONHandler.
      events = handler.getEvents(request);
    } catch (HTTPBadRequestException ex) {
      LOG.warn("Received bad request from client. ", ex);
      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
          "Bad request from client. "
          + ex.getMessage());
      return;
    } catch (Exception ex) {
      LOG.warn("Deserializer threw unexpected exception. ", ex);
      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
          "Deserializer threw unexpected exception. "
          + ex.getMessage());
      return;
    }
    // Count one more batch received
    sourceCounter.incrementAppendBatchReceivedCount();
    // Count the events received
    sourceCounter.addToEventReceivedCount(events.size());
    try {
      // Hand the events to the channel processor
      getChannelProcessor().processEventBatch(events);
    } catch (ChannelException ex) {
      LOG.warn("Error appending event to channel. "
          + "Channel might be full. Consider increasing the channel "
          + "capacity or make sure the sinks perform faster.", ex);
      response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
          "Error appending event to channel. Channel might be full."
          + ex.getMessage());
      return;
    } catch (Exception ex) {
      LOG.warn("Unexpected error appending event to channel. ", ex);
      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
          "Unexpected error while appending event to channel. "
          + ex.getMessage());
      return;
    }
    // Set the response encoding and status
    response.setCharacterEncoding(request.getCharacterEncoding());
    response.setStatus(HttpServletResponse.SC_OK);
    response.flushBuffer();
    // Count the batches and events that were actually accepted
    sourceCounter.incrementAppendBatchAcceptedCount();
    sourceCounter.addToEventAcceptedCount(events.size());
  }
}
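The error paths of doPost boil down to a small mapping from failure type to HTTP status. Here is a stand-alone sketch of that mapping. Everything in it is hypothetical scaffolding: BadRequestException and ChannelFullException stand in for Flume's HTTPBadRequestException and ChannelException, events are plain strings, and Handler and Sink are toy interfaces rather than Flume types.

```java
import java.util.List;

// Hypothetical, simplified model of the status codes returned by doPost.
class HttpStatusSketch {
  // Stand-in for HTTPBadRequestException.
  static class BadRequestException extends RuntimeException {}
  // Stand-in for ChannelException.
  static class ChannelFullException extends RuntimeException {}

  interface Handler { List<String> getEvents(String body); }
  interface Sink { void processEventBatch(List<String> events); }

  static int handle(String body, Handler handler, Sink sink) {
    List<String> events;
    try {
      events = handler.getEvents(body);
    } catch (BadRequestException ex) {
      return 400; // the client sent something the handler cannot parse
    } catch (RuntimeException ex) {
      return 500; // the handler itself failed unexpectedly
    }
    try {
      sink.processEventBatch(events);
    } catch (ChannelFullException ex) {
      return 503; // a channel rejected the batch, the client may retry later
    } catch (RuntimeException ex) {
      return 500; // any other failure while writing to the channels
    }
    return 200;
  }

  public static void main(String[] args) {
    Handler ok = body -> List.of(body);
    Sink full = events -> { throw new ChannelFullException(); };
    System.out.println(handle("e1", ok, events -> {}));
    System.out.println(handle("e1", ok, full));
  }
}
```

The useful thing to notice is that only a channel failure maps to 503, which tells a well-behaved client to retry later instead of dropping the data.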
The core of doPost is the call to getChannelProcessor().processEventBatch(events):
public void processEventBatch(List<Event> events) {
  Preconditions.checkNotNull(events, "Event list must not be null");

  // Run the events through the interceptor chain
  events = interceptorChain.intercept(events);

  Map<Channel, List<Event>> reqChannelQueue =
      new LinkedHashMap<Channel, List<Event>>();

  Map<Channel, List<Event>> optChannelQueue =
      new LinkedHashMap<Channel, List<Event>>();

  for (Event event : events) {
    // Ask the ChannelSelector which channels this event goes to;
    // the decision is driven mainly by the event's headers
    List<Channel> reqChannels = selector.getRequiredChannels(event);
    for (Channel ch : reqChannels) {
      // Queue the event for each required channel
      List<Event> eventQueue = reqChannelQueue.get(ch);
      if (eventQueue == null) {
        eventQueue = new ArrayList<Event>();
        reqChannelQueue.put(ch, eventQueue);
      }
      eventQueue.add(event);
    }

    List<Channel> optChannels = selector.getOptionalChannels(event);
    for (Channel ch : optChannels) {
      List<Event> eventQueue = optChannelQueue.get(ch);
      if (eventQueue == null) {
        eventQueue = new ArrayList<Event>();
        optChannelQueue.put(ch, eventQueue);
      }
      eventQueue.add(event);
    }
  }

  // Process required channels
  for (Channel reqChannel : reqChannelQueue.keySet()) {
    // Get the transaction provided by the channel implementation
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      // Begin the transaction
      tx.begin();
      List<Event> batch = reqChannelQueue.get(reqChannel);
      // Put the events into the channel
      for (Event event : batch) {
        reqChannel.put(event);
      }
      // Commit the transaction
      tx.commit();
    } catch (Throwable t) {
      // Roll back, then propagate the failure to the caller
      tx.rollback();
      if (t instanceof Error) {
        LOG.error("Error while writing to required channel: " +
            reqChannel, t);
        throw (Error) t;
      } else {
        throw new ChannelException("Unable to put batch on required " +
            "channel: " + reqChannel, t);
      }
    } finally {
      // Close the transaction
      if (tx != null) {
        tx.close();
      }
    }
  }

  // Process optional channels
  for (Channel optChannel : optChannelQueue.keySet()) {
    Transaction tx = optChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();
      List<Event> batch = optChannelQueue.get(optChannel);
      for (Event event : batch) {
        optChannel.put(event);
      }
      tx.commit();
    } catch (Throwable t) {
      // Roll back and log; only an Error is rethrown
      tx.rollback();
      LOG.error("Unable to put batch on optional channel: " + optChannel, t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
}
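The part of processEventBatch that matters most is how the two loops treat a failure. Here is a stripped-down sketch of just that behaviour. It makes several simplifying assumptions: there are no transactions, events are strings, the Channel interface and the deliver method are invented for the example, and only RuntimeException is handled (the real code catches Throwable and rethrows Error).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of required vs. optional delivery, without transactions.
class RequiredOptionalSketch {
  interface Channel { void putBatch(List<String> batch); }

  // Returns the messages of the optional-channel failures that were swallowed.
  static List<String> deliver(List<String> batch,
                              List<Channel> required,
                              List<Channel> optional) {
    // Required channels: the first failure aborts the whole call.
    for (Channel ch : required) {
      try {
        ch.putBatch(batch);
      } catch (RuntimeException ex) {
        throw new IllegalStateException("Unable to put batch on required channel", ex);
      }
    }
    // Optional channels: a failure is recorded and the loop carries on.
    List<String> swallowed = new ArrayList<>();
    for (Channel ch : optional) {
      try {
        ch.putBatch(batch);
      } catch (RuntimeException ex) {
        swallowed.add(ex.getMessage());
      }
    }
    return swallowed;
  }

  public static void main(String[] args) {
    List<String> store = new ArrayList<>();
    Channel good = store::addAll;
    Channel bad = b -> { throw new RuntimeException("channel full"); };
    System.out.println(deliver(List.of("e1"), List.of(good), List.of(bad)));
    System.out.println(store);
  }
}
```

One consequence worth noticing, in the sketch and in the code above: each channel is written independently, so when a required channel fails, the required channels processed before it have already accepted the batch.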
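processEventBatch is what moves data from the Source into the Channels. The key pieces are the two collections built from the ChannelSelector's answers: reqChannelQueue and optChannelQueue. Flume lets one source fan events out to several channels, and the two queues are handled differently. Take a look at the following configuration first: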
a1.sources.r1.selector.mapping.<header-value> = c1 c2 c3
a1.sources.r1.selector.optional.<header-value> = c3 c4
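Channels listed under mapping end up in reqChannelQueue, and channels listed under optional end up in optChannelQueue. Notice that c3 appears in both lists, but only one of the two entries takes effect, as the following code shows: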
MultiplexingChannelSelector.java
// Walk over every header value configured under "optional"
for (String hdr : optionalChannelsMapping.keySet()) {
  // Resolve the configured channel names to Channel objects
  List<Channel> confChannels = getChannelListFromNames(
      optionalChannelsMapping.get(hdr), channelNameMap);
  // If nothing resolved, use the shared empty list
  if (confChannels.isEmpty()) {
    confChannels = EMPTY_LIST;
  }
  // Remove channels from optional channels, which are already
  // configured to be required channels.
  // Look up the required channels for the same header value
  List<Channel> reqdChannels = channelMapping.get(hdr);
  // Check if there are required channels, else defaults to default channels
  if (reqdChannels == null || reqdChannels.isEmpty()) {
    reqdChannels = defaultChannels;
  }
  // A channel that is already required is dropped from the optional list
  for (Channel c : reqdChannels) {
    if (confChannels.contains(c)) {
      confChannels.remove(c);
    }
  }
  if (optionalChannels.put(hdr, confChannels) != null) {
    throw new FlumeException("Selector channel configured twice");
  }
}
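With this code in mind, the difference between the two settings becomes clear: mapping overrides optional. Why should it? Because the two behave differently, as processEventBatch(List<Event> events) above shows. The main difference is what happens on failure. If any channel configured under mapping throws, its transaction is rolled back and a ChannelException propagates, so the whole batch is reported as failed (HTTPSource answers with 503). A failure on a channel configured under optional is rolled back and logged, and the batch still counts as delivered. That is the main difference between mapping and optional.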
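Applied to the sample configuration, the rule "a required channel is dropped from the optional list" can be sketched in a few lines. This is a simplification with channel names as plain strings, and the class and method names are made up. It also uses removeAll, which drops every occurrence, whereas the loop in MultiplexingChannelSelector removes one occurrence per required channel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which optional channels survive once required ones are removed.
class OptionalMinusRequiredSketch {
  static List<String> effectiveOptional(List<String> required, List<String> optional) {
    List<String> result = new ArrayList<>(optional); // copy, the inputs stay untouched
    result.removeAll(required);                      // required wins over optional
    return result;
  }

  public static void main(String[] args) {
    // mapping = c1 c2 c3, optional = c3 c4
    System.out.println(effectiveOptional(
        List.of("c1", "c2", "c3"), List.of("c3", "c4"))); // prints [c4]
  }
}
```

So with the configuration above, c3 is treated as a required channel only, and c4 is the only optional one.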
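So far this has been about MultiplexingChannelSelector. Its sibling, ReplicatingChannelSelector, works the other way around: every channel is required by default, and listing a channel under optional removes it from the required list.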
ReplicatingChannelSelector.java
@Override
public void configure(Context context) {
  String optionalList = context.getString(CONFIG_OPTIONAL);
  // Start with every channel as required
  requiredChannels = new ArrayList<Channel>(getAllChannels());
  Map<String, Channel> channelNameMap = getChannelNameMap();
  if (optionalList != null && !optionalList.isEmpty()) {
    for (String optional : optionalList.split("\\s+")) {
      // Look up the optional channel by name
      Channel optionalChannel = channelNameMap.get(optional);
      // Remove it from the required channels
      requiredChannels.remove(optionalChannel);
      if (!optionalChannels.contains(optionalChannel)) {
        optionalChannels.add(optionalChannel);
      }
    }
  }
}
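The same split can be tried out in isolation. The sketch below mirrors configure() with channel names as strings. The class ReplicatingSplitSketch is hypothetical, and it adds one guard that the snippet above does not have: names that are not among the source's channels are skipped rather than looked up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how the replicating selector splits its channels.
class ReplicatingSplitSketch {
  final List<String> required;
  final List<String> optional = new ArrayList<>();

  ReplicatingSplitSketch(List<String> allChannels, String optionalList) {
    // Every channel starts out as required.
    required = new ArrayList<>(allChannels);
    if (optionalList != null && !optionalList.isEmpty()) {
      for (String name : optionalList.split("\\s+")) {
        if (!allChannels.contains(name)) {
          continue; // guard added for this sketch: ignore unknown names
        }
        required.remove(name); // optional wins over required here
        if (!optional.contains(name)) {
          optional.add(name);
        }
      }
    }
  }

  public static void main(String[] args) {
    ReplicatingSplitSketch split =
        new ReplicatingSplitSketch(List.of("c1", "c2", "c3"), "c3");
    System.out.println("required = " + split.required);
    System.out.println("optional = " + split.optional);
  }
}
```

With channels c1 c2 c3 and optional set to c3, events are still replicated to all three channels, but only a failure on c1 or c2 fails the batch.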