顺序消息在开发中有发挥着重要的的作用,尤其是在涉及多个步骤的业务流程中,是保证系统稳定、数据一致和业务逻辑正确性的重要手段。

为什么需要顺序消息?

顺序消息确保系统的执行按照预定的流程进行,是保证系统稳定、数据一致和业务逻辑正确性的重要手段。如下图:

/

比如生产者是 Msg1,Msg2, Msg3,Msg4,Msg5 的顺序发出,消息的接受者也要按这个顺序进行消费,系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。以一个实际的业务场景为例:

在审批系统中,通常有多个审批节点,像是初审、复审、终审等。每一层的审批都依赖于上一层的审批结果。例如,只有初审通过,复审才会进行;复审通过后,才能进入终审。每一层的审批结果是后续流程的输入,所有的审批必须按照正确的顺序来进行。并且系统采用事件驱动架构,每次操作都链接一个事件消息总线,操作完成后发送消息,事件处理模块收到消息后对各种消息进行处理。这样的场景就十分依赖事件消息的顺序性,如果复审的消息比初审更先到达,这就出现业务不一致的情况。

RocketMQ 顺序消息

RocketMQ 里的分区队列 MessageQueue 本身是能保证 FIFO 的,RocketMQ Topic 架构如下(图源 Rocket MQ 官方文档):

正常情况下不能顺序消费消息主要有两个原因:

  1. Producer 发送消息到 MessageQueue 时是轮询发送的,消息被发送到不同的分区队列,就不能保证 FIFO了。
  2. Consumer 默认是多线程并发消费同一个 MessageQueue 的,即使消息是顺序到达的,也不能保证消息顺序消费。

设置队列数量

开发版 RocketMQ 设置队列大小:

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_A");
        // NameServer
        producer.setNamesrvAddr("localhost:9876");
        // 指定异步发送失败后不进行重试发送
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // 指定新创建的Topic的Queue数量为2,默认为4
        producer.setDefaultTopicQueueNums(2);
        // 开启生产者
        producer.start();
        // 模拟发送100条消息
        for (int i = 0; i < 100; i++) {
            byte[] bytes = ("hi " + i).getBytes();
            try {
                Message msg = new Message("topic_A", "tag_A", bytes);
                // 异步发送,指定回调
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // 异步发送,如果不执行sleep,则消息在发送之前,producer已经关闭,就会报错
        TimeUnit.SECONDS.sleep(3);
        // 关闭生产者
        producer.shutdown();
    }
}

商用版 RocketMQ 设置队列大小(火山引擎文档):

消息发向 RocketMQ 时,发向的是 Broker 內的多个 Queue,消息消费时,会根据特定算法在某一个 Queue 中消费,所以在消费时,无法保证顺序性,只能在单 Queue 中有序,也就是“局部有序”,想要做到全局有序,只能将这个 Topic 的 Queue 队列数为 1,“局部”变全局。

在生产环境中,消息队列的队列数一般是由运维控制,不建议频繁修改。

RocketMQ 架构借鉴自 Kafka,RocketMQ 队列模型和Kafka的分区(Partition)模型类似,所以 Kafka实现顺序消息也跟 RocketMQ 类似。

加锁

RocketMQ 的 Queue 设计之初是为了提高并发度而设计,如果一个 Topic 只有一个队列,无疑对消息队列的高并发是一个极大的削弱,那么有没有其他方法实现顺序消息,使用多个队列且能够并发消费呢?另一种方式有一种方式就是加锁。Rocket MQ就是这个做的,在Consumer 加本地锁,在 Broker 加全局锁,实现顺序消费。加锁代码如下(Github):

