05-消费者Consumer
2022-07-18
05-消费者Consumer
消费者Consumer
一. 启动流程
1.1 参数配置
- 创建一个
DefaultMQPushConsumer
,基于push模式的消费者客户端 - 设置nameserver地址及消费进度
- 订阅主题和TAG
- 注册消息监听器,这个是收到消息后的处理逻辑,核心业务逻辑
- 启动消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("mhn_consumer1");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicMhn", "TAG_A");
consumer.registerMessageListener();
consumer.start();
1.2 构建主题订阅信息
构建SubscriptionData
,并加入到RebalanceImpl的订阅消息中
订阅关系主要有以下两个来源:
- 消费者主动订阅,代码中所使用的subscribe
- 自动订阅重试消息主题。消息重试是以消费组为单位,而不是主题,消息重试主题名为%RETRY%+消费组名
case CLUSTERING:
//%RETRY%mhn_consumer1, 重试消息主题,%RETRY% + 消费者组名,启动时自动订阅该主题,以消费者组为单位进行消费重试
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
1.3 创建MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
1.4 创建rebalanceImpl
该类为消息重新负载实现类,即消息重试需要使用的类
1.5 获取消息进度
获取消息进度。如果消息消费是集群模式,那么消息进度保存在Broker上;如果是广播模式,那么消息消费进度存储在消费端
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
//集群模式下初始化消息进度,消息进度存储在broker
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
1.6 创建消费者线程服务
//根据不同的消息监听类型(顺序消息还是并发消息,创建不同的线程服务ConsumeMessageService)
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
二. 消息拉取机制
简而言之,拉取主要分为几下几步:
- 启动PullMessageService拉取消息线程
- 利用一个无界阻塞队列,不断轮询获取PullRequest
- 获取当前消费者的订阅消息
- 根据订阅消息向broker发送拉取请求
- broker根据请求命令返回消息
- 执行拉取回调函数,如果找到消息,将消息提交消费者处理消息线程,直接返回
- 准备下一次拉取任务
前四步属于客户端封装拉取请求的操作
然后broker处理请求返回响应
最后客户端根据响应,如果有消息提交消费线程处理
2.1 PullMessageService
该线程维护在MqClientInstance
内部,在该instance启动时跟随启动,不断执行拉取消息任务
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//从pullRequestQueue队列中获取一个PullRequest,这里是拿
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
一旦从队列中获取到PullRequest,就开始构建拉取消息。
2.2 封装拉取请求
2.3 broker端处理请求
2.4 执行回调函数
主要是case FOUND:状态,即找到消息的处理方式
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
向消费消息服务提交消费请求,这一步是异步的方式,并不关心消费的状态,提交了就返回。