03-生产者Producer

2022-07-18

03-生产者Producer

生产者Producer

一. 消息结构

1. 单体消息

public class Message implements Serializable {
  private static final long serialVersionUID = 8445773977080406428L;

  //所属topic
  private String topic;
  private int flag;
  /*
        扩展属性
        tag: 消息tag,用来过滤消息
        keys: 索引建,多个空格隔开,用来快速检索信息
        waitStoreMsgOK:消息发送时是否等到消息存储完后再返回
        delayTimeLevel: 消息延迟级别,用于定时消息或消息重试
     */

  private Map<String, String> properties;
  private byte[] body;
  private String transactionId;
}
  • topic: 所属topic
  • flag:rocketMq不做处理,业务上可以使用flag来做标记
  • Properties: 拓展属性
  • body:消息内容字节数组
  • transactionId: 事务消息使用

2. 批量消息

public class MessageBatch extends Message implements Iterable<Message> {

    private static final long serialVersionUID = 621335151046335557L;
    private final List<Message> messages;
}

继承自message,便于消息发送的同一性

内部封装了List<Message>,实现批量发送,那么MessageBatch的各项基础属性是如何设置的呢?

  • topic:取内部message的topic,因此批量发送只能发送给同一个topic

  • 剩下的属性都是组合了单个消息,具体代码如下

    public static byte[] encodeMessage(Message message) {
      //only need flag, body, properties
      byte[] body = message.getBody();
      int bodyLen = body.length;
      String properties = messageProperties2String(message.getProperties());
      byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
      //note properties length must not more than Short.MAX
      short propertiesLength = (short) propertiesBytes.length;
      int sysFlag = message.getFlag();
      int storeSize = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCOD
        + 4 // 3 BODYCRC
        + 4 // 4 FLAG
        + 4 + bodyLen // 4 BODY
        + 2 + propertiesLength;
      ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
      // 1 TOTALSIZE
      byteBuffer.putInt(storeSize);
    
      // 2 MAGICCODE
      byteBuffer.putInt(0);
    
      // 3 BODYCRC
      byteBuffer.putInt(0);
    
      // 4 FLAG
      int flag = message.getFlag();
      byteBuffer.putInt(flag);
    
      // 5 BODY
      byteBuffer.putInt(bodyLen);
      byteBuffer.put(body);
    
      // 6 properties
      byteBuffer.putShort(propertiesLength);
      byteBuffer.put(propertiesBytes);
    
      return byteBuffer.array();
    }
    

    批量消息的body由这几部分构成:

    • 4字节的totalSize,即整个body 的长度
    • 4字节的MAGICCODE
    • 4字节的BODYCRC
    • 4字节的FLAG
    • 4字节的单个message长度
    • 单个message body信息
    • 2字节的属性长度
    • 属性信息

    然后由多个这样的Message组合起来,即构成了MessageBatch的body

3. 扩展属性

有一个消息属性常量类MessageConst,保存了一些消息的扩展属性

Map<String, String> properties;

这个Map存放了多个消息的扩展属性,如下所述

  • tag: 消息tag,用来过滤消息
  • keys: 索引建,多个空格隔开,用来快速检索信息
  • waitStoreMsgOK:消息发送时是否等到消息存储完后再返回
  • delayTimeLevel 消息延迟级别,用于定时消息或消息重试
  • TRAN_MSG事务消息
  • __STARTDELIVERTIME或者DELAY:延迟消息

二. 启动流程

1. 创建DefaultMQProducer

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
  this.namespace = namespace;
  this.producerGroup = producerGroup;
  defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

这其中的关键是DefaultMQProducerImpl(),看看做了哪些事情

public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
  this.defaultMQProducer = defaultMQProducer;
  //钩子函数
  this.rpcHook = rpcHook;

  //线程池的阻塞队列
  this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
  //创建线程池,使用了自定的线程策略
  this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.asyncSenderThreadPoolQueue,
    //自定义工厂,主要用来线程池的命名
    new ThreadFactory() {
      private AtomicInteger threadIndex = new AtomicInteger(0);

      @Override
      public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
      }
    });
}

至此,我们就创建了一个默认的Producer

2. 启动生产者

