03-生产者Producer
03-生产者Producer
生产者Producer
一. 消息结构
1. 单体消息
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
//所属topic
private String topic;
private int flag;
/*
扩展属性
tag: 消息tag,用来过滤消息
keys: 索引建,多个空格隔开,用来快速检索信息
waitStoreMsgOK:消息发送时是否等到消息存储完后再返回
delayTimeLevel: 消息延迟级别,用于定时消息或消息重试
*/
private Map<String, String> properties;
private byte[] body;
private String transactionId;
}
- topic: 所属topic
- flag:rocketMq不做处理,业务上可以使用flag来做标记
- Properties: 拓展属性
- body:消息内容字节数组
- transactionId: 事务消息使用
2. 批量消息
public class MessageBatch extends Message implements Iterable<Message> {
private static final long serialVersionUID = 621335151046335557L;
private final List<Message> messages;
}
继承自message,便于消息发送的同一性
内部封装了List<Message>
,实现批量发送,那么MessageBatch的各项基础属性是如何设置的呢?
-
topic:取内部message的topic,因此批量发送只能发送给同一个topic
-
剩下的属性都是组合了单个消息,具体代码如下
public static byte[] encodeMessage(Message message) { //only need flag, body, properties byte[] body = message.getBody(); int bodyLen = body.length; String properties = messageProperties2String(message.getProperties()); byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); //note properties length must not more than Short.MAX short propertiesLength = (short) propertiesBytes.length; int sysFlag = message.getFlag(); int storeSize = 4 // 1 TOTALSIZE + 4 // 2 MAGICCOD + 4 // 3 BODYCRC + 4 // 4 FLAG + 4 + bodyLen // 4 BODY + 2 + propertiesLength; ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize); // 1 TOTALSIZE byteBuffer.putInt(storeSize); // 2 MAGICCODE byteBuffer.putInt(0); // 3 BODYCRC byteBuffer.putInt(0); // 4 FLAG int flag = message.getFlag(); byteBuffer.putInt(flag); // 5 BODY byteBuffer.putInt(bodyLen); byteBuffer.put(body); // 6 properties byteBuffer.putShort(propertiesLength); byteBuffer.put(propertiesBytes); return byteBuffer.array(); }
批量消息的body由这几部分构成:
- 4字节的totalSize,即整个body 的长度
- 4字节的MAGICCODE
- 4字节的BODYCRC
- 4字节的FLAG
- 4字节的单个message长度
- 单个message body信息
- 2字节的属性长度
- 属性信息
然后由多个这样的Message组合起来,即构成了MessageBatch的body
3. 扩展属性
有一个消息属性常量类
MessageConst
,保存了一些消息的扩展属性Map<String, String> properties;
这个Map存放了多个消息的扩展属性,如下所述
tag
: 消息tag,用来过滤消息keys
: 索引建,多个空格隔开,用来快速检索信息waitStoreMsgOK
:消息发送时是否等到消息存储完后再返回delayTimeLevel
消息延迟级别,用于定时消息或消息重试TRAN_MSG
:事务消息__STARTDELIVERTIME
或者DELAY
:延迟消息
二. 启动流程
1. 创建DefaultMQProducer
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
this.namespace = namespace;
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
这其中的关键是DefaultMQProducerImpl(),看看做了哪些事情
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
//钩子函数
this.rpcHook = rpcHook;
//线程池的阻塞队列
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
//创建线程池,使用了自定的线程策略
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
//自定义工厂,主要用来线程池的命名
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
}
至此,我们就创建了一个默认的Producer
2. 启动生产者
2.1 设置nameserver的地址
producer.setNamesrvAddr("localhost:9876");
这里也可以在启动项中添加参数来设置,rocketMq支持很多的启动项参数,这些启动项参数保存在MixAll
这个类中
![]()
public static String getNameServerAddresses() { return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)); } public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR"; public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";
2.2 检查producer配置并设置instanceName
//校验producer配置
this.checkConfig();
//检查productGroup是否符合要求;并改变生产者的instanceName为进程ID。
//如果productGroup不是CLIENT_INNER_PRODUCER,并且instanceName是默认的
// if (this.instanceName.equals("DEFAULT")) {
// this.instanceName = String.valueOf(UtilAll.getPid());
// }
// 那么就把进程id作为instanceName
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
这里主要是设置instanceName为进程id,这样能够保证clientId的唯一性(避免了同一台机器部署多个应用程序导致clientId重复)
2.3 创建或获取MQClientInstance
//MQClientInstane关键类
//同一个JVM中的不同消费者和不同生产者在启动时获取到的MQClientInstane实例都是同一个
//经典饿汉模式单例模式
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
整个JVM实例中只存在一个MQClientManager实例
维护一个MQClientInstance缓存表
ConcurrentMap<String/* clientId */, MQClientInstance>factoryTable =newConcurrentHashMap<String, MQClientInstance>(
也就是同一个clientId只会创建一个MQClientInstance
clientId为客户端IP+instance+(unitname可选)
public String buildMQClientId() { StringBuilder sb = new StringBuilder(); sb.append(this.getClientIP()); sb.append("@"); sb.append(this.getInstanceName()); if (!UtilAll.isBlank(this.unitName)) { sb.append("@"); sb.append(this.unitName); } return sb.toString(); }
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
//这里防止并发下出问题(上面new的时候并没有加锁),进行了双重判断,通过返回值来判断是否有其他线程放入了相同clientId
//若有,则返回之前的,保证同一个clientId只有一个实例
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
这里运用了双重判断的技巧,使用putIfAbsent
2.4 向MQClientInstance注册生产者
MQClientInstance中维护了一个生产者的Map,新的生产者都会加入其中
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
将当前生产者加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等
//注册生产者
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
//已经创建过的group抛出异常
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
3. 启动MQClientInstance
如果MQClientInstance已经启动,则本次启动不会真正执行
三. 消息发送
消息发送大致流程如下:
- 验证消息是否合法
- 查找topic路由信息
TopicPublishInfo
- 选择一个消息队列
- 发送消息(同步,异步,单向)
1. 验证消息
- 确保生产者处于运行状态
- 验证消息长度,最大不能超过4M,但是也不能为0
- 主题名称、消息体不能为空
private int maxMessageSize = 1024 * 1024 * 4; // 4M
2. 查找topic路由
总体流程如下:
- 先从生产者缓存中获取路由信息
- 如果缓存没有,再去nameserver获取路由信息
- 如果nameserver中还没有,则使用默认主题
TBW102
来查找路由信息- 结合broker的配置,是否自动创建topic,来决定是否则通过
TBW102
查找到路由信息
核心方法是:
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
在更新路由信息时候,也会更新broker,生产者,消费者信息,具体就是维护
topicRouteTable
,brokerAddrTable
,producerTable
,consumerTable
这几个缓存map
- 首先来看看
TopicPublishInfo
的组成
public class TopicPublishInfo {
//是否是顺序消息,默认不是
private boolean orderTopic = false;
//是否已经保存了topic路由信息
private boolean haveTopicRouterInfo = false;
//消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//消息队列序号,每选择一次消息队列,该值会自增1,如果Integer.MAX_VALUE,则重置为0,用于选择消息队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
//topic路由信息
private TopicRouteData topicRouteData;
}
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
//topic队列元数据
private List<QueueData> queueDatas;
//broker数据信息.topic分布的broker元数据
private List<BrokerData> brokerDatas;
//broker上过滤服务器地址列表。
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
- 然后来看看具体是怎么查找的
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//如果生产者没有缓存topicPublishInfo,则从nameserver去寻找
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//如果还未找到,尝试用默认topic去寻找,如果brokerConfig的autoCreateTopicEnable=true,将使用默认主题
//AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
先从生产者的缓存topicPublishInfoTable
里面找,如果没有缓存,则去从nameserver获取topic路由信息
如果nameserver中没有,说明该topic未创建。那么会从默认主题"TBW102"
中去获取路由信息.
如果broker设置了
autoCreateTopicEnable=true
, 那么会在启动时创建TBW102
,这样这能获取到该路由信息。具体的细节再学习broker之后再来分析下所以,生产环境,最好设置
autoCreateTopicEnable=false
,这样topic的分配,broker负载均衡就能受我们掌控
3. 选择消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
核心方法就是selectOneMessageQueue
,看看生产者是怎么选择一个队列去发送消息的。
·sendLatencyFaultEnable
决定是否启用broker故障延迟机制,默认情况下不启用
//可以设置启用
producer.setSendLatencyFaultEnable(true);
3.1 不启用broker故障延迟机制
默认情况下,rocketMq是不启用broker故障延迟机制的,会进行队列轮询,
每次根据sendWhichQueue
去获取对应的队列;
如果上一次失败了,那么会跳过上一次的broker,选择一个新的broker
最后没有获取到的话,说明没有满足条件的broker,就默认选一个
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
//第一次选择队列,直接选一个
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//上一次失败的broker队列不再去选择,降低错误率,选择一个新的没有发生过错误的队列
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
这样取会有一个问题,这里只是过滤了上次的broker,但是如果第三次,还是可能会取到第一次的broker的,此时这个broker不一定是可用的,因此保证不了高可用。
3.2 启用broker故障延迟机制
启用broker故障延迟机制后,会有一个相应的处理策略
在本次发送失败后,rocketMq会维护一个
faultItemTable
,里面存放了broker和对应的故障FaultItem
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
FaultItem中存放了该broker下次开始使用的时间
startTimestamp
然后在选择队列的时候,根据是否达到这个时间来判断当前broker是否可用
总体而言,就是怎么去维护faultItemTable
,怎么计算这个故障时间是多久
下面来看看rocketMQ是怎么设计这种策略的

上图即是涉及到故障处理的核心类
-
FaultItem
//失败条目。即应该规避的 class FaultItem implements Comparable<FaultItem> { //唯一名称,brokerName private final String name; //本次消息发送延迟 private volatile long currentLatency; //故障规避开始时间,用来判断该broker是否可用 private volatile long startTimestamp; /** * 看是否达到broker故障规避时间 * @return */ public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; } }
这个类负责存储指定broker所应该规避的时间,
isAvailable
则提供了何时可以使用,当前时间达到了startTimestamp
即可
当本次发送失败时,在捕捉异常的时候,会调用updateFaultItem
方法
/**
*
* @param brokerName
* @param currentLatency 本次消息发送延时时间
* @param isolation 是否隔离, 若为true则使用默认时长30s来计算Broker故障规避时长,
* 如果为false,则使用本次消息发送延迟时间来计算Broker故障规避时长。
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
//这个值是接下来多久的时间内该Broker将不参与消息发送队列负载
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
在获取duration
时,使用了如下的策略
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
public class MQFaultStrategy {
private final static InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
/*
根据currentLatency本次消息发送延迟,从latencyMax尾部向前找到第一个比currentLatency小的索引index,如果没有找到,返回0。
然后根据这个索引从notAvailableDuration数组中取出对应的时间,在这个时长内,Broker将设置为不可用。
*/
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
}
最后,维护faultItemTable
,给faultItem
中设置StartTimestamp
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
//这里也是双重判断,注意学习这种
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
然后,根据上面设置好StartTimestamp
,在选择队列的时候,判断isAvailable
来规避故障的broker
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//如果broker已经可用,则返回这个broker下的队列
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
//这里貌似是个bug,应该是不等于把
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) //????
return mq;
}
}
4. 消息发送
4.1 同步发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
这个方法是消息发送的核心方法
-
获取broker网络地址
//尝试从MQClientInstance中寻找broker地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { ///如果缓存没有,从nameserver更新信息,再次寻找 tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); }
-
对消息进行处理
-
如果是单体消息,则设置消息唯一全局id
-
如果消息大小超过4K,则进行压缩,并加入系统标记
MessageSysFlag.COMPRESSED_FLAG
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
-
-
-
如果是事务消息,加入系统标记
MessageSysFlag.TRANSACTION_PREPARED_TYPE
-
如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑
- 事务消息
MessageType.Trans_Msg_Half
,通过设置消息属性TRAN_MSG
- 延迟消息
MessageType.Delay_Msg
,通过设置消息属性__STARTDELIVERTIME
或者DELAY
- 事务消息
-
-
构建发送请求包
这里需要注意对重试消息的处理
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%"; if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } }
-
发送请求
如果是同步发送,会等待broker返回响应,如果失败会进行重试
-
如果注册了钩子函数,会执行after逻辑
就算消息发送过程中发生RemotingException、MQBrokerException、InterruptedException时该方法也会执行
4.2 异步发送
- 消息异步发送是指消息生产者调用发送的API后,无须阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。
- 异步方式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力,RocketMQ对消息发送的异步消息进行了并发控制,通过参数
clientAsync Semaphore Value
来控制,默认为65535。 - 异步消息发送虽然也可以通过
DefaultMQProducer#retryTimes-WhenSendAsyncFailed
属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的
,如果出现网络异常、网络超时等将不会重试。(同步发送在这些情况下也会重试
)
4.4 单向发送
单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。
5. 消息重试
以同步发送为例,当本次发送抛出异常时,会进行消息重试,默认重试2次
加上第一次的次数,消息默认情况下会发送3次