1.master和slave的同步机制

1.1 write double 模式

双写模式,写入master的时候同时写入slave,这个模式,如下面代码所示,如果当前是SYNC_MASTER模式,就通过HAService写入

//如果配置是sync master   就强制推给slave
        if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
            HAService service = this.defaultMessageStore.getHaService();
            if (msg.isWaitStoreMsgOK()) {
                // Determine whether to wait
                if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                    if (null == request) {
                        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                    }
                    service.putRequest(request);

                    service.getWaitNotifyObject().wakeupAll();

                    boolean flushOK =
                            // TODO
                            request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    if (!flushOK) {
                        log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                                + msg.getTags() + " client address: " + msg.getBornHostString());
                        putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                    }
                }
                // Slave problem
                else {
                    // Tell the producer, slave not available
                    putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                }
            }
        }

1.2 异步复制机制

异步复制分为两个部分,消费信息和commitlog的复制

消费信息

消费信息的复制在BrokerController中initialize()的时候启动一个定时任务,间隔60s进行一次同步,也就是说,一旦master挂掉之后,产生重复信息的几率就会很大。不过发生几率比较小,大概同步四个数据Topic 、ConsumerOffset、DelayOffset、SubscriptionGroupConfig,具体复制逻辑在SlaveSynchronize中,但是基本没有太多逻辑,无非是简单的数据复制而已

//判断是否是slave节点,如果是设置HAMaster地址
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
                //同步master的四个属性
                //this.syncTopicConfig();
                //this.syncConsumerOffset();
                //this.syncDelayOffset();
                //this.syncSubscriptionGroupConfig();

                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            } else {
                //如果不是master就打印slave和master之间的差别量
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }

1.2

master和slave启动的时候都会默认启动HaService,HaService大概启动如下几个服务

public void start() {
        //注册niosocket 和selector
        this.acceptSocketService.beginAccept();
        //启动niosocket
        this.acceptSocketService.start();
        //启动groupTransferService
        this.groupTransferService.start();
        //启动haClient
        this.haClient.start();
    }

commitLog数据复制

results matching ""

    No results matching ""