2.1 设置nameserver的地址

producer.setNamesrvAddr("localhost:9876");

这里也可以在启动项中添加参数来设置,rocketMq支持很多的启动项参数,这些启动项参数保存在MixAll这个类中

image-20201125192253208
public static String getNameServerAddresses() {
    return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
}

public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";

2.2 检查producer配置并设置instanceName

//校验producer配置
this.checkConfig();

//检查productGroup是否符合要求;并改变生产者的instanceName为进程ID。
//如果productGroup不是CLIENT_INNER_PRODUCER,并且instanceName是默认的
// if (this.instanceName.equals("DEFAULT")) {
//      this.instanceName = String.valueOf(UtilAll.getPid());
//  }
//   那么就把进程id作为instanceName
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  this.defaultMQProducer.changeInstanceNameToPID();
}

这里主要是设置instanceName为进程id,这样能够保证clientId的唯一性(避免了同一台机器部署多个应用程序导致clientId重复)

2.3 创建或获取MQClientInstance

//MQClientInstane关键类
//同一个JVM中的不同消费者和不同生产者在启动时获取到的MQClientInstane实例都是同一个
//经典饿汉模式单例模式
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);

整个JVM实例中只存在一个MQClientManager实例

维护一个MQClientInstance缓存表

ConcurrentMap<String/* clientId */, MQClientInstance>factoryTable =newConcurrentHashMap<String, MQClientInstance>(

也就是同一个clientId只会创建一个MQClientInstance

clientId为客户端IP+instance+(unitname可选)

public String buildMQClientId() {
  StringBuilder sb = new StringBuilder();
  sb.append(this.getClientIP());
  sb.append("@");
  sb.append(this.getInstanceName());
  if (!UtilAll.isBlank(this.unitName)) {
    sb.append("@");
    sb.append(this.unitName);
  }
  return sb.toString();
}
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
  String clientId = clientConfig.buildMQClientId();
  MQClientInstance instance = this.factoryTable.get(clientId);
  if (null == instance) {
    instance =
      new MQClientInstance(clientConfig.cloneClientConfig(),
                           this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    //这里防止并发下出问题(上面new的时候并没有加锁),进行了双重判断,通过返回值来判断是否有其他线程放入了相同clientId
    //若有,则返回之前的,保证同一个clientId只有一个实例
    MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
    if (prev != null) {
      instance = prev;
      log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    } else {
      log.info("Created new MQClientInstance for clientId:[{}]", clientId);
    }
  }

  return instance;
}

这里运用了双重判断的技巧,使用putIfAbsent

2.4 向MQClientInstance注册生产者

MQClientInstance中维护了一个生产者的Map,新的生产者都会加入其中

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

将当前生产者加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等

//注册生产者
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
  //已经创建过的group抛出异常
  this.serviceState = ServiceState.CREATE_JUST;
  throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                              + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                              null);
}

3. 启动MQClientInstance

如果MQClientInstance已经启动,则本次启动不会真正执行

三. 消息发送

消息发送大致流程如下:

  1. 验证消息是否合法
  2. 查找topic路由信息TopicPublishInfo
  3. 选择一个消息队列
  4. 发送消息(同步,异步,单向)

1. 验证消息

  • 确保生产者处于运行状态
  • 验证消息长度,最大不能超过4M,但是也不能为0
  • 主题名称、消息体不能为空
private int maxMessageSize = 1024 * 1024 * 4; // 4M

2. 查找topic路由

总体流程如下:

  1. 先从生产者缓存中获取路由信息
  2. 如果缓存没有,再去nameserver获取路由信息
  3. 如果nameserver中还没有,则使用默认主题TBW102来查找路由信息
  4. 结合broker的配置,是否自动创建topic,来决定是否则通过TBW102查找到路由信息

核心方法是:

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

在更新路由信息时候,也会更新broker,生产者,消费者信息,具体就是维护topicRouteTablebrokerAddrTableproducerTableconsumerTable这几个缓存map

  • 首先来看看TopicPublishInfo的组成