public boolean lock(final MessageQueue mq) {
  FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, true);
  if (findBrokerResult != null) {
    LockBatchRequestBody requestBody = new LockBatchRequestBody();
    requestBody.setConsumerGroup(this.consumerGroup);
    requestBody.setClientId(this.mQClientFactory.getClientId());
    requestBody.getMqSet().add(mq);

    try {
      Set<MessageQueue> lockedMq =
        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
      for (MessageQueue mmqq : lockedMq) {
        ProcessQueue processQueue = this.processQueueTable.get(mmqq);
        if (processQueue != null) {
          processQueue.setLocked(true);
          processQueue.setLastLockTimestamp(System.currentTimeMillis());
        }
      }

      boolean lockOK = lockedMq.contains(mq);
      log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
      return lockOK;
    } catch (Exception e) {
      log.error("lockBatchMQ exception, " + mq, e);
    }
  }

  return false;
}
public void consumeMessageDirectly(final List<MessageExt> msgs, final String consumerGroup,
                                   final ConsumeOrderlyContext context) {
    if (msgs == null || msgs.isEmpty()) {
        return;
    }

    final MessageListenerOrderly listener = this.consumeOrderlyDispatch.get(consumerGroup);
    if (listener == null) {
        log.warn("consumeMessageDirectly: {}, the consumer group: {} not exist!", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), consumerGroup);
        return;
    }

    for (MessageExt msg : msgs) {
        try {
            // 调用消费者处理消息
            final ConsumeOrderlyStatus status = listener.consumeMessage(Collections.singletonList(msg), context);
            if (null == status) {
                throw new MQClientException("Return value of the listener cannot be null", null);
            }

            switch (status) {
                case SUCCESS:
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    // 暂停当前队列一段时间,以便其他队列的消息得到消费
                    context.setSuspendCurrentQueueTimeMillis(listener.suspendCurrentQueueTimeMillis());
                    break;
                default:
                    throw new MQClientException("Illegal consume orderly status", null);
            }
        } catch (Throwable e) {
            log.warn("consumeMessageDirectly exception", e);
        }
    }
}

Redis 顺序消息

单 Queue 消费与加锁也会影响性能,有没有其他方法实现顺序消息呢?在 MQ 领域目前没有好方法,现在流传一种轻量级实现方案——Redis 消息队列,搜索结果如下:

/

Redis 确实可以实现轻量级的消息队列,但是这种实现,会有两个缺点:

  1. Redis 本身定位是内存数据库,它的设计之初都是为缓存准备的,并不具备消息堆积的能力。而专业消息队列一个非常重要的功能是数据中转枢纽,Redis 的定位很难满足,所以使用起来要非常小心。
  2. Redis 的高可用方案可能丢失消息(AOF 持久化 和 主从复制都是异步 ),而专业消息队列可以针对不同的场景选择不同的高可用策略。

Redis 的并发与持久化

的确,Redis 无法取代那些老牌消息队列,但是他有一个核心优势——Redis 的单线程命令执行,天然保证了 Redis 的并发安全。

Redis 的持久化依靠 RDB 与 AOF 两种机制,RDB 会在指定的时间间隔内创建数据的快照(dump)。AOF 通过将写命令追加到日志文件中来持久化数据。RDB + AOF 可以保证 Redis 数据不丢失,AOF 配置 appendfsync 参数用于设置“真正执行”操作命令向 AOF 文件中同步的策略。

# 同步策略
# appendfsync always
appendfsync everysec
# appendfsync no

使用同步策略可以避免一些操作系统,机房故障,但是遇到磁盘损坏时,就需要使用Redis的高可用方案了,这又是另一个话题。

使用 Redis 实现顺序消息,是不是一种好的选择?

Redis 的 Stream

Redis List 数据结构用做队列时,因为消费时没有 Ack 机制,应用异常挂掉导致消息偶发丢失的情况,Redis Stream 从设计角度来讲已经完美的解决了。Redis 的 Stream 是在 5.0 版本先加入的数据结构,其工作模式如下:

每个 Redis Streamt 都有唯一的名称 ,对应唯一的 Redis Key 。

  • 同一个 Stream 可以挂载多个消费组 ConsumerGroup , 消费组不能自动创建,需要使用 XGROUP CREATE 命令创建。Der

  • 每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动 ,标识当前消费组消费到哪条消息了。

  • 消费组 ConsumerGroup 同样可以挂载多个消费者 Consumer , 每个 Consumer 并行的读取消息,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。

  • 消费者内部有一个属性 pending_ids , 记录了当前消费者读取但没有回复 ACK 的消息 ID 列表 。当消费者重新上线,这些消息可以重新被消费。

Redis 的 Stream 这种数据结构,十分适合做消息队列。在 Redis 中 Stream 类似于 MQ 的 Topic ,每个 Stream 只有一个队列,也就没有 MQ 中的多队列并发消费问题。接下来实现消息队列的核心功能

创建 Stream 组

@PostConstruct
public void createConsumerGroup() {
  log.info("createConsumerGroup...");
  try {
    // 如果消费组不存在,则创建
    stringRedisTemplate.opsForStream().createGroup(STREAM_NAME, GROUP_NAME);
  } catch (Exception e) {
    // 如果消费组已存在,忽略错误
    if (!e.getMessage().contains("BUSYGROUP")) {
      throw e;
    }
  }
}

Stream 消息发送

