RocketMQ源码分析一之消费者发送信息
//异步信息信息发送
private SendResult sendDefaultImpl(//
Message msg, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final long timeout//
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
//校验消息长度
Validators.checkMessage(msg, this.defaultMQProducer);
//获取invodeID
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//从nameServer上面拉去具体Topic对应的BrokerAddr和对应的 消息队列信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
//如果拉去到了就进行消息发送
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//如果当前是异步模式 就获取重试次数,如果是同步就采用一次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
//发送信息和重试逻辑
for (; times < timesTotal; times++) {
//获取brockerName
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//这里采用了负载均衡策略,每次轮训queue 发送信息,达到负载
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (tmpmq != null) {
mq = tmpmq;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//发送消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
} // end of for
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //
times, //
(System.currentTimeMillis() - beginTimestampFirst), //
msg.getTopic(), //
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.ConnectBrokerException);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.AccessBrokerTimeout);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BrokerNotExistException);
}
throw mqClientException;
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NoNameServerException);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NotFoundTopicException);
}
上述代码比较多,但是逻辑相对比较简单,首先得理解RocketMQ的几个名词NameServer、Brocker。然后主要是几个函数的流程,tryToFindTopicPublishInfo
、selectOneMessageQueue
、sendKernelImpl
tryToFindTopicPublishInfo流程
一般来说NameServer相当于zookeeper,做服务发现用,大致流程如下:
- 从本地缓存获取brocker,如果存在就返回,如果不存在继续下面的步骤
- 从NameServer拉去brocker的信息和本地的进行对比,如果有改变或者本地不存在就进行更新
selectOneMessageQueue 流程
在RocketMQ中一个Topic对应多个队列,通过多个队列的形式来达到高并发的效果,多个队列之间会采用轮训的方式使用,消息每次都发送到不同的消息队列中,index++%queueSize。Producer通过从NameServer中获取到brocker的Queue的Id,能够知道具体发送到Brocker中的具体Queue,大概对应关系如下:
重试机制
异步发送消息的时候可以采用producer.setRetryTimesWhenSendFailed()
进行设置重试的次数,默认是重试2次
sendKernelImpl
sendKernelImpl
是核心的消息发送,这块主要是组装Message和调用注册的hook函数以及调用Netty进行发送,没有太多流程,也就是说消息其实是通过Netty直接发送到了brocker中
private SendResult sendKernelImpl(final Message msg, //
final MessageQueue mq, //
final CommunicationMode communicationMode, //
final SendCallback sendCallback, //
final TopicPublishInfo topicPublishInfo, //
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//获取brokerAddr的地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
//如果为空再次请求取获取一下地址
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
//如果地址不等于null
if (brokerAddr != null) {
//是否采用VIP通道
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
//获取消息内通
byte[] prevBody = msg.getBody();
try {
//创建消息唯一id
MessageClientIDSetter.setUniqID(msg);
int sysFlag = 0;
//尝试压缩消息 如果压缩成功 给sysFlag加上一个压缩标志
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.CompressedFlag;
}
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TransactionPreparedType;
}
//是否有hook函数,如果有 调用callback得到消息的信息
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
//是否有messagehook,如果有调用拿到message的所有信息
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
//设置头部
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//设置设置requestHeader的信息
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
//发送消息到mq
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
brokerAddr, // 1
mq.getBrokerName(), // 2
msg, // 3
requestHeader, // 4
timeout, // 5
communicationMode, // 6
sendCallback, // 7
topicPublishInfo, // 8
this.mQClientFactory, // 9
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
context, //
this);
break;
case ONEWAY:
case SYNC:
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
brokerAddr, // 1
mq.getBrokerName(), // 2
msg, // 3
requestHeader, // 4
timeout, // 5
communicationMode, // 6
context,//
this);
break;
default:
assert false;
break;
}
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
}
.........//省略
}
针对于同步来说是有返回结果的,但是异步是没有返回结果的,这块很神奇熟悉netty的都知道,其实netty是异步的编程模型,查看源码之后,其实是通过等待机制实现的同步,通过使用CountDownLatch
实现超时等待
Message 唯一id
只要是mq都会存在一个问题,如何创建Message的唯一id,下面是RocketMQ得到UniqID的方式,首先对获取ID进行加锁,主要还是通过时间戳 和count++实现,在性能上对比了下生成的UID比系统自带的UUID性能上快一个量级,生成80w数据只需要700毫秒左右。
public static synchronized String createUniqID() {
long current = System.currentTimeMillis();
//每次更新 starTime为当前时间
if (current >= nextStartTime) {
setStartTime(current);
}
buffer.position(0);
sb.setLength(basePos);
//通过拿到当前时间戳 并且counter++
buffer.putInt( (int)(System.currentTimeMillis() - startTime) );
buffer.putShort(counter++);
//然后把bytes转String
sb.append(UtilAll.bytes2string(buffer.array()));
return sb.toString();
}
拿到的ID大概是这样的:AC14091E353F6E0BE85839D734C80000,这块感觉会存在一种漏洞造成消息id重复,counter++ 是short类型肯定会造成溢出的,再加上当系统重启,而且System.currentTimeMillis() - startTime得到结果有几率重复时,就会造成这样的结果(严重问题),不过还好可以自定义messageId
上述大概讲了一个消息发送的源码流程,接下来看一下消息的Model
Message
、Netty传递ModelRemotingCommand
、的定义
Message Model
Message Model包含下面内容 主要是body内容
public class Message implements Serializable {
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
}
RemotingCommand Model
public class RemotingCommand {
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private transient byte[] body;
}
如何序列化
下面是对把headerData转换成具体的ByteBuffer
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
//默认4个字节
// 2> header data length
byte[] headerData;
//对header进行Encode,得到二进制,默认是json
headerData = this.headerEncode();
//统计header和body的Length
length += headerData.length;
length += bodyLength;
//具体分配的header空间是4+length-bodyLength
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
//写入总长度
// length
result.putInt(length);
//写入header的长度
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
//放入header数据
result.put(headerData);
result.flip();
return result;
}
上面拿到header后,NettyEncoder分别写入了header和body ```java public class NettyEncoder extends MessageToByteEncoder
{ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
//拿到头部后写入header
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
//写入body
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
}
```
问题总结
其实如何发送消息这块来说没有太多关注点,跟高并发其实关联不大,主要是关注序列化和消息格式。通过写完本文引发了很多问题:
- 消息在broker如何存储?
- 为什么分多个Queue性能就会提高?
- RocketMQ如何号称亿级别消息处理?
- broker如何实现高可用?
- 消息堆积是什么原因?