您的位置:  首页 > 技术 > 中间件 > 正文

rocketmq源码分析3-consumer消息获取

2021-07-30 12:34 管理员 次阅读 条评论

使用rocketmq的大体消息发送过程如下:

Snip20170222_17

在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pull message)动作。。

 

1. 如何找到入口(MQ-broker端)

分析一个机制或者功能时,我们首先希望的是找到入口,前一篇我们是通过端口号方式顺藤摸瓜的方式找到了入口。但是此篇略微不同,涉及到consumer客户端与broker的两边分析,最终发现逻辑还是比较绕的,主要有很多异步动作,还有循环调用(当然不是一个线程上,而且中间有阻塞队列缓冲),这对调试式分析代码造成了一些不方便。
回到正题,怎么找到这里入口?在具备上篇分析的基础上,我直接分析broker的代码,broker接收消息的时候是靠SendMessageProcessor,那么在消息传递给消费端的时候是不是也是靠某个processor完成的?据这些processor的命名观察,猜测PullMessageProcessor比较像。
为了验证这一想法,注释掉BrokerController中使用这个processor的地方,再重新测试,发现consumer就收不到producer发过来的消息了。想法初步正确。

2. 调试PullMessageProcessor(MQ-broker端)

RemotingServer在注册processor的时候,是根据RequestCode进行注册的。
PullMessageProcessor 对应的RequestCode的PULL_MESSAGE,即11。猜测:consumer客户端不断(或定时轮询,或循环调用,或其他方式)发起pull message请求给broker,broker会处理这些请求,后面会验证这个猜测。
PullMessageProcessor在注册的时候对应的线程池是pullMessageExecutor,线程池的corePoolSize以及maxPoolSize都可以在broker中进行config,字段名是pullMessageThreadPoolNums。默认值16+处理器个数*2。

3. 哪里发送了RequestCode为PULL_MESSAGE的请求(consumer客户端)

通过全局搜索,很容易发现是MQClientAPIImpl.pullMessage422行,发送了PULL_MESSAGE类型的请求。
加断点(consumer客户端需要debug方式启动),看调用堆栈。

Snip20170222_12

很容易发现 是 PullMessageService.run()发出了PULL_MESSAGE的request。 run的代码如下:

while (!this.isStoped()) {    try {

        PullRequest pullRequest = this.pullRequestQueue.take();        if (pullRequest != null) {            this.pullMessage(pullRequest);

        }

    }    catch (InterruptedException e) {

    }    catch (Exception e) {

        log.error("Pull Message Service Run Method exception", e);

    }

}

在线程没有停止的情况下,一直循环发拉取消息的请求,过程中被pullRequestQueue阻塞队列阻塞。
分析谁向pullRequestQueue put了元素?是PullMessageService.executePullRequestImmediately(PullRequest)方法。
谁调了上面的方法,同样断点分析,调用堆栈如下图:

Snip20170222_15

划蓝色线的ResponseFuture地方,是阿里对这种通过发送网络请求调用后还能回调回来的一个特性封装,值得学习。 划红色线的 MQClientAPIImpl$2地方是在处理业务逻辑,位于方法pullMessageAsync(String, RemotingCommand, long, PullCallback)内。此处又是一个异步。


private void pullMessageAsync(//        final String addr,// 1

        final RemotingCommand request,//        final long timeoutMillis,//        final PullCallback pullCallback//) throws RemotingException, InterruptedException {    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {

        @Override        public void operationComplete(ResponseFuture responseFuture) {

            RemotingCommand response = responseFuture.getResponseCommand();            if (response != null) {                try {

                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);                    assert pullResult != null;

                    pullCallback.onSuccess(pullResult);// 457行对应此处                }                catch (Exception e) {

                    pullCallback.onException(e);

                }

            }            else {                //... 省略            }

        }

    });

}

4. 调试MQClientAPIImpl.pullMessageAsync(consumer客户端)

