05-消费者Consumer

2022-07-18

05-消费者Consumer

消费者Consumer

一. 启动流程

1.1 参数配置

  1. 创建一个DefaultMQPushConsumer,基于push模式的消费者客户端
  2. 设置nameserver地址及消费进度
  3. 订阅主题和TAG
  4. 注册消息监听器,这个是收到消息后的处理逻辑,核心业务逻辑
  5. 启动消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mhn_consumer1");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicMhn", "TAG_A");
consumer.registerMessageListener();
consumer.start();

1.2 构建主题订阅信息

构建SubscriptionData,并加入到RebalanceImpl的订阅消息中

订阅关系主要有以下两个来源:

  1. 消费者主动订阅,代码中所使用的subscribe
  2. 自动订阅重试消息主题。消息重试是以消费组为单位,而不是主题,消息重试主题名为%RETRY%+消费组名
case CLUSTERING:
  //%RETRY%mhn_consumer1, 重试消息主题,%RETRY% + 消费者组名,启动时自动订阅该主题,以消费者组为单位进行消费重试
   final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
   SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
             retryTopic, SubscriptionData.SUB_ALL);
   this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);               

1.3 创建MQClientInstance

 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

1.4 创建rebalanceImpl

该类为消息重新负载实现类,即消息重试需要使用的类

1.5 获取消息进度

    获取消息进度。如果消息消费是集群模式,那么消息进度保存在Broker上;如果是广播模式,那么消息消费进度存储在消费端

switch (this.defaultMQPushConsumer.getMessageModel()) {
  case BROADCASTING:
    this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    break;
    //集群模式下初始化消息进度,消息进度存储在broker
  case CLUSTERING:
    this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    break;
  default:
    break;
}

1.6 创建消费者线程服务

//根据不同的消息监听类型(顺序消息还是并发消息,创建不同的线程服务ConsumeMessageService)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
  this.consumeOrderly = true;
  this.consumeMessageService =
    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
  this.consumeOrderly = false;
  this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}

二. 消息拉取机制

简而言之,拉取主要分为几下几步:

  1. 启动PullMessageService拉取消息线程
  2. 利用一个无界阻塞队列,不断轮询获取PullRequest
  3. 获取当前消费者的订阅消息
  4. 根据订阅消息向broker发送拉取请求
  5. broker根据请求命令返回消息
  6. 执行拉取回调函数,如果找到消息,将消息提交消费者处理消息线程,直接返回
  7. 准备下一次拉取任务

前四步属于客户端封装拉取请求的操作

然后broker处理请求返回响应

最后客户端根据响应,如果有消息提交消费线程处理

2.1 PullMessageService

该线程维护在MqClientInstance内部,在该instance启动时跟随启动,不断执行拉取消息任务

public void run() {
  log.info(this.getServiceName() + " service started");

  while (!this.isStopped()) {
    try {
      //从pullRequestQueue队列中获取一个PullRequest,这里是拿
      PullRequest pullRequest = this.pullRequestQueue.take();
      this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
      log.error("Pull Message Service Run Method exception", e);
    }
  }

  log.info(this.getServiceName() + " service end");
}

一旦从队列中获取到PullRequest,就开始构建拉取消息。

2.2 封装拉取请求

2.3 broker端处理请求

2.4 执行回调函数

主要是case FOUND:状态,即找到消息的处理方式

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
  pullResult.getMsgFoundList(),
  processQueue,
  pullRequest.getMessageQueue(),
  dispatchToConsume);

向消费消息服务提交消费请求,这一步是异步的方式,并不关心消费的状态,提交了就返回。

三. 消息消费流程


标题:05-消费者Consumer
作者:mahaonan
地址:https://mahaonan.fun/articles/2022/07/18/1658147023067.html