RocketMQ事务消息解析!

RocketMQ事务消息解析!

技术博客 admin 144 浏览

相关文章:

文章首发到公众号:月伴飞鱼

文章内容收录到个人网站,方便阅读:hardyfish.top/

文章内容收录到个人网站,方便阅读:hardyfish.top/

觉得有收获,希望帮忙点赞,转发下哈,谢谢,谢谢!

单体架构下的事务

在单体系统的开发过程中,假如某个场景下需要对数据库的多张表进行操作,为了保证数据的一致性,一般会使用事务,将所有的操作全部提交或者在出错的时候全部回滚。

以创建订单为例,假设下单后需要做两个操作:

在订单表生成订单。

在积分表增加本次订单增加的积分记录。

在单体架构下只需使用@Transactional开启事务,就可以保证数据的一致性。

java
代码解读
复制代码
  @Transactional   public void order() {       String orderId = UUID.randomUUID().toString();       // 生成订单       orderService.createOrder(orderId);       // 增加积分       creditService.addCredits(orderId);   }

但在分布式架构下,订单系统和积分系统可能是两个独立的服务,此时就不能使用上述的方法开启事务了,因为它们不处于同一个事务中。

  • 在出错的情况下,无法进行全部回滚,只能对当前服务的事务进行回滚。

所以就有可能出现订单生成成功但是积分服务增加积分失败的情况(也可能相反),此时数据处于不一致的状态。

分布式架构下的事务

以下单流程为例,在分布式架构下的处理流程如下:

订单服务生成订单。

发送订单生成的MQ消息,积分服务订阅消息,有新的订单生成之后消费消息,增加对应的积分记录。

普通MQ消息存在的问题

假如订单创建成功,MQ消息发送成功,但是order方法在返回的前一刻,服务突然宕机。

由于开启了事务,事务还未提交(方法结束后才会正常提交)。

所以订单表并未生成记录,但是MQ却已经发送成功并且被积分服务消费,此时就会存在订单未创建但是积分记录增加的情况。

假如先发送MQ消息再创建订单,如果MQ消息发送成功,创建订单失败,那么同样处于不一致的状态。

java
代码解读
复制代码
  @Transactional   public void order() {       String orderId = UUID.randomUUID().toString();       // 创建订单       Order order = orderService.createOrder(orderDTO.getOrderId());       // 发送订单创建的MQ消息       sendOrderMessge(order);       return;   }

可以使用RocketMQ事务消息解决上述问题。

RocketMQ事务消息基础流程

Apache RocketMQ4.3.0版中已经支持分布式事务消息。

事务消息是 RocketMQ 提供的一种消息类型,支持在分布式场景下保障消息生产本地事务的最终一致性。

RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。

基本流程

第一阶段

  • 发送 Message,Half Message,即半事务消息
  • 此类型的 Message 是不会被 Consumer 消费。

第二阶段:如果半事务消息投递成功,则会开始执行本地事务。

分为如下三种 Case

  • 本地事务执行成功:

    • 会向 Broker 发送 commit 消息,被 commit 过后的 Message 才能被 Consumer 消费到。
  • 本地事务执行失败

    • 会向 Broker 发送 rollback 消息,Broker 则会将刚刚投递的半事务消息删除,从而保证上下游数据的一致性。
  • 如果 Producer 实例或者网络出现了问题,Producer 没能及时地将本地事务执行的结果通知 Broker

  • Broker 会通过扫描发现某条 Message 长时间处于半事务消息状态。

    • Broker 会主动地向 Producer 询问此 Message 对应的事务状态。

值得注意的是:

RocketMQ 并不会无休止的的信息事务状态回查,默认回查 15 次。

如果 15 次回查还是无法得知事务状态,RocketMQ 默认回滚该消息。

RocketMQ事务消息使用限制

事务消息不支持延时消息和批量消息。

事务性消息可能不止一次被检查或消费,所以消费者端需要做好消费幂等

事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。

  • 与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

RocketMQ事务消息基本原理

采用2PC两阶段设计。

Message 原本真实的 TopicMessageQueue 进行备份。

  • 放入到PROPERTY_REAL_TOPIC PROPERTY_REAL_QUEUE_ID中保存。

将消息投递到一个内部TopicRMQ_SYS_TRANS_HALF_TOPIC,该队列专门存储事务消息。

所有的 Half Message 全部都写入到 queueId 为 0 的 MessageQueue

因为一个 Topic 下只有 1 个 MessageQueue

  • 这个 Topic 下的所有 Message 就是全局有序的,它们会按照先来后到的顺序被消费。

如果本地事务执行成功进行Commit,则将RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的Topic中,供后续流程执行。

  • 并删除这条 Half Message ,但删除也是假删除,只是给 Message 打上一个删除的 Tag

如果本地事务执行失败进行rollback,则直接删除这条 Half Message ,但删除也是假删除。

如果本地事务迟迟没有返回结果 (默认时间是6s),则会触发事务回查机制

  • 执行回查之前需要校验检查次数是否到达了最大值(需要手动设置,没有默认值)。
  • 或者是当前 Half Message 存在是否超过了 Message 保存的上限,即 3天。
  • 如果满足上面条件中的一种Half Message 会被放进 TRANS_CHECK_MAX_TIME_TOPIC Topic 当中。
  • 一旦判定为需要执行事务回查逻辑,那么当前这条 Half Message 就算已经被消费了。
  • 在没达到最大的校验次数之前,都还需要将其投递到事务队列当中,以便下次重试时再次执行 Check 逻辑。
  • 如果回查成功则删除投递的 Half Message

源码解读

发送事务消息调用的是TransactionMQProducersendMessageInTransaction方法:

主要有以下几个步骤:

获取事务监听器TransactionListener,如果获取为空或者本地事务执行器LocalTransactionExecuter为空将抛出异常。

因为需要通过TransactionListener或者LocalTransactionExecuter来执行本地事务,所以不能为空。

在消息中设置prepared属性,此时与普通消息(非事务消息)相比多了PROPERTY_TRANSACTION_PREPARED属性。

调用send方法发送prepared消息也就是half消息,发送消息的流程与普通消息一致。

根据消息的发送结果判断:

  • 如果发送成功执行本地事务,并返回本地事务执行结果状态,如果返回的执行状态结果为空,将本地事务状态设置为UNKNOW
  • 发送成功之外的其他情况,包括FLUSH_DISK_TIMEOUT刷盘超时、FLUSH_SLAVE_TIMEOUTSLAVE_NOT_AVAILABLE从节点不可用三种情况。
  • 此时意味着half消息发送失败,本地事务状态置为ROLLBACK_MESSAGE回滚消息。

调用endTransaction方法结束事务。

参考

《RocketMQ技术内幕》

github.com/apache/rock…

github.com/apache/rock…

源文:RocketMQ事务消息解析!

如有侵权请联系站点删除!

技术合作服务热线,欢迎来电咨询!