HTTPSource

1.初始化过程

HTTPSource是Flume众多Source中的一种。如果配置使用HTTPSource,它会启动一个基于Jetty的内嵌HTTP服务器来接收Event,核心启动逻辑如下:

/**
 * Starts the embedded Jetty server that backs this HTTP source.
 *
 * <p>Builds a single connector (SSL or plain, depending on {@code sslEnabled}),
 * binds it to {@code host}:{@code port}, mounts {@code FlumeHTTPServlet} at "/",
 * starts the server, and only then starts the source counter and the base source.
 *
 * <p>Fails through {@code Preconditions} if a server instance already exists or
 * if the server is not running after startup. Exceptions thrown while building or
 * starting the server are logged and rethrown through {@code Throwables.propagate}.
 */
@Override
  public void start() {
    // Refuse to start twice: a non-null srv means a server was already created.
    // NOTE(review): the concatenated message has no space between "one." and "Will".
    Preconditions.checkState(srv == null,
            "Running HTTP Server found in source: " + getName()
            + " before I started one."
            + "Will not attempt to start.");
    srv = new Server();

    // Connector Array: exactly one connector is configured.
    Connector[] connectors = new Connector[1];

    // Choose HTTPS or plain HTTP.
    if (sslEnabled) {
      // HTTPS connector, constructed with the protocols to exclude. Note that the
      // keystore password is also used as the key password.
      SslSocketConnector sslSocketConnector = new HTTPSourceSocketConnector(excludedProtocols);
      sslSocketConnector.setKeystore(keyStorePath);
      sslSocketConnector.setKeyPassword(keyStorePassword);
      sslSocketConnector.setReuseAddress(true);
      connectors[0] = sslSocketConnector;
    } else {
      // Plain HTTP through Jetty's SelectChannelConnector.
      SelectChannelConnector connector = new SelectChannelConnector();
      connector.setReuseAddress(true);
      connectors[0] = connector;
    }

    connectors[0].setHost(host);
    connectors[0].setPort(port);
    srv.setConnectors(connectors);
    try {
      // Create the root context ("/", with sessions) on srv and register
      // FlumeHTTPServlet to receive every request under "/".
      org.mortbay.jetty.servlet.Context root =
        new org.mortbay.jetty.servlet.Context(
          srv, "/", org.mortbay.jetty.servlet.Context.SESSIONS);
      root.addServlet(new ServletHolder(new FlumeHTTPServlet()), "/");
      // Apply the constraints defined in HTTPServerConstraintUtil to this context
      // (see that helper for exactly what is restricted).
      HTTPServerConstraintUtil.enforceConstraints(root);
      // Start the Jetty server.
      srv.start();
      // Sanity check: the context created above must be the server's handler.
      Preconditions.checkArgument(srv.getHandler().equals(root));
    } catch (Exception ex) {
      LOG.error("Error while starting HTTPSource. Exception follows.", ex);
      // Rethrows ex: unchecked throwables as-is, checked ones wrapped in a
      // RuntimeException.
      Throwables.propagate(ex);
    }
    Preconditions.checkArgument(srv.isRunning());
    // Metrics and base-class state are started only after the server is up.
    sourceCounter.start();
    super.start();
  }

2.处理请求过程

HTTPSource接收并处理请求的逻辑都在内部类`FlumeHTTPServlet`中:

/**
 * Servlet that receives every request sent to this source.
 *
 * <p>Each POST is converted into a batch of {@code Event}s by the configured
 * {@code handler}. The batch is then handed to the channel processor as a single
 * unit.
 */
private class FlumeHTTPServlet extends HttpServlet {
 /**
   * Handles one POST request as one event batch.
   *
   * <p>Responses:
   * <ul>
   *   <li>400 if the handler throws {@code HTTPBadRequestException};</li>
   *   <li>500 if the handler or the channel write throws any other exception;</li>
   *   <li>503 if the channel write throws {@code ChannelException};</li>
   *   <li>200 with no body written (buffer flushed) on success.</li>
   * </ul>
   *
   * @param request  incoming HTTP request; it is parsed by {@code handler}
   * @param response response to populate
   * @throws IOException if sending the error or flushing the response fails
   */
 @Override
    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws IOException {
      // Initial value only. It is replaced by the handler's result, and every
      // failure path below returns before the list is used.
      List<Event> events = Collections.emptyList(); //create empty list
      try {
        // Parse the request into Events using the configured handler. The default
        // handler is org.apache.flume.source.http.JSONHandler; JSON and binary
        // payloads are supported by the available handlers.
        events = handler.getEvents(request);
      } catch (HTTPBadRequestException ex) {
        // The handler rejected the payload: client error (400).
        LOG.warn("Received bad request from client. ", ex);
        response.sendError(HttpServletResponse.SC_BAD_REQUEST,
                "Bad request from client. "
                + ex.getMessage());
        return;
      } catch (Exception ex) {
        // Any other handler failure: server error (500).
        LOG.warn("Deserializer threw unexpected exception. ", ex);
        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                "Deserializer threw unexpected exception. "
                + ex.getMessage());
        return;
      }
      // Metrics: one batch received, plus its event count. These are recorded
      // before the channel write, so they also include batches that later fail.
      sourceCounter.incrementAppendBatchReceivedCount();
      sourceCounter.addToEventReceivedCount(events.size());
      try {
        // Hand the whole batch to the channel processor (interceptors, channel
        // selection, per-channel transactions).
        getChannelProcessor().processEventBatch(events);
      } catch (ChannelException ex) {
        // The channel write failed (the log suggests the channel might be full): 503.
        // NOTE(review): there is no space between "full." and ex.getMessage().
        LOG.warn("Error appending event to channel. "
                + "Channel might be full. Consider increasing the channel "
                + "capacity or make sure the sinks perform faster.", ex);
        response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                "Error appending event to channel. Channel might be full."
                + ex.getMessage());
        return;
      } catch (Exception ex) {
        // Any other failure while writing to channels: server error (500).
        LOG.warn("Unexpected error appending event to channel. ", ex);
        response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                "Unexpected error while appending event to channel. "
                + ex.getMessage());
        return;
      }
      // Success: reuse the request's character encoding, return 200 and flush.
      response.setCharacterEncoding(request.getCharacterEncoding());
      response.setStatus(HttpServletResponse.SC_OK);
      response.flushBuffer();
      // Metrics: only batches that were written to the channels count as accepted.
      sourceCounter.incrementAppendBatchAcceptedCount();
      sourceCounter.addToEventAcceptedCount(events.size());
    }

}

上述逻辑的核心在于`getChannelProcessor().processEventBatch(events)`,其实现如下:

/**
 * Writes a batch of events to the channels chosen by the channel selector.
 *
 * <p>Steps:
 * <ol>
 *   <li>Run the batch through the interceptor chain.</li>
 *   <li>For every event, ask the selector for its required and optional channels,
 *       and group the event under each of them.</li>
 *   <li>Write each channel's group inside that channel's own transaction. All
 *       required channels are written before any optional channel.</li>
 * </ol>
 *
 * <p>Failure semantics:
 * <ul>
 *   <li>Required channel: that channel's transaction is rolled back and a
 *       {@code ChannelException} is thrown ({@code Error}s are logged and rethrown
 *       unchanged). Required channels processed earlier have already committed and
 *       are not rolled back, and no optional channel is written.</li>
 *   <li>Optional channel: the transaction is rolled back and the failure is only
 *       logged. Processing continues with the next optional channel
 *       ({@code Error}s are still rethrown).</li>
 * </ul>
 *
 * @param events events to write; must not be null
 */
public void processEventBatch(List<Event> events) {
    Preconditions.checkNotNull(events, "Event list must not be null");
    // Apply the interceptor chain; the list it returns is what gets routed.
    events = interceptorChain.intercept(events);

    // Map from channel to the events destined for it. LinkedHashMap keeps channels
    // in the order they were first selected, so writes happen in a stable order.
    Map<Channel, List<Event>> reqChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    Map<Channel, List<Event>> optChannelQueue =
        new LinkedHashMap<Channel, List<Event>>();

    for (Event event : events) {
      // Ask the selector which channels must receive this event (for the
      // multiplexing selector this is driven by an event header).
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        // Grouping only: nothing is written to the channel yet.
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      // Channels that may receive this event; write failures there are tolerated.
      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch: optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }
    // Process required channels: one transaction per channel, any failure aborts the call.
    // Process required channels
    for (Channel reqChannel : reqChannelQueue.keySet()) {
      // Each channel supplies its own transaction.
      Transaction tx = reqChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = reqChannelQueue.get(reqChannel);
        // Put every event grouped for this channel inside the transaction.
        for (Event event : batch) {
          reqChannel.put(event);
        }
        tx.commit();
      } catch (Throwable t) {
        // Undo this channel's uncommitted puts, then fail the whole call.
        tx.rollback();
        if (t instanceof Error) {
          LOG.error("Error while writing to required channel: " +
              reqChannel, t);
          throw (Error) t;
        } else {
          throw new ChannelException("Unable to put batch on required " +
              "channel: " + reqChannel, t);
        }
      } finally {
        // Always release the transaction. tx cannot be null here because of the
        // checkNotNull above, so this null check is purely defensive.
        if (tx != null) {
          tx.close();
        }
      }
    }

    // Process optional channels: same pattern, but non-Error failures are only logged.
    for (Channel optChannel : optChannelQueue.keySet()) {
      Transaction tx = optChannel.getTransaction();
      Preconditions.checkNotNull(tx, "Transaction object must not be null");
      try {
        tx.begin();

        List<Event> batch = optChannelQueue.get(optChannel);

        for (Event event : batch ) {
          optChannel.put(event);
        }

        tx.commit();
      } catch (Throwable t) {
        tx.rollback();
        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
        // Only Errors propagate; other failures are swallowed after logging.
        if (t instanceof Error) {
          throw (Error) t;
        }
      } finally {
        if (tx != null) {
          tx.close();
        }
      }
    }
  }

上述逻辑负责把数据从Source写入Channel。其中比较核心的是根据ChannelSelector得到的两个Channel集合:`reqChannelQueue`和`optChannelQueue`。Flume支持把一个Source的Event写入多个Channel,而`reqChannelQueue`与`optChannelQueue`在使用上是有差别的。可以先看看下面的配置:

a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=state
a1.sources.r1.selector.mapping.CZ=c1 c2 c3
a1.sources.r1.selector.optional.CZ=c3 c4

mapping中映射的Channel会进入`reqChannelQueue`,而optional中映射的Channel会进入`optChannelQueue`。注意两者都配置了c3,但实际上只有一个配置会生效,具体代码如下所示:

MultiplexingChannelSelector.java
// For each header value configured under "optional": resolve its channel names,
// then drop channels that are already required for that header value.
// Net effect: a channel listed under both mapping and optional is required only.
for (String hdr : optionalChannelsMapping.keySet()) {
      // Resolve the configured channel names to Channel objects.
      List<Channel> confChannels = getChannelListFromNames(
              optionalChannelsMapping.get(hdr), channelNameMap);
      // Nothing resolved: use EMPTY_LIST (declared outside this excerpt, presumably
      // a shared empty list). Note this is NOT a fallback to the default channels.
      if (confChannels.isEmpty()) {
        confChannels = EMPTY_LIST;
      }
      //Remove channels from optional channels, which are already
      //configured to be required channels.
      // Required channels for this header value.
      List<Channel> reqdChannels = channelMapping.get(hdr);
      //Check if there are required channels, else defaults to default channels
      if(reqdChannels == null || reqdChannels.isEmpty()) {
        reqdChannels = defaultChannels;
      }
      // Remove every required channel from the optional list, so mapping wins.
      // NOTE(review): this mutates confChannels in place; it assumes
      // getChannelListFromNames returns a mutable list. An empty EMPTY_LIST is never
      // mutated because contains() is false.
      for (Channel c : reqdChannels) {
        if (confChannels.contains(c)) {
          confChannels.remove(c);
        }
      }

      // Record the optional list; fail if optionalChannels already had an entry for hdr.
      if (optionalChannels.put(hdr, confChannels) != null) {
        throw new FlumeException("Selector channel configured twice");
      }
    }

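看完上述代码,就能清楚地理解这两种配置的差别了:对于同一个header值,mapping会覆盖optional的配置。为什么要覆盖呢?因为两者的语义不同。具体可以看上面的`processEventBatch(List<Event> events)`方法,主要差别在于出异常时的处理。写入mapping(required)配置的任意一个Channel失败时,会回滚该Channel的事务并抛出`ChannelException`,导致整批处理失败:之前已提交的Channel不会回滚,optional Channel也不会再写入,HTTPSource会向客户端返回503。而写入optional配置的Channel失败时,只会回滚该Channel自己的事务并记录错误日志,允许任意一个optional Channel写入失败。这就是mapping和optional的主要区别。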

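上面分析的是MultiplexingChannelSelector。与之相关的另一个selector是ReplicatingChannelSelector:它默认把所有Channel都视为required,而optional中配置的Channel会从required列表中移除。因此这里是optional覆盖了required,与Multiplexing中mapping覆盖optional正好相反: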

ReplicatingChannelSelector.java
 /**
   * Splits the source's channels into required and optional sets.
   *
   * <p>Every channel starts out as required. Each whitespace-separated name in the
   * {@code CONFIG_OPTIONAL} setting is looked up by name, removed from the
   * required list and added to the optional list (at most once). For the
   * replicating selector, an optional entry therefore overrides a channel's
   * default "required" status.
   *
   * @param context selector configuration
   */
  @Override
  public void configure(Context context) {
    final String optionalNames = context.getString(CONFIG_OPTIONAL);
    // Default: replicate to every channel, and treat each one as required.
    requiredChannels = new ArrayList<Channel>(getAllChannels());
    final Map<String, Channel> byName = getChannelNameMap();
    final boolean hasOptional = optionalNames != null && !optionalNames.isEmpty();
    if (!hasOptional) {
      return;
    }
    for (String name : optionalNames.split("\\s+")) {
      // NOTE(review): an unknown name resolves to null, and that null is what gets
      // moved between the lists; confirm whether it should be rejected instead.
      Channel channel = byName.get(name);
      requiredChannels.remove(channel);
      if (optionalChannels.contains(channel)) {
        continue;
      }
      optionalChannels.add(channel);
    }
  }
