1.master和slave的同步机制
1.1 write double 模式
双写模式,写入master的时候同时写入slave,这个模式,如下面代码所示,如果当前是SYNC_MASTER模式,就通过HAService写入
//如果配置是sync master 就强制推给slave
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (msg.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
if (null == request) {
request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
}
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
// TODO
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
+ msg.getTags() + " client address: " + msg.getBornHostString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
1.2 异步复制机制
异步复制分为两个部分,消费信息和commitlog的复制
消费信息
消费信息的复制在BrokerController中initialize()
的时候启动一个定时任务,间隔60s进行一次同步,也就是说,一旦master挂掉之后,产生重复信息的几率就会很大。不过发生几率比较小,大概同步四个数据Topic 、ConsumerOffset、DelayOffset、SubscriptionGroupConfig,具体复制逻辑在SlaveSynchronize中,但是基本没有太多逻辑,无非是简单的数据复制而已
//判断是否是slave节点,如果是设置HAMaster地址
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
//同步master的四个属性
//this.syncTopicConfig();
//this.syncConsumerOffset();
//this.syncDelayOffset();
//this.syncSubscriptionGroupConfig();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask syncAll slave exception", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else {
//如果不是master就打印slave和master之间的差别量
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
1.2
master和slave启动的时候都会默认启动HaService,HaService大概启动如下几个服务
public void start() {
//注册niosocket 和selector
this.acceptSocketService.beginAccept();
//启动niosocket
this.acceptSocketService.start();
//启动groupTransferService
this.groupTransferService.start();
//启动haClient
this.haClient.start();
}