1. CommitLog Storage Format
The message storage format in the CommitLog is as follows:
4 bytes (TOTALSIZE) + 4 bytes (MAGICCODE) + 4 bytes (BODYCRC)
+ 4 bytes (QUEUEID) + 4 bytes (FLAG) + 8 bytes (QUEUEOFFSET) + 8 bytes (PHYSICALOFFSET)
+ 4 bytes (SYSFLAG) + 8 bytes (BORNTIMESTAMP) + 8 bytes (BORNHOST) + 8 bytes (STORETIMESTAMP) + 8 bytes (STOREHOSTADDRESS)
+ 4 bytes (RECONSUMETIMES) + 8 bytes (Prepared Transaction Offset)
+ (4 bytes (int bodyLength) + bodyLength bytes of body)
+ (1 byte ((byte) topicLength) + topicLength bytes of topic)
+ (2 bytes (short propertiesLength) + propertiesLength bytes of properties)
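As a quick cross-check of the layout above, here is a minimal sketch that computes the total on-disk size of one record from its three variable-length parts; the field widths come straight from the list above, and the method name is only illustrative:
// Sketch: size of one CommitLog record, per the field list above.
public static int calcRecordSize(int bodyLength, int topicLength, int propertiesLength) {
    return 4 + 4 + 4            // TOTALSIZE + MAGICCODE + BODYCRC
        + 4 + 4 + 8 + 8         // QUEUEID + FLAG + QUEUEOFFSET + PHYSICALOFFSET
        + 4 + 8 + 8 + 8 + 8     // SYSFLAG + BORNTIMESTAMP + BORNHOST + STORETIMESTAMP + STOREHOSTADDRESS
        + 4 + 8                 // RECONSUMETIMES + Prepared Transaction Offset
        + 4 + bodyLength        // bodyLength + body
        + 1 + topicLength       // topicLength + topic
        + 2 + propertiesLength; // propertiesLength + properties
}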
2. CommitLog Implementation Mechanism
3. CommitLog Flush Mechanism
Asynchronous flush
The MapedFile in the CommitLog is built on top of the page cache. The number of dirty pages is computed as write / OS_PAGE_SIZE - flush / OS_PAGE_SIZE; when this exceeds the configured number of pages, a flush is performed. Each time a request is received, a waiting thread is notified, and that thread evaluates ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages.
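A minimal sketch of that check, with illustrative field names (wrotePosition and flushedPosition stand in for the actual write/flush counters):
// Sketch of the dirty-page threshold described above; names are illustrative.
private static final int OS_PAGE_SIZE = 1024 * 4;

private boolean needFlush(int wrotePosition, int flushedPosition, int flushLeastPages) {
    int dirtyPages = (wrotePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE);
    return dirtyPages >= flushLeastPages;
}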
Flushing is performed by FlushRealTimeService. Between rounds this thread waits using one of two strategies; the interval is configurable and defaults to one check per second, which keeps the page cache flushed in a timely manner. One strategy is a notifiable wait, the other is a plain sleep:
if (flushCommitLogTimed) {
    Thread.sleep(interval);
} else {
    this.waitForRunning(interval);
}
The difference between the two is that sleep always waits out the full interval and cannot be woken up early, so its responsiveness is poor, whereas the wait-based form is woken up immediately when a message is inserted.
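A minimal sketch of that difference, assuming a CountDownLatch-based wait (the real ServiceThread implementation differs in detail):
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative only: a wait that returns early when wakeup() is called, unlike Thread.sleep().
class WakeableWait {
    private volatile CountDownLatch waitPoint = new CountDownLatch(1);

    // Called by the writer when a message is inserted: wakes the flusher immediately.
    void wakeup() {
        waitPoint.countDown();
    }

    // The flusher blocks for at most interval ms, but returns as soon as wakeup() fires.
    void waitForRunning(long interval) throws InterruptedException {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
        waitPoint = new CountDownLatch(1); // reset for the next round
    }
}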
Synchronous flush
Synchronous flushing uses GroupCommitService. Its flow is similar to the asynchronous strategy, except that it does not rely on the page-cache threshold: it flushes as soon as a single message arrives. This is relatively inefficient, but it guarantees the data is safe. A timeout is also applied here: if the flush times out, an error is returned. Synchronous flushing likewise runs on a dedicated worker thread, which checks for pending flush requests roughly every 10 milliseconds to make sure data is persisted to disk:
try {
    Thread.sleep(10);
} catch (InterruptedException e) {
    CommitLog.log.warn("GroupCommitService Exception, ", e);
}
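The flush timeout mentioned above works by having the producer thread wait on its commit request. A minimal sketch, assuming a CountDownLatch inside the request (illustrative, not the exact RocketMQ class):
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative: the producer thread blocks until the flush thread has persisted up to nextOffset.
class GroupCommitRequestSketch {
    private final long nextOffset; // wroteOffset + wroteBytes of the message just appended
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    GroupCommitRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset; // the flush thread compares this against its flushed position
    }

    // Called by the flush thread once the flushed position reaches nextOffset.
    void wakeupCustomer(boolean flushOK) {
        this.flushOK = flushOK;
        latch.countDown();
    }

    // Called by the producer thread; false means the flush did not finish within the timeout.
    boolean waitForFlush(long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && flushOK;
    }
}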
Relatedly, isOSPageCacheBusy() in DefaultMessageStore uses the time spent holding the CommitLog write lock (beginTimeInLock) to decide whether the OS page cache is too busy to keep up with writes:
public boolean isOSPageCacheBusy() {
    // beginTimeInLock is set when putMessage acquires the lock and reset to 0 on release
    long begin = this.getCommitLog().getBeginTimeInLock();
    long diff = this.systemClock.now() - begin;
    // busy when the lock has been held longer than osPageCacheBusyTimeOutMills;
    // the 10000000 upper bound filters out the idle case where begin is 0
    if (diff < 10000000 //
        && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
        return true;
    }
    return false;
}
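A hedged usage sketch: the broker side can use this check to fail a write fast when the page cache is busy. The surrounding method and the status constant are assumptions for illustration, not taken from the code above:
// Illustrative only: reject a write early when the store reports a busy page cache.
// PutMessageStatus.OS_PAGECACHE_BUSY is assumed here; adjust to whatever the code base provides.
public PutMessageResult putMessageGuarded(MessageExtBrokerInner msg) {
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }
    return this.commitLog.putMessage(msg);
}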
4. CommitLog Expired-File Cleanup Mechanism
When DefaultMessageStore starts, it schedules a periodic task by default (period 10000 ms) that calls cleanFilesPeriodically() to perform cleanup; that method in turn invokes the run() method of cleanCommitLogService. The code is as follows:
private void deleteExpiredFiles() {
    int deleteCount = 0;
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    // Three triggers: 1. the scheduled deletion time has arrived 2. the disk is (almost) full 3. manualDeleteFileSeveralTimes > 0
    boolean timeup = this.isTimeToDelete();
    boolean spacefull = this.isSpaceToDelete();
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    if (timeup || spacefull || manualDelete) {
        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;
        // whether to delete immediately (forcibly)
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
            fileReservedTime, //
            timeup, //
            spacefull, //
            manualDeleteFileSeveralTimes, //
            cleanAtOnce);
        // The retention time defaults to 72 hours: a CommitLog data file that has not been
        // modified within 72 hours is considered expired
        fileReservedTime *= 60 * 60 * 1000;
        // delete the expired files in the CommitLog
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}
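A minimal sketch of the per-file expiration test implied by fileReservedTime above (the real check lives in the mapped-file queue; the method below is only illustrative):
// Illustrative: a file becomes eligible for deletion once it has not been modified for
// fileReservedTimeMs, or when cleanAtOnce forces immediate cleanup.
static boolean isExpired(java.io.File file, long fileReservedTimeMs, boolean cleanAtOnce) {
    long liveMaxTimestamp = file.lastModified() + fileReservedTimeMs;
    return System.currentTimeMillis() >= liveMaxTimestamp || cleanAtOnce;
}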
cleanAtOnce is the tricky part here: once the disk is more than 75% full, only one data file is kept, on the assumption that letting data files pile up will eventually fill the disk. This can be tuned by setting cleanFileForciblyEnable=false: then, even if the disk is full, files are not force-deleted unless the other condition is also met, namely that the file has not been modified for more than 72 hours.
5. CommitLog Message Insertion Mechanism
Messages enter the store through CommitLog.putMessage(), which appends the message to the current MapedFile under a lock and then drives the flush and master/slave replication paths described above:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
//compute the body CRC so the body can be validated when it is read back
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
//message topic
String topic = msg.getTopic();
int queueId = msg.getQueueId();
//resolve the transaction type from the message sysflag
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TransactionNotType // non-transactional message, or a transaction commit
|| tranType == MessageSysFlag.TransactionCommitType) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long eclipseTimeInLock = 0;
MapedFile unlockMapedFile = null;
//get the CommitLog file currently being written to; the default file size is 1 GB
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
//lock the whole append section
synchronized (this) {
//current time
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
//if mapedFile == null, or the file is already full
if (null == mapedFile || mapedFile.isFull()) {
//get a new mapedFile; this creates two files: the one to use now and the next one
mapedFile = this.mapedFileQueue.getLastMapedFile();
}
//if it is still null, return a creation-failure result
if (null == mapedFile) {
log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//append the message to the end of the file
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
//the end of the file was reached: get a new file and write the message again
case END_OF_FILE:
unlockMapedFile = mapedFile;
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
if (null == mapedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} // end of synchronized
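// warn when the append held the lock for more than one second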
if (eclipseTimeInLock > 1000) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}
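// a file that was filled up during this append (END_OF_FILE above) is handed back to the store to be unlocked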
if (null != unlockMapedFile) {
this.defaultMessageStore.unlockMapedFile(unlockMapedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
GroupCommitRequest request = null;
// Synchronous flush: if the configured flush disk type is SYNC_FLUSH, flush right away
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (msg.isWaitStoreMsgOK()) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
+ " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}
} else {
service.wakeup();
}
}
// Asynchronous flush
else {
this.flushCommitLogService.wakeup();
}
// Synchronous write double
//if the broker is configured as SYNC_MASTER, push the message to the slave and wait
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (msg.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
if (null == request) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
}
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
// TODO
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return putMessageResult;
}