各种组建

CommitLog、HAService、namesSrv

CommitLog组建问题

消费者设计

问题

1.消费者启动时候会去broker拉取 broker中topic的信息以及各个queue的offset。消费者消费完数据后会修改本地queue的offset,然后以每五秒一次的频率同步给broker,来达到持久化。那么就会存在一个问题,如果一个集群都在做这个事情,就会导致offset乱掉,RocketMQ如何解决这个问题?
2.rocketmq.client.rebalance.waitInterval通过设置pull的频率,默认是20秒拉一次,会不会过于长?

解决:20秒是长轮训机制,存在一个超时,broker保持链接不释放,直到超时或者有数据,这样就需要超时的配置和长轮训的时间不能有间隔,相对于短轮训来说,实时性比较好。 PullMessageProcessor.java

case ResponseCode.PULL_NOT_FOUND:

                    if (brokerAllowSuspend && hasSuspendFlag) {
                        long pollingTimeMills = suspendTimeoutMillisLong;
                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
                        }

                        String topic = requestHeader.getTopic();
                        long offset = requestHeader.getQueueOffset();
                        int queueId = requestHeader.getQueueId();
                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                                this.brokerController.getMessageStore().now(), offset, subscriptionData);
                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                        response = null;
                        break;
                    }

上述是实现流程,一旦没有找到队列信息,就不会进行响应,会把对应的信息存储起来,放入到PullRequestHoldService中,这里面会做一个一秒一次的检查,如果检查到消息到来了,就会把消息发送给消费者。不过这样的机制也会有问题,一旦客户端掉线了然后还没有消息,这里就会陷入无限的死循环,但是一旦机器连上会立即发送消息过去

4.存在一个问题,一旦客户端没有同步offset信息到broker,就会有重复信息产生
5.Consumer 如何确定自己使用那个队列?
6.Consumer 使用一个队列之后,为什么其他Consumer不能共享当前队列的信息?

RocketMQ一个消息队列只会对应一个消费者,也就是说消费者数量和队列数量得一致,否则会造成一些消费者无法使用队列,这样做是为了不加锁,去保证每个队列的消费进度,而且消费进度是每5秒同步一次给broker的消息队列的

7.commitLog 文件创建规则

设计优点

1.整个queue的流动是采用offset来设计的,所以天生就拥有重置功能,一旦queue堆积,这个时候可能会产生大量已经失效废弃的消息,可以通过重置功能重置到任意位置

results matching ""

    No results matching ""