public class TopicPublishInfo {
  //是否是顺序消息,默认不是
  private boolean orderTopic = false;
  //是否已经保存了topic路由信息
  private boolean haveTopicRouterInfo = false;
  //消息队列
  private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
  //消息队列序号,每选择一次消息队列,该值会自增1,如果Integer.MAX_VALUE,则重置为0,用于选择消息队列
  private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
  //topic路由信息
  private TopicRouteData topicRouteData;
}
public class TopicRouteData extends RemotingSerializable {
  private String orderTopicConf;
  //topic队列元数据
  private List<QueueData> queueDatas;
  //broker数据信息.topic分布的broker元数据
  private List<BrokerData> brokerDatas;
  //broker上过滤服务器地址列表。
  private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
  • 然后来看看具体是怎么查找的
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
  TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
  //如果生产者没有缓存topicPublishInfo,则从nameserver去寻找
  if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
  }

  if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;
  } else {
    //如果还未找到,尝试用默认topic去寻找,如果brokerConfig的autoCreateTopicEnable=true,将使用默认主题
    //AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    topicPublishInfo = this.topicPublishInfoTable.get(topic);
    return topicPublishInfo;
  }
}

先从生产者的缓存topicPublishInfoTable里面找,如果没有缓存,则去从nameserver获取topic路由信息

如果nameserver中没有,说明该topic未创建。那么会从默认主题"TBW102"中去获取路由信息.

如果broker设置了autoCreateTopicEnable=true, 那么会在启动时创建TBW102,这样这能获取到该路由信息。具体的细节再学习broker之后再来分析下

所以,生产环境,最好设置autoCreateTopicEnable=false,这样topic的分配,broker负载均衡就能受我们掌控

3. 选择消息队列

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

核心方法就是selectOneMessageQueue,看看生产者是怎么选择一个队列去发送消息的。

·sendLatencyFaultEnable决定是否启用broker故障延迟机制,默认情况下不启用

//可以设置启用
producer.setSendLatencyFaultEnable(true);

3.1 不启用broker故障延迟机制

默认情况下,rocketMq是不启用broker故障延迟机制的,会进行队列轮询,

每次根据sendWhichQueue去获取对应的队列;

如果上一次失败了,那么会跳过上一次的broker,选择一个新的broker

最后没有获取到的话,说明没有满足条件的broker,就默认选一个

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  if (lastBrokerName == null) {
    //第一次选择队列,直接选一个
    return selectOneMessageQueue();
  } else {
    int index = this.sendWhichQueue.getAndIncrement();
    for (int i = 0; i < this.messageQueueList.size(); i++) {
      int pos = Math.abs(index++) % this.messageQueueList.size();
      if (pos < 0)
        pos = 0;
      MessageQueue mq = this.messageQueueList.get(pos);
      //上一次失败的broker队列不再去选择,降低错误率,选择一个新的没有发生过错误的队列
      if (!mq.getBrokerName().equals(lastBrokerName)) {
        return mq;
      }
    }
    return selectOneMessageQueue();
  }
}

这样取会有一个问题,这里只是过滤了上次的broker,但是如果第三次,还是可能会取到第一次的broker的,此时这个broker不一定是可用的,因此保证不了高可用。

3.2 启用broker故障延迟机制

启用broker故障延迟机制后,会有一个相应的处理策略

  1. 在本次发送失败后,rocketMq会维护一个faultItemTable,里面存放了broker和对应的故障FaultItem

    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
    
  2. FaultItem中存放了该broker下次开始使用的时间startTimestamp

  3. 然后在选择队列的时候,根据是否达到这个时间来判断当前broker是否可用

总体而言,就是怎么去维护faultItemTable,怎么计算这个故障时间是多久

下面来看看rocketMQ是怎么设计这种策略的

image-20201126181955742

上图即是涉及到故障处理的核心类

  • FaultItem

    //失败条目。即应该规避的
    class FaultItem implements Comparable<FaultItem> {
      //唯一名称,brokerName
      private final String name;
      //本次消息发送延迟
      private volatile long currentLatency;
      //故障规避开始时间,用来判断该broker是否可用
      private volatile long startTimestamp;
    
      /**
     * 看是否达到broker故障规避时间
     * @return
     */
      public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
      }
    }
    

    这个类负责存储指定broker所应该规避的时间,isAvailable则提供了何时可以使用,当前时间达到了startTimestamp即可

