1.CommitLog存储格式

CommitLog中的消息存储格式如下:

4字节(TOTALSIZE)+4字节(MAGICCODE)+4字节(BODYCRC)
+4字节(QUEUEID)+4字节(FLAG)+8字节(QUEUEOFFSET)+8字节(PHYSICALOFFSET)
+4字节(SYSFLAG)+8字节(BORNTIMESTAMP)+8字节(BORNHOST)+8字节(STORETIMESTAMP)+8字节(STOREHOSTADDRESS)
+4字节(RECONSUMETIMES)+8字节(Prepared Transaction Offset)+
(4字节(int bodyLength)+bodyLength)+(1字节((byte) topicLength)+topicLength)
+(2字节(short propertiesLength)+propertiesLength)

2.CommitLog实现机制

3.CommitLog刷盘机制

异步刷盘

CommitLog中的MapedFile设计了一个页缓存,当前write/OS_PAGE_SIZE - flush / OS_PAGE_SIZE得到当前增量页,如果大于设置的页就进行刷盘。这流程在是每次接受到request的时候,nofity一个正在等待的线程执行 ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages。

使用的FlushRealTimeService进行刷盘,这个线程使用两种策略进行等待,interval可以进行配置,默认是1秒进行一次检查,用来保证页面Cache即时的进行flush,可以设置两种策略,另外一种是触法一种是sleep形式

    if (flushCommitLogTimed) {
        Thread.sleep(interval);
    } else {
        this.waitForRunning(interval);
    }

两种策略的区别是sleep是时间间隔的,也就是说不能唤醒,实时性比较差,而采用wait在插入的时候是实时唤醒的。

同步刷盘

同步刷盘使用的是GroupCommitService,同步刷盘策略流程跟异步刷盘的策略差不多,不过同步刷盘不采用页面Cache的形式,当有一条数据就行进行刷盘,这是一种比较低效的方式,但是能够很好的保证数据,这块设置了一个超时策略,一旦刷盘超时会返回错误信息。 同步刷盘也是一个单独的工作线程,基本是10毫秒进行一次刷盘检查,保障数据持久化到硬盘

try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
public boolean isOSPageCacheBusy() {
    long begin = this.getCommitLog().getBeginTimeInLock();
    long diff = this.systemClock.now() - begin;

    if (diff < 10000000 //
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
        return true;
    }

    return false;
}

3.commitLog 清除过期文件机制

DefaultMessageStore中启动的时候会默认启动一个定时任务,默认频率是10000ms,调用cleanFilesPeriodically方法进行清除,该方法会调用cleanCommitLogService的run()方法,具体代码如下:


    private void deleteExpiredFiles() {
            int deleteCount = 0;
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            //三种情况,1.判断是否到时间了 2.判断磁盘是否已经满 了 3.manualDeleteFileSeveralTimes>0
            boolean timeup = this.isTimeToDelete();

            boolean spacefull = this.isSpaceToDelete();

            boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;


            if (timeup || spacefull || manualDelete) {

                if (manualDelete)
                    this.manualDeleteFileSeveralTimes--;

                //是否是立即删除
                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
                        fileReservedTime, //
                        timeup, //
                        spacefull, //
                        manualDeleteFileSeveralTimes, //
                        cleanAtOnce);

                //过期时间默认是72小时,如果一个文件commitLog的数据文件在72小时内没有被修改过 那么就认为该文件已经过期了
                fileReservedTime *= 60 * 60 * 1000;
                //删除commitLog里面过期的文件
                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                        destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount > 0) {
                } else if (spacefull) {
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }

cleanAtOnce 这块是一个比较吭的地方,如果硬盘被占满75%以上,那么只会保留一个数据文件,系统认为一旦你的数据文件过多产生堆积之后会导致磁盘满了,当然也可以通过配置cleanFileForciblyEnable=false进行优化,即时硬盘占用满了,也不会删除,除非满足另外一个条件,超过72小时文件没有被修改过

4 commitLog 插入消息机制


    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        //做crc校验,保证取出来的body数据 是正确的
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        //获取消息topic
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        //获取消息sysflag
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TransactionNotType//如果是事物
                || tranType == MessageSysFlag.TransactionCommitType) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MapedFile unlockMapedFile = null;
        //获取具体写的commitLogfile  文件默认大小是一个G
        MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
        //整体锁住
        synchronized (this) {
            //获取当前时间
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);
            //如果mapedFile==null 或者文件已经写满了
            if (null == mapedFile || mapedFile.isFull()) {
                //从新获取一个mappfile 这里会创建两个File 是当前使用的File和nextFile
                mapedFile = this.mapedFileQueue.getLastMapedFile();
            }
            //如果还是==null 就抛出异常吧
            if (null == mapedFile) {
                log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            //把消息添加到末尾
            result = mapedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                //如果已经写到了末尾,从新拿到一个文件 继续写
                case END_OF_FILE:
                    unlockMapedFile = mapedFile;
                    // Create a new file, re-write the message
                    mapedFile = this.mapedFileQueue.getLastMapedFile();
                    if (null == mapedFile) {
                        // XXX: warn and notify me
                        log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } // end of synchronized

        if (eclipseTimeInLock > 1000) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }
        if (null != unlockMapedFile) {
            this.defaultMessageStore.unlockMapedFile(unlockMapedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        //统计
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        GroupCommitRequest request = null;

        // Synchronization 判断type  flush  如果是同步flush 那么就直接flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (msg.isWaitStoreMsgOK()) {
                request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                            + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            this.flushCommitLogService.wakeup();
        }

        // Synchronous write double
        //如果配置是sync master   就强制推给slave
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (msg.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    if (null == request) {
                        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    }
                    service.putRequest(request);

                    service.getWaitNotifyObject().wakeupAll();

                    boolean flushOK =
                            // TODO
                            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                                + msg.getTags() + " client address: " + msg.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

        return putMessageResult;
    }

results matching ""

    No results matching ""