侧边栏壁纸
  • 累计撰写 793 篇文章
  • 累计创建 1 个标签
  • 累计收到 1 条评论
标签搜索

目 录CONTENT

文章目录

消息队列

Dettan
2021-07-10 / 0 评论 / 0 点赞 / 159 阅读 / 4,315 字
温馨提示:
本文最后更新于 2022-07-23,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。
本地消息表
这种实现方式的思路,其实是源于 ebay,后来通过支付宝等公司的布道,在业内广泛使用。其基本的设计思想是将远程分布式事务拆分成一系列的本地事务。如果不考虑性能及设计优雅,借助关系型数据库中的表即可实现。
举个经典的跨行转账的例子来描述。
第一步伪代码如下,扣款 1W,通过本地事务保证了凭证消息插入到消息表中。
第二步,通知对方银行账户上加 1W 了。那问题来了,如何通知到对方呢?
通常采用两种方式:
1.
采用时效性高的 MQ,由对方订阅消息并监听,有消息时自动触发事件
2.
采用定时轮询扫描的方式,去检查消息表的数据。
两种方式其实各有利弊,仅仅依靠 MQ,可能会出现通知失败的问题。而过于频繁的定时轮询,效率也不是最佳的(90% 是无用功)。所以,我们一般会把两种方式结合起来使用。
解决了通知的问题,又有新的问题了。万一这消息有重复被消费,往用户帐号上多加了钱,那岂不是后果很严重?
仔细思考,其实我们可以消息消费方,也通过一个“消费状态表”来记录消费状态。在执行“加款”操作之前,检测下该消息(提供标识)是否已经消费过,消费完成后,通过本地事务控制来更新这个“消费状态表”。这样子就避免重复消费的问题。
总结: 上诉的方式是一种非常经典的实现,基本避免了分布式事务,实现了“最终一致性”。但是,关系型数据库的吞吐量和性能方面存在瓶颈,频繁的读写消息会给数据库造成压力。所以,在真正的高并发场景下,该方案也会有瓶颈和限制的。
MQ(非事务消息)
通常情况下,在使用非事务消息支持的 MQ 产品时,我们很难将业务操作与对 MQ 的操作放在一个本地事务域中管理。通俗点描述,还是以上述提到的“跨行转账”为例,我们很难保证在扣款完成之后对 MQ 投递消息的操作就一定能成功。这样一致性似乎很难保证。
先从消息生产者这端来分析,请看伪代码:
根据上述代码及注释,我们来分析下可能的情况:
1.
操作数据库成功,向 MQ 中投递消息也成功,皆大欢喜
2.
操作数据库失败,不会向 MQ 中投递消息了
3.
操作数据库成功,但是向 MQ 中投递消息时失败,向外抛出了异常,刚刚执行的更新数据库的操作将被回滚
从上面分析的几种情况来看,貌似问题都不大的。那么我们来分析下消费者端面临的问题:
1.
消息出列后,消费者对应的业务操作要执行成功。如果业务执行失败,消息不能失效或者丢失。需要保证消息与业务操作一致
2.
尽量避免消息重复消费。如果重复消费,也不能因此影响业务结果
如何保证消息与业务操作一致,不丢失?
主流的 MQ 产品都具有持久化消息的功能。如果消费者宕机或者消费失败,都可以执行重试机制的(有些 MQ 可以自定义重试次数)。
如何避免消息被重复消费造成的问题?
1.
保证消费者调用业务的服务接口的幂等性
2.
通过消费日志或者类似状态表来记录消费状态,便于判断(建议在业务上自行实现,而不依赖 MQ 产品提供该特性)
总结: 这种方式比较常见,性能和吞吐量是优于使用关系型数据库消息表的方案。如果 MQ 自身和业务都具有高可用性,理论上是可以满足大部分的业务场景的。不过在没有充分测试的情况下,不建议在交易业务中直接使用。
MQ(事务消息)
举个例子,Bob 向 Smith 转账,那我们到底是先发送消息,还是先执行扣款操作?
好像都可能会出问题。如果先发消息,扣款操作失败,那么 Smith 的账户里面会多出一笔钱。反过来,如果先执行扣款操作,后发送消息,那有可能扣款成功了但是消息没发出去,Smith 收不到钱。除了上面介绍的通过异常捕获和回滚的方式外,还有没有其他的思路呢?
下面以阿里巴巴的 RocketMQ 中间件为例,分析下其设计和实现思路。
RocketMQ 第一阶段发送 Prepared 消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的读者可能又发现问题了,如果确认消息发送失败了怎么办?
RocketMQ 会定期扫描消息集群中的事物消息,这时候发现了 Prepared 消息,它会向消息发送者确认,Bob 的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ 会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。如下图:
总结: 据笔者的了解,各大知名的电商平台和互联网公司,几乎都是采用类似的设计思路来实现“最终一致性”的。这种方式适合的业务场景广泛,而且比较可靠。不过这种方式技术实现的难度比较大。目前主流的开源 MQ(ActiveMQ、RabbitMQ、Kafka)均未实现对事务消息的支持,所以需二次开发或者新造轮子。比较遗憾的是,RocketMQ 事务消息部分的代码也并未开源,需要自己去实现。
其他补偿方式
做过支付宝交易接口的同学都知道,我们一般会在支付宝的回调页面和接口里,解密参数,然后调用系统中更新交易状态相关的服务,将订单更新为付款成功。同时,只有当我们回调页面中输出了 success 字样或者标识业务处理成功相应状态码时,支付宝才会停止回调请求。否则,支付宝会每间隔一段时间后,再向客户方发起回调请求,直到输出成功标识为止。
其实这就是一个很典型的补偿例子,跟一些 MQ 重试补偿机制很类似。
一般成熟的系统中,对于级别较高的服务和接口,整体的可用性通常都会很高。如果有些业务由于瞬时的网络故障或调用超时等问题,那么这种重试机制其实是非常有效的。
当然,考虑个比较极端的场景,假如系统自身有 bug 或者程序逻辑有问题,那么重试 1W 次那也是无济于事的。那岂不是就发生了“明明已经付款,却显示未付款不发货”类似的悲剧?
其实为了交易系统更可靠,我们一般会在类似交易这种高级别的服务代码中,加入详细日志记录的,一旦系统内部引发类似致命异常,会有邮件通知。同时,后台会有定时任务扫描和分析此类日志,检查出这种特殊的情况,会尝试通过程序来补偿并邮件通知相关人员。
在某些特殊的情况下,还会有“人工补偿”的,这也是最后一道屏障。