当本次发送失败时,在捕捉异常的时候,会调用updateFaultItem方法

/**
     *
     * @param brokerName
     * @param currentLatency 本次消息发送延时时间
     * @param isolation 是否隔离, 若为true则使用默认时长30s来计算Broker故障规避时长,
     *                  如果为false,则使用本次消息发送延迟时间来计算Broker故障规避时长。
     */
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
  this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

 public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            //这个值是接下来多久的时间内该Broker将不参与消息发送队列负载
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

在获取duration时,使用了如下的策略

private long computeNotAvailableDuration(final long currentLatency) {
  for (int i = latencyMax.length - 1; i >= 0; i--) {
    if (currentLatency >= latencyMax[i])
      return this.notAvailableDuration[i];
  }

  return 0;
}
public class MQFaultStrategy {
  private final static InternalLogger log = ClientLogger.getLog();
  private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

  private boolean sendLatencyFaultEnable = false;

     /*
        根据currentLatency本次消息发送延迟,从latencyMax尾部向前找到第一个比currentLatency小的索引index,如果没有找到,返回0。
        然后根据这个索引从notAvailableDuration数组中取出对应的时间,在这个时长内,Broker将设置为不可用。
     */
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
}

最后,维护faultItemTable,给faultItem中设置StartTimestamp

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
  FaultItem old = this.faultItemTable.get(name);
  if (null == old) {
    final FaultItem faultItem = new FaultItem(name);
    faultItem.setCurrentLatency(currentLatency);
    faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

    //这里也是双重判断,注意学习这种
    old = this.faultItemTable.putIfAbsent(name, faultItem);
    if (old != null) {
      old.setCurrentLatency(currentLatency);
      old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
  } else {
    old.setCurrentLatency(currentLatency);
    old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
  }
}

然后,根据上面设置好StartTimestamp,在选择队列的时候,判断isAvailable来规避故障的broker

int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
  int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
  if (pos < 0)
    pos = 0;
  MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
  //如果broker已经可用,则返回这个broker下的队列
  if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
    //这里貌似是个bug,应该是不等于把
    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) //????
      return mq;
  }
}

4. 消息发送

4.1 同步发送

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

这个方法是消息发送的核心方法

  1. 获取broker网络地址

    //尝试从MQClientInstance中寻找broker地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
      ///如果缓存没有,从nameserver更新信息,再次寻找
      tryToFindTopicPublishInfo(mq.getTopic());
      brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    
  2. 对消息进行处理

    • 如果是单体消息,则设置消息唯一全局id

    • 如果消息大小超过4K,则进行压缩,并加入系统标记MessageSysFlag.COMPRESSED_FLAG

       sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
      
    • 如果是事务消息,加入系统标记MessageSysFlag.TRANSACTION_PREPARED_TYPE

    • 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑

      • 事务消息 MessageType.Trans_Msg_Half,通过设置消息属性TRAN_MSG
      • 延迟消息 MessageType.Delay_Msg,通过设置消息属性__STARTDELIVERTIME或者DELAY
  3. 构建发送请求包

    这里需要注意对重试消息的处理

    public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    
    
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
      String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
      if (reconsumeTimes != null) {
        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
      }
    
      String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
      if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
      }
    }
    
  4. 发送请求

    如果是同步发送,会等待broker返回响应,如果失败会进行重试

  5. 如果注册了钩子函数,会执行after逻辑

    就算消息发送过程中发生RemotingException、MQBrokerException、InterruptedException时该方法也会执行

4.2 异步发送

  1. 消息异步发送是指消息生产者调用发送的API后,无须阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。
  2. 异步方式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力,RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsync Semaphore Value来控制,默认为65535。
  3. 异步消息发送虽然也可以通过DefaultMQProducer#retryTimes-WhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。(同步发送在这些情况下也会重试)

4.4 单向发送

​ 单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。

5. 消息重试

以同步发送为例,当本次发送抛出异常时,会进行消息重试,默认重试2次

加上第一次的次数,消息默认情况下会发送3次


标题:03-生产者Producer
作者:mahaonan
地址:https://mahaonan.fun/articles/2022/07/18/1658147021559.html