@GetMapping("/stream")
public ResponseEntity<User> stream(String key, String value) {
  Map<String, String> map = new HashMap<>();
  map.put(key, value);
  Record<String, Map<String, String>> record = StreamRecords.newRecord().ofMap(map).withStreamKey("CommonKey").withId(RecordId.autoGenerate());
  RecordId recordId = stringRedisTemplate.opsForStream()
    .add(record);
  return null;
}

Stream 消息监听

实现 StreamListener 接口的 onMessage 方法,这个方法会执行监听消息后的处理逻辑。

class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
  @Override
  public void onMessage(MapRecord<String, String, String> entries) {
    RecordId id = onProcess(entries);
    ack(id.getValue());
  }
}

将 Subscription 注入到 Spring:

@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {

  StreamOperations<String, Object, Object> streamOps = stringRedisTemplate.opsForStream();

  //监听容器配置
  StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer
    .StreamMessageListenerContainerOptions
    .builder()
    .pollTimeout(Duration.ofSeconds(1))
    .build();
  //监听器实现
  MyStreamListener streamListener = new MyStreamListener();
  //创建监听容器
  StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
  //groupName需要提前创建
  Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(GROUP_NAME, CONSUMER_NAME),
                                                               StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed()),
                                                               streamListener);
  listenerContainer.start();
  log.info("--------------------stream 监听启动------------------------------");
  return subscription;
}

Stream 消息处理

private static RecordId onProcess(MapRecord entries) {
  log.info("MapRecord = {}", entries);
  return entries.getId();
}

Stream 消息 ACK

/**
* ACK 消息
*/
private void ack(String id) {
  stringRedisTemplate.opsForStream().acknowledge(STREAM_NAME, GROUP_NAME, id);
}

Stream 消费异常消息

当有些消息因为某些原因没有 ACK 时,会将消息存在 Pendding 里,需要重新获取再消费。

public void processPendingMessages(StreamOperations<String, Object, Object> streamOps) {
  log.info("processPendingMessages...");

  PendingMessages pendingMessages = streamOps.pending(
    STREAM_NAME, Consumer.from(GROUP_NAME, CONSUMER_NAME));

  Iterator<PendingMessage> iterator = pendingMessages.stream().iterator();

  while (iterator.hasNext()) {
    PendingMessage pendingMessage = iterator.next();
    String messageId = pendingMessage.getId().getValue();
    String consumer = pendingMessage.getConsumer().getName();
    if (!consumer.equals(CONSUMER_NAME)) {
      log.info("PendingMessage consumer: {}", consumer);
      continue;
    }
    // 处理每个 Pending 消息
    List<MapRecord<String, Object, Object>> records = stringRedisTemplate.opsForStream().range(
      STREAM_NAME, Range.just(messageId)
    );
    if (!records.isEmpty()) {
      MapRecord<String, Object, Object> record = records.get(0);
      onProcess(record);
    }
    ack(messageId);
  }
}

Stream 消费离线消息

在消息生产时,且消费者没上线时,消息会存在 Redis 中 不会及时消费,所以需要处理在消费者离线时生产的消息。

private void processReadMessages(StreamOperations<String, Object, Object> streamOps) {
  log.info("processReadMessages...");
  List<MapRecord<String, Object, Object>> read = streamOps.read(
    Consumer.from(GROUP_NAME, CONSUMER_NAME),
    StreamReadOptions.empty().block(Duration.ofSeconds(3)), // 阻塞 3 秒
    StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed()) // 从上次消费位置开始
  );
  for (MapRecord<String, Object, Object> entries : read) {
    onProcess(entries);
  }
}

参考

  1. RocketMQ 顺序消息:https://rocketmq.apache.org/zh/docs/featureBehavior/03fifomessage/
  2. RocketMQ 顺序消息:https://juejin.cn/post/7017973936654647303
  3. RocketMQ 实战笔记:https://blog.csdn.net/agonie201218/article/details/125372702
  4. 详解rocketMQ顺序消息:https://zhuanlan.zhihu.com/p/608612130
  5. RocketMQ的顺序消费问题:https://blog.csdn.net/qq_40662424/article/details/122977388
  6. Kafka 顺序消息:https://cloud.tencent.com/developer/article/1839597
  7. 聊聊 Redis Stream :https://www.cnblogs.com/makemylife/p/18135130
  8. 消息队列 RocketMQ版:https://www.volcengine.com/docs/6410/131277
  9. Redis进阶 - 持久化:RDB和AOF机制详解:https://pdai.tech/md/db/nosql-redis/db-redis-x-rdb-aof.html
  10. Redis能保证数据不丢失吗:https://www.cnblogs.com/bossma/p/18029874