各种组建
CommitLog、HAService、namesSrv
CommitLog组建问题
消费者设计
问题
1.消费者启动时候会去broker拉取 broker中topic的信息以及各个queue的offset。消费者消费完数据后会修改本地queue的offset,然后以每五秒一次的频率同步给broker,来达到持久化。那么就会存在一个问题,如果一个集群都在做这个事情,就会导致offset乱掉,RocketMQ如何解决这个问题?
2.rocketmq.client.rebalance.waitInterval通过设置pull的频率,默认是20秒拉一次,会不会过于长?
解决:20秒是长轮训机制,存在一个超时,broker保持链接不释放,直到超时或者有数据,这样就需要超时的配置和长轮训的时间不能有间隔,相对于短轮训来说,实时性比较好。
PullMessageProcessor.java
case ResponseCode.PULL_NOT_FOUND:
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
上述是实现流程,一旦没有找到队列信息,就不会进行响应,会把对应的信息存储起来,放入到PullRequestHoldService
中,这里面会做一个一秒一次的检查,如果检查到消息到来了,就会把消息发送给消费者。不过这样的机制也会有问题,一旦客户端掉线了然后还没有消息,这里就会陷入无限的死循环,但是一旦机器连上会立即发送消息过去
4.存在一个问题,一旦客户端没有同步offset信息到broker,就会有重复信息产生
5.Consumer 如何确定自己使用那个队列?
6.Consumer 使用一个队列之后,为什么其他Consumer不能共享当前队列的信息?
RocketMQ一个消息队列只会对应一个消费者,也就是说消费者数量和队列数量得一致,否则会造成一些消费者无法使用队列,这样做是为了不加锁,去保证每个队列的消费进度,而且消费进度是每5秒同步一次给broker的消息队列的
7.commitLog 文件创建规则
设计优点
1.整个queue的流动是采用offset来设计的,所以天生就拥有重置功能,一旦queue堆积,这个时候可能会产生大量已经失效废弃的消息,可以通过重置功能重置到任意位置