JdbcChannel Source Code Analysis

JdbcChannel is a thin proxy in front of a relational database such as MySQL; its core logic lives in JdbcChannelProvider.

```java
public class JdbcChannel extends AbstractChannel {

  private static final Logger LOG = LoggerFactory.getLogger(JdbcChannel.class);

  private JdbcChannelProvider provider;

  /**
   * Instantiates a new JDBC Channel.
   */
  public JdbcChannel() {
  }

  // Persist an event
  @Override
  public void put(Event event) throws ChannelException {
    getProvider().persistEvent(getName(), event);
  }

  // Take an event, removing it from the store
  @Override
  public Event take() throws ChannelException {
    return getProvider().removeEvent(getName());
  }

  // Obtain the transaction from the provider
  @Override
  public Transaction getTransaction() {
    return getProvider().getTransaction();
  }

  // Release the provider and stop the channel
  @Override
  public void stop() {
    JdbcChannelProviderFactory.releaseProvider(getName());
    provider = null;

    super.stop();
  }

  private JdbcChannelProvider getProvider() {
    return provider;
  }

  @Override
  public void configure(Context context) {
    provider = JdbcChannelProviderFactory.getProvider(context, getName());

    LOG.info("JDBC Channel initialized: " + getName());
  }
}
```
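
The delegation in the class above can be tried out without Flume on the classpath. What follows is a minimal sketch, not Flume's code: `EventStore` is a hypothetical stand-in for JdbcChannelProvider, `InMemoryStore` replaces the database with one queue per channel name, and events are plain strings. FIFO ordering within a channel is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class DelegationSketch {

  /** Hypothetical stand-in for JdbcChannelProvider. */
  interface EventStore {
    void persistEvent(String channelName, String event);

    /** Returns the oldest event of the channel, or null if it is empty. */
    String removeEvent(String channelName);
  }

  /** In-memory replacement for the database, for illustration only. */
  static final class InMemoryStore implements EventStore {
    private final Map<String, Deque<String>> queues = new HashMap<>();

    @Override
    public void persistEvent(String channelName, String event) {
      queues.computeIfAbsent(channelName, k -> new ArrayDeque<>()).addLast(event);
    }

    @Override
    public String removeEvent(String channelName) {
      Deque<String> queue = queues.get(channelName);
      return (queue == null) ? null : queue.pollFirst();
    }
  }

  /** Same shape as JdbcChannel: every call is forwarded with the channel name. */
  static final class DelegatingChannel {
    private final String name;
    private final EventStore store;

    DelegatingChannel(String name, EventStore store) {
      this.name = name;
      this.store = store;
    }

    void put(String event) {
      store.persistEvent(name, event);
    }

    String take() {
      return store.removeEvent(name);
    }
  }

  public static void main(String[] args) {
    EventStore shared = new InMemoryStore();
    DelegatingChannel c1 = new DelegatingChannel("c1", shared);
    DelegatingChannel c2 = new DelegatingChannel("c2", shared);
    c1.put("first");
    c2.put("other");
    System.out.println("c1 took: " + c1.take());
    System.out.println("c2 took: " + c2.take());
  }
}
```

Because the channel name is passed on every call, several channels can share one store without seeing each other's events, which is why the real class only needs `getName()` and a provider reference.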

JdbcChannelProviderImpl initialization flow

```java
  @Override
  public void initialize(Context context) {
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Initializing JDBC Channel provider with props: "
          + context);
    }
    // Initialize the configured system properties
    initializeSystemProperties(context);
    // Initialize the DataSource
    initializeDataSource(context);
    // Initialize the database schema
    initializeSchema(context);
    // Initialize the channel state
    initializeChannelState(context);
  }
```
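
The first step, `initializeSystemProperties`, is not shown in this article. A rough sketch of what such a step might do follows. It assumes that configuration keys under a `sysprop.` prefix are copied into JVM system properties, which is what the `sysprop.*` option in the Flume user guide suggests. The class name, the method names and the `example.flag` key are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class SysPropSketch {

  // Assumed prefix, based on the `sysprop.*` option in the Flume user guide.
  static final String PREFIX = "sysprop.";

  /** Collects entries whose key starts with PREFIX, with the prefix stripped. */
  static Map<String, String> extract(Map<String, String> channelConfig) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : channelConfig.entrySet()) {
      String key = entry.getKey();
      boolean hasName = key.startsWith(PREFIX) && key.length() > PREFIX.length();
      if (hasName && entry.getValue() != null) {
        result.put(key.substring(PREFIX.length()), entry.getValue());
      }
    }
    return result;
  }

  /** Publishes the collected entries as JVM system properties. */
  static void apply(Map<String, String> sysProps) {
    sysProps.forEach(System::setProperty);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("sysprop.example.flag", "on"); // hypothetical key
    config.put("maximum.connections", "10");  // not a system property, ignored
    apply(extract(config));
    System.out.println("example.flag = " + System.getProperty("example.flag"));
  }
}
```

Running this step first matters because later steps read system properties: the embedded Derby fallback in `initializeDataSource` resolves its directory from `user.home`.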

Initializing the DataSource

```java
private void initializeDataSource(Context context) {
// Read the driver class name from the configuration
driverClassName = getConfigurationString(context,
    ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS,
    ConfigurationConstants.OLD_CONFIG_JDBC_DRIVER_CLASS, null);
// Read the connection URL
connectUrl = getConfigurationString(context,
    ConfigurationConstants.CONFIG_URL,
    ConfigurationConstants.OLD_CONFIG_URL, null);

// Read the user name
String userName = getConfigurationString(context,
    ConfigurationConstants.CONFIG_USERNAME,
    ConfigurationConstants.OLD_CONFIG_USERNAME, null);
// Read the password
String password = getConfigurationString(context,
    ConfigurationConstants.CONFIG_PASSWORD,
    ConfigurationConstants.OLD_CONFIG_PASSWORD, null);
// Read the path of the JDBC properties file
String jdbcPropertiesFile = getConfigurationString(context,
    ConfigurationConstants.CONFIG_JDBC_PROPS_FILE,
    ConfigurationConstants.OLD_CONFIG_JDBC_PROPS_FILE, null);
// Read the database type
String dbTypeName = getConfigurationString(context,
    ConfigurationConstants.CONFIG_DATABASE_TYPE,
    ConfigurationConstants.OLD_CONFIG_DATABASE_TYPE, null);

// If connect URL is not specified, use embedded Derby
if (connectUrl == null || connectUrl.trim().length() == 0) {
  LOGGER.warn("No connection URL specified. "
      + "Using embedded derby database instance.");

  driverClassName = DEFAULT_DRIVER_CLASSNAME;
  userName = DEFAULT_USERNAME;
  password = DEFAULT_PASSWORD;
  dbTypeName = DEFAULT_DBTYPE;

  String homePath = System.getProperty("user.home").replace('\\', '/');
  // Data is stored under <user.home>/.flume/jdbc-channel
  String defaultDbDir = homePath + "/.flume/jdbc-channel";


  File dbDir = new File(defaultDbDir);
  String canonicalDbDirPath = null;

  try {
    canonicalDbDirPath = dbDir.getCanonicalPath();
  } catch (IOException ex) {
    throw new JdbcChannelException("Unable to find canonical path of dir: "
        + defaultDbDir, ex);
  }

  if (!dbDir.exists()) {
    if (!dbDir.mkdirs()) {
      throw new JdbcChannelException("unable to create directory: "
          + canonicalDbDirPath);
    }
  }
  // Build the connection URL
  connectUrl = "jdbc:derby:" + canonicalDbDirPath + "/db;create=true";

  // No jdbc properties file will be used
  jdbcPropertiesFile = null;

  LOGGER.warn("Overriding values for - driver: " + driverClassName
      + ", user: " + userName + "connectUrl: " + connectUrl
      + ", jdbc properties file: " + jdbcPropertiesFile
      + ", dbtype: " + dbTypeName);
}

// Right now only Derby and MySQL supported
databaseType = DatabaseType.getByName(dbTypeName);

switch (databaseType) {
  case DERBY:
  case MYSQL:
    break;
  default:
    throw new JdbcChannelException("Database " + databaseType
        + " not supported at this time");
}

// Register driver
if (driverClassName == null || driverClassName.trim().length() == 0) {
  throw new JdbcChannelException("No jdbc driver specified");
}

try {
  Class.forName(driverClassName);
} catch (ClassNotFoundException ex) {
  throw new JdbcChannelException("Unable to load driver: "
              + driverClassName, ex);
}

// JDBC Properties
Properties jdbcProps = new Properties();

if (jdbcPropertiesFile != null && jdbcPropertiesFile.trim().length() > 0) {
  File jdbcPropsFile = new File(jdbcPropertiesFile.trim());
  if (!jdbcPropsFile.exists()) {
    throw new JdbcChannelException("Jdbc properties file does not exist: "
        + jdbcPropertiesFile);
  }

  InputStream inStream = null;
  try {
    inStream = new FileInputStream(jdbcPropsFile);
    jdbcProps.load(inStream);
  } catch (IOException ex) {
    throw new JdbcChannelException("Unable to load jdbc properties "
        + "from file: " + jdbcPropertiesFile, ex);
  } finally {
    if (inStream != null) {
      try {
        inStream.close();
      } catch (IOException ex) {
        LOGGER.error("Unable to close file: " + jdbcPropertiesFile, ex);
      }
    }
  }
}

if (userName != null) {
  Object oldUser = jdbcProps.put("user", userName);
  if (oldUser != null) {
    LOGGER.warn("Overriding user from: " + oldUser + " to: " + userName);
  }
}

if (password != null) {
  Object oldPass = jdbcProps.put("password", password);
  if (oldPass != null) {
    LOGGER.warn("Overriding password from the jdbc properties with "
        + " the one specified explicitly.");
  }
}

if (LOGGER.isDebugEnabled()) {
  StringBuilder sb = new StringBuilder("JDBC Properties {");
  boolean first = true;
  Enumeration<?> propertyKeys = jdbcProps.propertyNames();
  while (propertyKeys.hasMoreElements()) {
    if (first) {
      first = false;
    } else {
      sb.append(", ");
    }
    String key = (String) propertyKeys.nextElement();
    sb.append(key).append("=");
    if (key.equalsIgnoreCase("password")) {
      sb.append("*******");
    } else {
      sb.append(jdbcProps.get(key));
    }
  }

  sb.append("}");

  LOGGER.debug(sb.toString());
}
// The rest mainly configures the DBCP connection pool and is not covered in detail here
// Transaction Isolation
// Read the transaction isolation level; the default is READ_COMMITTED
String txIsolation = getConfigurationString(context,
    ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
    ConfigurationConstants.OLD_CONFIG_TX_ISOLATION_LEVEL,
    TransactionIsolation.READ_COMMITTED.getName());

TransactionIsolation txIsolationLevel =
    TransactionIsolation.getByName(txIsolation);

LOGGER.debug("Transaction isolation will be set to: " + txIsolationLevel);

// Setup Datasource
ConnectionFactory connFactory =
    new DriverManagerConnectionFactory(connectUrl, jdbcProps);

connectionPool = new GenericObjectPool();

String maxActiveConnections = getConfigurationString(context,
    ConfigurationConstants.CONFIG_MAX_CONNECTIONS,
    ConfigurationConstants.OLD_CONFIG_MAX_CONNECTIONS, "10");

int maxActive = 10;
if (maxActiveConnections != null && maxActiveConnections.length() > 0) {
  try {
    maxActive = Integer.parseInt(maxActiveConnections);
  } catch (NumberFormatException nfe) {
    LOGGER.warn("Max active connections has invalid value: "
            + maxActiveConnections + ", Using default: " + maxActive);
  }
}

LOGGER.debug("Max active connections for the pool: " + maxActive);
connectionPool.setMaxActive(maxActive);

statementPool = new GenericKeyedObjectPoolFactory(null);

// Creating the factory instance automatically wires the connection pool
new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
    databaseType.getValidationQuery(), false, false,
    txIsolationLevel.getCode());

dataSource = new PoolingDataSource(connectionPool);

txFactory = new JdbcTransactionFactory(dataSource, this);
}
```

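
Three small decisions are buried in this long method: the fallback to an embedded Derby URL, explicit credentials overriding the ones from the properties file, and the maximum connection count defaulting to 10 when the value does not parse. The sketch below isolates them as plain functions. The class and method names are hypothetical. Unlike the original, it does not create the directory and it copies the properties instead of modifying them in place.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;

class DataSourceDefaultsSketch {

  /** Builds the embedded Derby URL used when no connection URL is configured. */
  static String defaultDerbyUrl(String userHome) {
    String homePath = userHome.replace('\\', '/');
    File dbDir = new File(homePath + "/.flume/jdbc-channel");
    try {
      return "jdbc:derby:" + dbDir.getCanonicalPath() + "/db;create=true";
    } catch (IOException ex) {
      throw new UncheckedIOException(
          "Unable to find canonical path of dir: " + dbDir, ex);
    }
  }

  /** Explicit user name and password win over values from the properties file. */
  static Properties withCredentials(Properties fileProps, String userName,
      String password) {
    Properties merged = new Properties();
    merged.putAll(fileProps);
    if (userName != null) {
      merged.put("user", userName);
    }
    if (password != null) {
      merged.put("password", password);
    }
    return merged;
  }

  /** Parses the pool size, keeping the default of 10 on missing or invalid input. */
  static int parseMaxActive(String raw) {
    int maxActive = 10;
    if (raw != null && raw.length() > 0) {
      try {
        maxActive = Integer.parseInt(raw);
      } catch (NumberFormatException nfe) {
        // Keep the default, as the original method does after logging a warning.
      }
    }
    return maxActive;
  }

  public static void main(String[] args) {
    System.out.println(defaultDerbyUrl(System.getProperty("user.home")));
    Properties fromFile = new Properties();
    fromFile.setProperty("user", "fileUser");
    System.out.println(withCredentials(fromFile, "explicitUser", null));
    System.out.println(parseMaxActive("not-a-number"));
  }
}
```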
### Initializing the Schema
Initializing the schema creates the following tables:

Event table

```text
+-------------------------------+
| FL_EVENT                      |
+-------------------------------+
| FLE_ID     : BIGINT PK        | (auto-gen sequence)
| FLE_PAYLOAD: VARBINARY(16384) | (16kb payload)
| FLE_SPILL  : BOOLEAN          | (true if payload spills)
| FLE_CHANNEL: VARCHAR(64)      |
+-------------------------------+
```

Table for spilled event payloads

```text
+---------------------+
| FL_PLSPILL          |
+---------------------+
| FLP_EVENT  : BIGINT | (FK into FL_EVENT.FLE_ID)
| FLP_SPILL  : BLOB   |
+---------------------+
```

Event header table

```text
+--------------------------+
| FL_HEADER                |
+--------------------------+
| FLH_ID     : BIGINT PK   | (auto-gen sequence)
| FLH_EVENT  : BIGINT      | (FK into FL_EVENT.FLE_ID)
| FLH_NAME   : VARCHAR(251)|
| FLH_VALUE  : VARCHAR(251)|
| FLH_NMSPILL: BOOLEAN     | (true if name spills)
| FLH_VLSPILL: BOOLEAN     | (true if value spills)
+--------------------------+
```

Table for spilled header names

```text
+----------------------+
| FL_NMSPILL           |
+----------------------+
| FLN_HEADER  : BIGINT | (FK into FL_HEADER.FLH_ID)
| FLN_SPILL   : CLOB   |
+----------------------+
```

Table for spilled header values

```text
+----------------------+
| FL_VLSPILL           |
+----------------------+
| FLV_HEADER  : BIGINT | (FK into FL_HEADER.FLH_ID)
| FLV_SPILL   : CLOB   |
+----------------------+
```

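Given the table definitions above, the Event payload stored in the database is limited to 16 KB; if it overflows, the FL_PLSPILL table is used to store the data. Note, however, that Flume does not create these tables for MySQL: MySQLSchemaHandler does not implement anything.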
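
The 16 KB limit and the spill table can be illustrated with a small sketch. It assumes that the first 16384 bytes stay inline (FLE_PAYLOAD) and the remainder goes to the spill column (FLP_SPILL); the exact split Flume performs is not shown in this article, and the class and method names here are hypothetical.

```java
import java.util.Arrays;

class PayloadSpillSketch {

  // Matches the VARBINARY(16384) width of FL_EVENT.FLE_PAYLOAD.
  static final int INLINE_LIMIT = 16384;

  /** The two parts of a stored payload; spill is null when nothing overflows. */
  static final class StoredPayload {
    final byte[] inline;
    final byte[] spill;

    StoredPayload(byte[] inline, byte[] spill) {
      this.inline = inline;
      this.spill = spill;
    }

    /** Corresponds to the FLE_SPILL flag. */
    boolean hasSpill() {
      return spill != null;
    }
  }

  /** Splits a payload into an inline part and an optional spill part. */
  static StoredPayload split(byte[] payload) {
    if (payload.length <= INLINE_LIMIT) {
      return new StoredPayload(payload.clone(), null);
    }
    byte[] inline = Arrays.copyOfRange(payload, 0, INLINE_LIMIT);
    byte[] spill = Arrays.copyOfRange(payload, INLINE_LIMIT, payload.length);
    return new StoredPayload(inline, spill);
  }

  /** Reassembles the original payload when the event is read back. */
  static byte[] join(StoredPayload stored) {
    if (!stored.hasSpill()) {
      return stored.inline.clone();
    }
    byte[] full = Arrays.copyOf(stored.inline, stored.inline.length + stored.spill.length);
    System.arraycopy(stored.spill, 0, full, stored.inline.length, stored.spill.length);
    return full;
  }

  public static void main(String[] args) {
    StoredPayload stored = split(new byte[20000]);
    System.out.println("inline bytes: " + stored.inline.length);
    System.out.println("spill bytes: " + stored.spill.length);
  }
}
```

The header tables follow the same idea, with FLH_NMSPILL and FLH_VLSPILL flagging names and values that overflow their VARCHAR(251) columns.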

Summary

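This article mainly covered the initialization flow. Overall, the JDBC channel contains little logic of its own: it creates the tables, stores data, and proxies transactions. The channel stores data under a transaction, and since JDBC provides transactions natively, that part is simple as well.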
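
For readers less familiar with JDBC's built-in transactions, the usual commit-or-rollback idiom looks roughly like this. It is plain JDBC and not Flume's transaction implementation; `Work`, `inTransaction` and the recording fake connection are hypothetical helpers so that the sketch runs without a database driver.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class JdbcTxSketch {

  /** A unit of work executed inside one database transaction. */
  interface Work {
    void run(Connection conn) throws SQLException;
  }

  /** Commits if the work succeeds, rolls back if it throws, always closes. */
  static void inTransaction(Connection conn, Work work) throws SQLException {
    conn.setAutoCommit(false);
    try {
      work.run(conn);
      conn.commit();
    } catch (SQLException | RuntimeException ex) {
      conn.rollback();
      throw ex;
    } finally {
      conn.close();
    }
  }

  /** Fake connection that only records the names of the methods called on it. */
  static Connection recordingConnection(List<String> calls) {
    return (Connection) Proxy.newProxyInstance(
        JdbcTxSketch.class.getClassLoader(),
        new Class<?>[] {Connection.class},
        (proxy, method, methodArgs) -> {
          calls.add(method.getName());
          return null; // sufficient here: only void methods are invoked
        });
  }

  public static void main(String[] args) throws SQLException {
    List<String> calls = new ArrayList<>();
    inTransaction(recordingConnection(calls), conn -> {
      // A real channel would insert or delete event rows here.
    });
    System.out.println(calls);
  }
}
```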
