RocketMQ Source Code Analysis, Part 1: How the Producer Sends Messages

// Shared send routine (handles SYNC, ASYNC and ONEWAY via communicationMode)
    private SendResult sendDefaultImpl(//
                                       Message msg, //
                                       final CommunicationMode communicationMode, //
                                       final SendCallback sendCallback, //
                                       final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        // Validate the message, including its length
        Validators.checkMessage(msg, this.defaultMQProducer);
        // Generate a random invokeID
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Look up the route for this topic (broker addresses and message queues), fetching it from the NameServer if needed
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // If route info was found, go ahead and send
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // SYNC mode gets 1 + retryTimesWhenSendFailed attempts; ASYNC and ONEWAY get exactly one
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;

            String[] brokersSent = new String[timesTotal];
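            // Send loop with retries
            for (; times < timesTotal; times++) {
                // Name of the broker used in the previous attempt (null on the first attempt)
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // Load balancing: pick queues round-robin so that messages spread across queues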
                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        // Send the message
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            } // end of for

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //
                    times, //
                    (System.currentTimeMillis() - beginTimestampFirst), //
                    msg.getTopic(), //
                    Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.ConnectBrokerException);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.AccessBrokerTimeout);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BrokerNotExistException);
            }

            throw mqClientException;
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                    "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NoNameServerException);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NotFoundTopicException);
    }

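The listing is long, but the logic is fairly simple. First you need to know two RocketMQ terms, NameServer and Broker. After that it comes down to the flow of three functions: tryToFindTopicPublishInfo, selectOneMessageQueue and sendKernelImpl.

tryToFindTopicPublishInfo flow

Broadly speaking, the NameServer plays a role similar to ZooKeeper: it is used for service discovery. The flow is roughly:

  1. Look up the broker info in the local cache. If it is there, return it; otherwise continue with the next step.
  2. Fetch the broker info from the NameServer and compare it with the local copy. If it has changed, or there is no local copy, update the cache.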
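
The two steps above can be sketched as a cache-first lookup. This is a simplified illustration, not RocketMQ's actual code: `RouteCache`, its `find` method and the `nameServer` function are hypothetical names, and the route is reduced to a list of broker addresses.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the producer's route cache.
class RouteCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    // Stand-in for the remote call to the NameServer: topic -> broker addresses.
    private final Function<String, List<String>> nameServer;

    RouteCache(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    List<String> find(String topic) {
        // Step 1: serve from the local cache when a usable entry exists.
        List<String> cached = cache.get(topic);
        if (cached != null && !cached.isEmpty()) {
            return cached;
        }
        // Step 2: ask the NameServer; store the result if it is new or different.
        List<String> fetched = nameServer.apply(topic);
        if (fetched != null && !fetched.equals(cached)) {
            cache.put(topic, fetched);
        }
        return fetched;
    }
}
```

With this shape, only the first lookup for a topic pays for a remote call; later lookups are answered locally.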

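selectOneMessageQueue flow

In RocketMQ a topic maps to multiple queues, which is how it achieves high concurrency. The queues are used round-robin, so each message goes to a different queue than the previous one (index++ % queueSize). Because the producer gets the queue IDs of each broker from the NameServer, it knows exactly which queue on which broker to send each message to.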
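
A minimal sketch of the index++ % queueSize selection, including the idea of avoiding the broker that was used in the previous attempt (the `lastBrokerName` argument in `sendDefaultImpl`). `RoundRobinSelector` and its nested `Queue` type are hypothetical names for illustration, and the fallback when every queue sits on the excluded broker is my own assumption.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSelector {
    // Hypothetical minimal queue descriptor: broker name plus queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    private final AtomicInteger index = new AtomicInteger();

    Queue select(List<Queue> queues, String lastBrokerName) {
        if (queues.isEmpty()) {
            return null;
        }
        // Walk the queues round-robin, skipping the broker used last time.
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(nextPosition(queues.size()));
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        // Every queue is on the excluded broker: fall back to plain round-robin.
        return queues.get(nextPosition(queues.size()));
    }

    private int nextPosition(int size) {
        // floorMod keeps the position non-negative even after the int counter overflows.
        return Math.floorMod(index.getAndIncrement(), size);
    }
}
```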

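Retry mechanism

For synchronous sends, the number of retries is set with producer.setRetryTimesWhenSendFailed(); the default is 2 retries. ASYNC and ONEWAY sends make a single attempt in this loop; for ASYNC, sendKernelImpl passes getRetryTimesWhenSendAsyncFailed() down to sendMessage instead.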
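
The attempt budget computed at the top of the loop in sendDefaultImpl (`timesTotal`) can be sketched on its own. `RetryBudgetSketch` and its methods are hypothetical names; the real loop additionally tracks brokers, timing and several exception types, all of which this sketch leaves out.

```java
import java.util.function.Supplier;

class RetryBudgetSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Mirrors: SYNC ? 1 + retryTimesWhenSendFailed : 1
    static int totalAttempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Runs the attempt until it succeeds or the budget is used up.
    static <T> T send(Mode mode, int retryTimesWhenSendFailed, Supplier<T> attempt) {
        int timesTotal = totalAttempts(mode, retryTimesWhenSendFailed);
        RuntimeException last = null;
        for (int times = 0; times < timesTotal; times++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                // Remember the failure and move on to the next attempt.
                last = e;
            }
        }
        throw new IllegalStateException("Send [" + timesTotal + "] times, still failed", last);
    }
}
```

With a retry setting of 2, a SYNC send gets three attempts in total, while ASYNC and ONEWAY get one.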

sendKernelImpl

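sendKernelImpl is the core of message sending. It assembles the request, invokes any registered hooks, and hands the request to Netty. There is not much more to it: the message goes straight to the broker over Netty.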

private SendResult sendKernelImpl(final Message msg, //
                                      final MessageQueue mq, //
                                      final CommunicationMode communicationMode, //
                                      final SendCallback sendCallback, //
                                      final TopicPublishInfo topicPublishInfo, //
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Look up the broker address
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            // Not found: refresh the route info and look the address up again
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
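        // Only proceed if a broker address was found
        if (brokerAddr != null) {
            // Switch to the VIP channel address if enabled
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
            // Keep a reference to the original message body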
            byte[] prevBody = msg.getBody();
            try {
                // Set a unique ID on the message
                MessageClientIDSetter.setUniqID(msg);

                int sysFlag = 0;
                // Try to compress the message; on success, set the compressed flag in sysFlag
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.CompressedFlag;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TransactionPreparedType;
                }
                // If a check-forbidden hook is registered, build its context and run it
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }
                // If a send-message hook is registered, build a context carrying the message details and run the before-send hook
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }
                // Build the request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                // Populate the request header fields
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }
                // Send the message to the broker
                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                                brokerAddr, // 1
                                mq.getBrokerName(), // 2
                                msg, // 3
                                requestHeader, // 4
                                timeout, // 5
                                communicationMode, // 6
                                sendCallback, // 7
                                topicPublishInfo, // 8
                                this.mQClientFactory, // 9
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                                context, //
                                this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                                brokerAddr, // 1
                                mq.getBrokerName(), // 2
                                msg, // 3
                                requestHeader, // 4
                                timeout, // 5
                                communicationMode, // 6
                                context,//
                                this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            }
            .........// omitted
    }

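A synchronous send returns a result, while an asynchronous send does not (sendDefaultImpl returns null for ASYNC). This may look surprising, since anyone familiar with Netty knows that its programming model is asynchronous. Reading the source shows that the synchronous path is built on a wait mechanism: the caller blocks on a CountDownLatch with a timeout until the response arrives.

Message unique ID

Every MQ has to solve the same problem: how to generate a unique ID for each message. Below is how RocketMQ produces its UniqID. The method is synchronized, and the ID is built mainly from a timestamp offset and counter++. In my own comparison it was about an order of magnitude faster than the JDK's built-in UUID: generating 800,000 IDs took roughly 700 ms.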
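
The wait mechanism can be illustrated with a small holder that one thread blocks on and another thread completes. This is a sketch of the general CountDownLatch technique, not RocketMQ's class: `PendingResponse`, `complete` and `await` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical holder that turns an asynchronous reply into a blocking call.
class PendingResponse<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T response;

    // Called by the I/O thread when the reply arrives.
    void complete(T value) {
        response = value;
        latch.countDown();
    }

    // Called by the sending thread: waits up to timeoutMillis.
    // Returns null if the reply did not arrive in time or the wait was interrupted.
    T await(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
        }
        return response;
    }
}
```

The sender registers such a holder before writing the request, then calls `await(timeout)`; the thread that reads the reply calls `complete(...)`, which releases the sender.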

public static synchronized String createUniqID() {
    long current = System.currentTimeMillis();
    // Once the current time reaches nextStartTime, reset startTime
    if (current >= nextStartTime) {
        setStartTime(current);
    }
    buffer.position(0);
    sb.setLength(basePos);
    // Write the time elapsed since startTime (as an int), then counter++
    buffer.putInt((int) (System.currentTimeMillis() - startTime));
    buffer.putShort(counter++);
    // Convert the bytes to a string and append it
    sb.append(UtilAll.bytes2string(buffer.array()));
    return sb.toString();
}

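The resulting ID looks like AC14091E353F6E0BE85839D734C80000. I suspect there is a hole here that could produce duplicate message IDs: counter is a short, so counter++ is bound to overflow, and if the process also restarts and System.currentTimeMillis() - startTime happens to land on a previously used value, an ID can repeat (a serious problem). Fortunately, a custom message ID can be supplied.

That covers the source-level flow of sending a message. Next, let's look at the definitions of the message model, Message, and the model that Netty transmits, RemotingCommand.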
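
The overflow itself is easy to confirm. The sketch below only demonstrates that a short counter wraps back to its starting value after 65,536 increments; `CounterOverflowDemo` is a hypothetical name, and the sketch does not model the time offset or the other fields of the ID, so it says nothing about how likely a real collision is.

```java
class CounterOverflowDemo {
    // Counts how many increments it takes for a short counter starting at 0 to return to 0.
    static int incrementsUntilWrap() {
        short counter = 0;
        int steps = 0;
        do {
            counter++; // 32767 + 1 wraps to -32768, then climbs back to 0
            steps++;
        } while (counter != 0);
        return steps;
    }
}
```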

Message Model

The Message model contains the following fields; the main one is body.

public class Message implements Serializable {
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
}

RemotingCommand Model

public class RemotingCommand {
    private int code;
    private LanguageCode language = LanguageCode.JAVA;
    private int version = 0;
    private int opaque = requestId.getAndIncrement();
    private int flag = 0;
    private String remark;
    private HashMap<String, String> extFields;
    private transient CommandCustomHeader customHeader;
    private transient byte[] body;
}

How serialization works

The following converts the header data into a ByteBuffer:

    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size: 4 bytes by default
        int length = 4;
        // 2> header data length
        byte[] headerData;
        // Encode the header to bytes; JSON is the default serialization
        headerData = this.headerEncode();
        // Add the header length and the body length
        length += headerData.length;
        length += bodyLength;
        // This buffer holds everything except the body, so its size is 4 + length - bodyLength
        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
        // Write the total length
        result.putInt(length);
        // Write the header length, marked with the serialization type
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
        // Write the header data
        result.put(headerData);
        result.flip();

        return result;
    }
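
To make the frame layout concrete, here is a self-contained sketch that builds the same prefix and reads it back. It is written under an assumption: that markProtocolType packs the serialization type into the first byte and the header length into the remaining three bytes (big-endian). `FrameLayoutSketch` and its methods are hypothetical names, not RocketMQ's API.

```java
import java.nio.ByteBuffer;

class FrameLayoutSketch {
    // Builds [4-byte total length][1-byte type + 3-byte header length][header]; the body is not included.
    static ByteBuffer encodeHeader(byte serializeType, byte[] header, int bodyLength) {
        int length = 4 + header.length + bodyLength;
        ByteBuffer buf = ByteBuffer.allocate(4 + length - bodyLength);
        buf.putInt(length);
        // Assumed layout of the 4-byte mark: type in the top byte, length in the low 24 bits.
        buf.put(serializeType);
        buf.put((byte) ((header.length >> 16) & 0xFF));
        buf.put((byte) ((header.length >> 8) & 0xFF));
        buf.put((byte) (header.length & 0xFF));
        buf.put(header);
        buf.flip();
        return buf;
    }

    // Extracts the header length from the combined 4-byte field.
    static int headerLength(int typeAndLength) {
        return typeAndLength & 0xFFFFFF;
    }

    // Extracts the serialization type from the combined 4-byte field.
    static byte serializeType(int typeAndLength) {
        return (byte) ((typeAndLength >> 24) & 0xFF);
    }
}
```

A decoder reads the first int to learn how many bytes follow, reads the second int to split off the header, and treats whatever remains as the body.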

Once the header is encoded, NettyEncoder writes the header and then the body:

```java
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.RemotingLogName);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
            throws Exception {
        try {
            // Encode the header and write it
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            byte[] body = remotingCommand.getBody();
            // Write the body, if there is one
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
```

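Open questions

There is not much to dwell on in the send path itself, and it has little to do with high concurrency; the main points of interest are serialization and the message format. Writing this article raised a number of questions:

  1. How are messages stored on the broker?
  2. Why does splitting a topic into multiple queues improve performance?
  3. How does RocketMQ handle the hundreds of millions of messages it claims to?
  4. How does the broker achieve high availability?
  5. What causes message backlog?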