谁调用了MQClientAPIImpl.pullMessageAsync?
449行打断点,堆栈如下:

Snip20170222_16

发现又回到上面的PullMessageService的run中。:-(
回头去看看3段落中pullCallback.onSuccess(pullResult);// 457行对应此处
这一行代码,跟进去就会发现玄机,在这里面又调用了PullMessageService.executePullRequestImmediately(PullRequest)方法。 是一个匿名内部类,位于DefaultMQPushConsumerImpl.pullMessage(PullRequest)中。这个方法太长,贴一些简略的,


final long beginTimestamp = System.currentTimeMillis();



PullCallback pullCallback = new PullCallback() {

    @Override    public void onSuccess(PullResult pullResult) {        if (pullResult != null) {

            pullResult =

                    DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(

                        pullRequest.getMessageQueue(), pullResult, subscriptionData);            switch (pullResult.getPullStatus()) {            case FOUND:                //...省略



                long firstMsgOffset = Long.MAX_VALUE;                if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {

                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

                }                else {                    //...省略



                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {

                        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,

                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());

                    }                    else {

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);

                    }

                }                //...省略

                break;            case NO_NEW_MSG:                //...省略

                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                break;            case NO_MATCHED_MSG:                //...省略

                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                break;            case OFFSET_ILLEGAL:                //...省略

上述代码基本上很清楚地看出来拉取代码的发起逻辑了。在case FOUND分支打个断点,当你从producer的客户端发一条消息过来的时候,能看到断点被命中。当没有producer没有发消息的时候,一直走的就是case NO_NEW_MSG分支。

5. 上面分析的对吗?

不全对。为什么?回去看段落3的[1]标注出,关于是谁调用了PullMessageService.executePullRequestImmediately(PullRequest)方法。

其实不止是段落3分析的那样,还有RebalanceImpl.updateProcessQueueTableInRebalance(String, Set)417行发起的调用。
为什么会想到这个问题?因为按段落3 的分析,调用executePullRequestImmediately方法的入参是 PullRequest,但是上面分析是"循环"调用,那么最初始的这个PullRequest是哪里构造的?

带着这个疑问就不难发现肯定还有其他调用了executePullRequestImmediately方法。于是搜索,加断点,发现RebalanceImpl.updateProcessQueueTableInRebalance会调用。
其实通过搜索 new PullRequest 关键字也是很容易找到上述调用的地方。

谁对RebalanceImpl.updateProcessQueueTableInRebalance发起了调用?
观察调用栈:
consumer 构造PULL_MESSAGE的消息堆栈
Thread [RebalanceService] (Suspended (breakpoint at line 417 in RebalanceImpl))
RebalancePushImpl(RebalanceImpl).updateProcessQueueTableInRebalance(String, Set) line: 417
RebalancePushImpl(RebalanceImpl).rebalanceByTopic(String) line: 321 RebalancePushImpl(RebalanceImpl).doRebalance() line: 248
DefaultMQPushConsumerImpl.doRebalance() line: 250
MQClientInstance.doRebalance() line: 925
RebalanceService.run() line: 49 Thread.run() line: 695

RebalancePushImpl(RebalanceImpl).updateProcessQueueTableInRebalance(String, Set) line: 417

this.dispatchPullRequest(pullRequestList); 该方法对push形式会调用PullMessageService.executePullRequestImmediately。至此,疑问基本解决。

这个堆栈要在consumer启动时会发现,整个堆栈发生在线程 Thread [RebalanceService]


// --RebalanceService  run --@Overridepublic void run() {

    log.info(this.getServiceName() + " service started");    while (!this.isStoped()) {        this.waitForRunning(WaitInterval);// WaitInterval  = 10s

        this.mqClientFactory.doRebalance();// MQClientInstance.doRebalance()    }



    log.info(this.getServiceName() + " service end");

}//-- MQClientInstance.doRebalance() --public void doRebalance() {        for (String group : this.consumerTable.keySet()) {

            MQConsumerInner impl = this.consumerTable.get(group);            if (impl != null) {                try {

                    impl.doRebalance();// DefaultMQPushConsumerImpl.doRebalance()

                } catch (Exception e) {

                    log.error("doRebalance exception", e);

                }

            }

        }

    }    

// -- DefaultMQPushConsumerImpl.doRebalance --@Overridepublic void doRebalance() {    if (this.rebalanceImpl != null) {        this.rebalanceImpl.doRebalance();

    }

}// ......

