JdbcChannel源码分析
JdbcChannel 对Mysql做了一个代理,其核心主要是在JdbcChannelProvider中.
public class JdbcChannel extends AbstractChannel {
private static final Logger LOG = LoggerFactory.getLogger(JdbcChannel.class);
private JdbcChannelProvider provider;
/**
* Instantiates a new JDBC Channel.
*/
public JdbcChannel() {
}
//保存数据
@Override
public void put(Event event) throws ChannelException {
getProvider().persistEvent(getName(), event);
}
//删除数据
@Override
public Event take() throws ChannelException {
return getProvider().removeEvent(getName());
}
//获取代理事务
@Override
public Transaction getTransaction() {
return getProvider().getTransaction();
}
//关闭
@Override
public void stop() {
JdbcChannelProviderFactory.releaseProvider(getName());
provider = null;
super.stop();
}
private JdbcChannelProvider getProvider() {
return provider;
}
@Override
public void configure(Context context) {
provider = JdbcChannelProviderFactory.getProvider(context, getName());
LOG.info("JDBC Channel initialized: " + getName());
}
}
JdbcChannelProviderImpl初始化流程
@Override
public void initialize(Context context) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Initializing JDBC Channel provider with props: "
+ context);
}
//初始化配置的properties
initializeSystemProperties(context);
//初始化DataSource
initializeDataSource(context);
//初始化数据库
initializeSchema(context);
//初始化Channel状态
initializeChannelState(context);
}
初始化DataSource
```javaprivate void initializeDataSource(Context context) { //通过配置获取driverClassName driverClassName = getConfigurationString(context, ConfigurationConstants.CONFIG_JDBC_DRIVER_CLASS, ConfigurationConstants.OLD_CONFIG_JDBC_DRIVER_CLASS, null); //获取链接的url connectUrl = getConfigurationString(context, ConfigurationConstants.CONFIG_URL, ConfigurationConstants.OLD_CONFIG_URL, null);
//获取用户名 String userName = getConfigurationString(context, ConfigurationConstants.CONFIG_USERNAME, ConfigurationConstants.OLD_CONFIG_USERNAME, null); //获取密码 String password = getConfigurationString(context, ConfigurationConstants.CONFIG_PASSWORD, ConfigurationConstants.OLD_CONFIG_PASSWORD, null); //获取jdbc的propertiesFile String jdbcPropertiesFile = getConfigurationString(context, ConfigurationConstants.CONFIG_JDBC_PROPS_FILE, ConfigurationConstants.OLD_CONFIG_JDBC_PROPS_FILE, null); //获取数据库的类型 String dbTypeName = getConfigurationString(context, ConfigurationConstants.CONFIG_DATABASE_TYPE, ConfigurationConstants.OLD_CONFIG_DATABASE_TYPE, null); //如果connect url为空 那么就是用Derby进行数据库操作、 // If connect URL is not specified, use embedded Derby if (connectUrl == null || connectUrl.trim().length() == 0) { LOGGER.warn("No connection URL specified. "
+ "Using embedded derby database instance.");
driverClassName = DEFAULT_DRIVER_CLASSNAME;
userName = DEFAULT_USERNAME;
password = DEFAULT_PASSWORD;
dbTypeName = DEFAULT_DBTYPE;
String homePath = System.getProperty("user.home").replace('\\', '/');
//数据存放在user.home的/.flume/jdbc-channel下面
String defaultDbDir = homePath + "/.flume/jdbc-channel";
File dbDir = new File(defaultDbDir);
String canonicalDbDirPath = null;
try {
canonicalDbDirPath = dbDir.getCanonicalPath();
} catch (IOException ex) {
throw new JdbcChannelException("Unable to find canonical path of dir: "
+ defaultDbDir, ex);
}
if (!dbDir.exists()) {
if (!dbDir.mkdirs()) {
throw new JdbcChannelException("unable to create directory: "
+ canonicalDbDirPath);
}
}
//拼接链接url
connectUrl = "jdbc:derby:" + canonicalDbDirPath + "/db;create=true";
// No jdbc properties file will be used
jdbcPropertiesFile = null;
LOGGER.warn("Overriding values for - driver: " + driverClassName
+ ", user: " + userName + "connectUrl: " + connectUrl
+ ", jdbc properties file: " + jdbcPropertiesFile
+ ", dbtype: " + dbTypeName);
}
// Right now only Derby and MySQL supported
databaseType = DatabaseType.getByName(dbTypeName);
//目前只支持DERBY和mysql switch (databaseType) { case DERBY: case MYSQL: break; default: throw new JdbcChannelException("Database " + databaseType
+ " not supported at this time");
}
// Register driver
if (driverClassName == null || driverClassName.trim().length() == 0) {
throw new JdbcChannelException("No jdbc driver specified");
}
try {
Class.forName(driverClassName);
} catch (ClassNotFoundException ex) {
throw new JdbcChannelException("Unable to load driver: "
+ driverClassName, ex);
}
// JDBC Properties
Properties jdbcProps = new Properties();
if (jdbcPropertiesFile != null && jdbcPropertiesFile.trim().length() > 0) {
File jdbcPropsFile = new File(jdbcPropertiesFile.trim());
if (!jdbcPropsFile.exists()) {
throw new JdbcChannelException("Jdbc properties file does not exist: "
+ jdbcPropertiesFile);
}
InputStream inStream = null;
try {
inStream = new FileInputStream(jdbcPropsFile);
jdbcProps.load(inStream);
} catch (IOException ex) {
throw new JdbcChannelException("Unable to load jdbc properties "
+ "from file: " + jdbcPropertiesFile, ex);
} finally {
if (inStream != null) {
try {
inStream.close();
} catch (IOException ex) {
LOGGER.error("Unable to close file: " + jdbcPropertiesFile, ex);
}
}
}
}
if (userName != null) {
Object oldUser = jdbcProps.put("user", userName);
if (oldUser != null) {
LOGGER.warn("Overriding user from: " + oldUser + " to: " + userName);
}
}
if (password != null) {
Object oldPass = jdbcProps.put("password", password);
if (oldPass != null) {
LOGGER.warn("Overriding password from the jdbc properties with "
+ " the one specified explicitly.");
}
}
if (LOGGER.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("JDBC Properties {");
boolean first = true;
Enumeration<?> propertyKeys = jdbcProps.propertyNames();
while (propertyKeys.hasMoreElements()) {
if (first) {
first = false;
} else {
sb.append(", ");
}
String key = (String) propertyKeys.nextElement();
sb.append(key).append("=");
if (key.equalsIgnoreCase("password")) {
sb.append("*******");
} else {
sb.append(jdbcProps.get(key));
}
}
sb.append("}");
LOGGER.debug(sb.toString());
}
//下面主要是配置数据库DBCP连接池这里就不做详细的讲解了
// Transaction Isolation
//获取事务的隔离级别,默认是read_committed
String txIsolation = getConfigurationString(context,
ConfigurationConstants.CONFIG_TX_ISOLATION_LEVEL,
ConfigurationConstants.OLD_CONFIG_TX_ISOLATION_LEVEL,
TransactionIsolation.READ_COMMITTED.getName());
TransactionIsolation txIsolationLevel =
TransactionIsolation.getByName(txIsolation);
LOGGER.debug("Transaction isolation will be set to: " + txIsolationLevel);
// Setup Datasource
ConnectionFactory connFactory =
new DriverManagerConnectionFactory(connectUrl, jdbcProps);
connectionPool = new GenericObjectPool();
String maxActiveConnections = getConfigurationString(context,
ConfigurationConstants.CONFIG_MAX_CONNECTIONS,
ConfigurationConstants.OLD_CONFIG_MAX_CONNECTIONS, "10");
int maxActive = 10;
if (maxActiveConnections != null && maxActiveConnections.length() > 0) {
try {
maxActive = Integer.parseInt(maxActiveConnections);
} catch (NumberFormatException nfe) {
LOGGER.warn("Max active connections has invalid value: "
+ maxActiveConnections + ", Using default: " + maxActive);
}
}
LOGGER.debug("Max active connections for the pool: " + maxActive);
connectionPool.setMaxActive(maxActive);
statementPool = new GenericKeyedObjectPoolFactory(null);
// Creating the factory instance automatically wires the connection pool
new PoolableConnectionFactory(connFactory, connectionPool, statementPool,
databaseType.getValidationQuery(), false, false,
txIsolationLevel.getCode());
dataSource = new PoolingDataSource(connectionPool);
txFactory = new JdbcTransactionFactory(dataSource, this);
###初始化Schema
初始化Schema会创建如下表:
Event表
```java
* <pre>
* +-------------------------------+
* | FL_EVENT |
* +-------------------------------+
* | FLE_ID : BIGINT PK | (auto-gen sequence)
* | FLE_PAYLOAD: VARBINARY(16384) | (16kb payload)
* | FLE_SPILL : BOOLEAN | (true if payload spills)
* | FLE_CHANNEL: VARCHAR(64) |
* +-------------------------------+
* </pre>
Event溢出以后的表
* <pre>
* +---------------------+
* | FL_PLSPILL |
* +---------------------+
* | FLP_EVENT : BIGINT | (FK into FL_EVENT.FLE_ID)
* | FLP_SPILL : BLOB |
* +---------------------+
* </pre>
Event头部表
* <pre>
* +--------------------------+
* | FL_HEADER |
* +--------------------------+
* | FLH_ID : BIGINT PK | (auto-gen sequence)
* | FLH_EVENT : BIGINT | (FK into FL_EVENT.FLE_ID)
* | FLH_NAME : VARCHAR(251)|
* | FLH_VALUE : VARCHAR(251)|
* | FLH_NMSPILL: BOOLEAN | (true if name spills)
* | FLH_VLSPILL: BOOLEAN | (true if value spills)
* +--------------------------+
* </pre>
Event头部name益处了之后的数据
* <pre>
* +----------------------+
* | FL_NMSPILL |
* +----------------------+
* | FLN_HEADER : BIGINT | (FK into FL_HEADER.FLH_ID)
* | FLN_SPILL : CLOB |
* +----------------------+
* </pre>
Event头部value益处了之后的数据
* <pre>
* +----------------------+
* | FL_VLSPILL |
* +----------------------+
* | FLV_HEADER : BIGINT | (FK into FL_HEADER.FLH_ID)
* | FLV_SPILL : CLOB |
* +----------------------+
* </pre>
按照上述的表建立来说,数据库存储Event的payload数据的时候,限制了大小16kb、如果溢出,那么会使用FL_PLSPILL表存储数据。但是注意,FLume并没有为Mysql建表,MySQLSchemaHandler没有进行任何实现。
总结
这里主要讲述了初始化流程,其实总的来说jdbc这一块没有太多逻辑,主要做了这么几件事情,建表、存储数据、代理事务,Channel在存储数据的时候是使用事务机制的,本身jdbc就自带了事务,所以也很简单