2.6. 基于消息中间件的最终一致性事务方案
无论是 2PC & 3PC 还是 TCC、事务状态表,基本都遵守 XA 协议的思想。即这些方案本质上都是事务协调者协调各个事务参与者的本地事务的进度,使所有本地事务共同提交或回滚,最终达成一种全局的 ACID 特性。在协调的过程中,协调者需要收集各个本地事务的当前状态,并根据这些状态发出下一阶段的操作指令。
但是这些全局事务方案由于操作繁琐、时间跨度大,或者在全局事务期间会排他地锁住相关资源,使得整个分布式系统的全局事务的并发度不会太高。这很难满足电商等高并发场景对事务吞吐量的要求,因此互联网服务提供商探索出了很多与 XA 协议背道而驰的分布式事务解决方案。其中利用消息中间件实现的最终一致性全局事务就是一个经典方案。

为了表现出这种方案的精髓,我将使用如下的电商系统微服务结构来进行描述:
在这个模型中,用户不再是请求整合后的 shopping-service 进行下单,而是直接请求 order-service 下单,order-service 一方面添加订单记录,另一方面会调用 repo-service 扣减库存。
这种基于消息中间件的最终一致性事务方案常常被误解成如下的实现方式:
这种实现方式的流程是:
1.
order-service 负责向 MQ server 发送扣减库存消息(repo_deduction_msg);repo-service 订阅 MQ server 中的扣减库存消息,负责消费消息。
2.
用户下单后,order-service 先执行插入订单记录的查询语句,后将 repo_deduction_msg 发到消息中间件中,这两个过程放在一个本地事务中进行,一旦“执行插入订单记录的查询语句”失败,导致事务回滚,“将 repo_deduction_msg 发到消息中间件中”就不会发生;同样,一旦“将 repo_deduction_msg 发到消息中间件中”失败,抛出异常,也会导致“执行插入订单记录的查询语句”操作回滚,最终什么也没有发生。
3.
repo-service 接收到 repo_deduction_msg 之后,先执行库存扣减查询语句,后向 MQ sever 反馈消息消费完成 ACK,这两个过程放在一个本地事务中进行,一旦“执行库存扣减查询语句”失败,导致事务回滚,“向 MQ sever 反馈消息消费完成 ACK”就不会发生,MQ server 在 Confirm 机制的驱动下会继续向 repo-service 推送该消息,直到整个事务成功提交;同样,一旦“向 MQ sever 反馈消息消费完成 ACK”失败,抛出异常,也对导致“执行库存扣减查询语句”操作回滚,MQ server 在 Confirm 机制的驱动下会继续向 repo-service 推送该消息,直到整个事务成功提交。
这种做法看似很可靠。但没有考虑到网络二将军问题的存在,有如下的缺陷:
网络的 2 将军问题 :上面第 2 步中 order-service 发送 repo_deduction_msg 消息失败,对于发送方 order-service 来说,可能是消息中间件没有收到消息;也可能是中间件收到了消息,但向发送方 order-service 响应的 ACK 由于网络故障没有被 order-service 收到。因此 order-service 贸然进行事务回滚,撤销“执行插入订单记录的查询语句”,是不对的,因为 repo-service 那边可能已经接收到 repo_deduction_msg 并成功进行了库存扣减,这样 order-service 和 repo-service 两方就产生了数据不一致问题。
数据库长事务问题 :repo-service 和 order-service 把网络调用(与 MQ server 通信)放在本地数据库事务里,可能会因为网络延迟产生数据库长事务,影响数据库本地事务的并发度。