此处RebalanceImpl的实现是RebalancePushImpl
根据上述线程名可以发现是RebalanceService这个类拉起了上面的RebalanceImpl.updateProcessQueueTableInRebalance

谁拉起了RebalanceService
看下面调用堆栈一目了然
Thread [main] (Suspended (breakpoint at line 185 in com.alibaba.rocketmq.client.impl.factory.MQClientInstance))
owns: com.alibaba.rocketmq.client.impl.factory.MQClientInstance (id=40)
com.alibaba.rocketmq.client.impl.factory.MQClientInstance.start() line: 185
com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start() line: 720
com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer.start() line: 365
org.simonme.rocketmq.demo.ConsumerDemo.main(java.lang.String[]) line: 55 // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testgroup");

因为我们针对该group设置的consumer就是Push的,即DefaultMQPushConsumer。官方提供的example工程中com.alibaba.rocketmq.example.quickstart.Consumer也是用的push。
引发问题:
1. 应该是可以针对不同的group设置不同形式的(pull or push)的consumer? 2. 不同的client实例订阅同一个group的同一个tag/不同tag会出现怎样的情况?是否还可以设置不同的获取消息方式(push or pull)

那么疑问又来了

如果此处我们设置的不是DefaultMQPushConsumer,而是DefaultMQPullConsumer,那么刚本节(第5节)开头那个问题怎么解决

尝试一下
先看Pull形式的Consumer的例子

在PullMessageService.executePullRequestImmediately(PullRequest)方法处加上断点并调试该demo,就会发现该方法根本不会被调用。因为dispatchPullRequest方法中针对pull模式的实现为空方法体,根本没有发起PullMessageService.executePullRequestImmediately调用。那么上面的疑问就解决了。

6. 分析PullMessageProcessor(MQ-broker端)

入口方法是processRequest
先做一系列的检查,然后获取消息,检查涉及的要点如下图:
获取消息分析
1. 根据offset查询到SelectMapedBufferResult实例。

final GetMessageResult getMessageResult =        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),

            requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getQueueOffset(),

            requestHeader.getMaxMsgNums(), subscriptionData);


这个GetMessageResult中有SelectMapedBufferResult的List实例。

这个地方体现了零拷贝。消息落地磁盘的时候,是从file map出来的内存buffer,此时消费消息的时候无需再读取文件,而是直接读取map出来的buffer!当然如果堆积消息过多,内存中已经放不下的时候,就需要从磁盘上读取了。 这是rocketmq性能比较好的原因之一。 通常说的零拷贝是指系统态无需拷贝到用户态,即针对大文件拷贝常用的sendfile操作。此处不同于这个概念。

根据GetMessageResult取消息,没太多复杂的,如果发现有消息则走如下分支


switch (getMessageResult.getStatus()) {case FOUND:

    response.setCode(ResponseCode.SUCCESS);    // 消息轨迹:记录客户端拉取的消息记录(不表示消费成功)

    if (this.hasConsumeMessageHook()) {        // 执行hook

        ConsumeMessageContext context = new ConsumeMessageContext();

        context.setConsumerGroup(requestHeader.getConsumerGroup());

        context.setTopic(requestHeader.getTopic());


  • 0
    感动
  • 0
    路过
  • 0
    高兴
  • 0
    难过
  • 0
    搞笑
  • 0
    无聊
  • 0
    愤怒
  • 0
    同情
热度排行
友情链接