以上是被误解的实现方式,下面给出正确的实现方式,如下所示:
上图所示的方案,利用消息中间件如 rabbitMQ 来实现分布式下单及库存扣减过程的最终一致性。对这幅图做以下说明:
1)order-service 中,
在 t_order 表添加订单记录 &&

在 t_local_msg 添加对应的扣减库存消息
这两个过程要在一个事务中完成,保证过程的原子性。同样,repo-service 中,
检查本次扣库存操作是否已经执行过 &&

执行扣减库存如果本次扣减操作没有执行过 &&

写判重表 &&

向 MQ sever 反馈消息消费完成 ACK
这四个过程也要在一个事务中完成,保证过程的原子性。
2)order-service 中有一个后台程序,源源不断地把消息表中的消息传送给消息中间件,成功后则删除消息表中对应的消息。如果失败了,也会不断尝试重传。由于存在网络 2 将军问题,即当 order-service 发送给消息中间件的消息网络超时时,这时候消息中间件可能收到了消息但响应 ACK 失败,也可能没收到,order-service 会再次发送该消息,直至消息中间件响应 ACK 成功,这样可能发生消息的重复发送,不过没关系,只要保证消息不丢失,不乱序就行,后面 repo-service 会做去重处理。
3)消息中间件向 repo-service 推送 repo_deduction_msg,repo-service 成功处理完成后会向中间件响应 ACK,消息中间件收到这个 ACK 才认为 repo-service 成功处理了这条消息,否则会重复推送该消息。但是有这样的情形:repo-service 成功处理了消息,向中间件发送的 ACK 在网络传输中由于网络故障丢失了,导致中间件没有收到 ACK 重新推送了该消息。这也要靠 repo-service 的消息去重特性来避免消息重复消费。
4)在 2)和 3)中提到了两种导致 repo-service 重复收到消息的原因,一是生产者重复生产,二是中间件重传。为了实现业务的幂等性,repo-service 中维护了一张判重表,这张表中记录了被成功处理的消息的 id。repo-service 每次接收到新的消息都先判断消息是否被成功处理过,若是的话不再重复处理。

通过这种设计,实现了消息在发送方不丢失,消息在接收方不被重复消费,联合起来就是消息不漏不重,严格实现了 order-service 和 repo-service 的两个数据库中数据的最终一致性。
基于消息中间件的最终一致性全局事务方案是互联网公司在高并发场景中探索出的一种创新型应用模式,利用 MQ 实现微服务之间的异步调用、解耦合和流量削峰,支持全局事务的高并发,并保证分布式数据记录的最终一致性。

0

评论区