5.3.3 尝试完成延迟的加入操作

2021年4月21日 23点热度 0条评论 来源: water___Wang

5.3.3 尝试完成延迟的加入操作

协调者在创建完延迟操作对象之后,为了检查能否完成刚刚创建的延迟操作,会调用延迟缓存的tryCompleteElseWatch()方法立即尝试完成。延迟缓存会调用延迟操作的tryComplete()方法,对于加入组的延迟缓存,就是调用延迟加入对象的tryCompleteJoi.n()方法。这个方法的第二个参数表示如果可以完成,就会强制完成延迟加入对象,RP最终会调用到延迟加入对象的onCompleteJoi.n()方法。延迟加入操作对象的tryComplete()方法和onComplete()方法,它们的具体实现是调用协调者的tryCompleteJoi.n()方法和onCompleteJoin()方法。相关代码如下:

延迟操作能否完成的判断条件是:消费组元数据的notYetRejoi.nedMembers()方法返回值是否为空。这个方法收集的是消费组中awai.ti.ngJoi.nCallback值对象为空的消费者成员元数据。因为协调者一旦开始处理消费者发送的“加入组请求”,就会设置awai.ti.『1gJoi.nCallback值对象为“发送响应的回调方法”,所以如果消费者发送了“加入组请求”,并且也被协调者开始处理,就不会被notYetRejoi.nedMembers()方法选出来。

以协调者处理第一个消费者发迭的加入组请求为例,因为第一个消费者的awai.ti.ngJoi.nCallback值对象为空,所以notYetRejoi.nedMembers()方法不会选择第一个消费者。那么,这个方法因为没有收集到任何一个满足条件的消费者,返回值为空,就会执行forceCo叩lete()方法,并调用延迟操作的onCompleteJoi.n()方法,开始返回“加入组响应”给消费者。

这和我们前面认为的“协调者会等待所有的消费者都发送了加入组请求后,才会认为请求处理完成”看起来有点矛盾。协调者在处理第一个消费者的加入组请求,没等到其他消费者发送加入组请求,就已经开始返回加入组响应结果给第一个消费者了。

实际上,协调者实现消费组的再平衡操作,是通过让消费者重新发送“加入组请求”的方式来完成的。如图5-6所示,第一个消费者发送“加入组请求”给协调者,具体的处理步骤如下。

(1)协调者处理第一个消费者的“加入组请求”,会创建消费者成员元数据,并加入消费组元数据。
(2)消费组初始为“稳定”状态,开始再平衡操作,将状态改为“准备再平衡”。
(3)创建一个“延迟的加入组”对象,并立即通过“延迟缓存”尝试完成刚创建的延迟操作。
(4)由于消费组中所有消费者成员(目前只有第一个消费者)的值对象不为空,notYetRejoinedMeMbers()
方法没有收集到任何元素,返回值为空,满足“完成延迟操作”的条件。
(5)因为可以完成延迟的操作,所以强制完成方法会调用延迟操作对象的onCo叩leteJoin()方法。
(6)延迟操作对象在完成时的回调方法,会首先将消费组状态更新为“等待同步”。
(7)返回“加入组响应”给所有的消费者(这里还是只有第一个消费者)。

注意:消费者成员元数据的回调方法先后经过了:设直、查询、获取、重直四个步骤,这跟Web开发中常见的CRUD很相似。但注意,这里的重直类似于逻辑删除,不是物理删除。消费者成员元数据在正常情况下,一旦加入到消费组的元数据中,除非超时被迫删除,否则是不九许直接删除的。

在步骤(I)协调者开始处理消费者的“加入组请求”时,就已经对消费组元数据进行川锁同步,以防止一次处理多个消费者的“加入组请求”。步骤(4)和步骤(7)在获取消费组元数据的所有消费者成员时,只会获取到第一个消费者。因为协调者在处理第一个消费者的加入组请求时,不会处理其他消费者的加入组请求。

当协调者处理完第一个消费者的加入组请求,并释放掉消费组元数据的锁保护后,消费者已经收到了“加入组响应”,消费组的状态已经接连从“稳定”状态转变为“准备再平衡”,又更新为“等待同步”。第一个消费者在收到“加入组响应”后,会执行分区分配了作。因为第一个消费者同时也是主消费者,在它执行完分区分配工作,将消费组的分配结果作为“同步组请求”发送给协调者后,也会收到“同步组响应”,即分配给第一个消费者的分区(因为消费组只有一个消费者,所以所有分区都会分配给第一个消费者,消费组的分配结果实际上就是分配给第一个消费者的所有分区)。

如图5-7所示,一旦协调者释放了“消费组元数据”的锁保护后,比如协调者返回“如|人组响应”给第-个消费者之后,如果第一个消费者还没有发送“同步组请求”给协调者,协调者就可以接着处理其他消费者发送的“加入组请求”。但如果在其他消费者发送“加入组请求”之前,第一个消费者很快地发送“同步组请求”给协调者,而协调者处理“同步组请求”和“加入组请求”一样,都会对“消费组元数据”进行加锁,这时协调者就不会处理其他消费者发送的“加入组请求”了。

图5-7中虚线框表示被“消费组元数据”加锁保护,协调者处理第一个消费者的“加入组请求”,一直到返回“川人组响应”给消费者,都是在同一个锁中完成的。但协调者从处理消费者的“同步组请求”,到返回“同步组响应”给消费者的过程,则不在同一个锁中。在返回“同步组响应”给消费者之前,还要保非消费组的分配结果到内部主题中,这一步因为没有操作“消费组元数据”,所以并不需要同步,会释放掉同步的锁。

注意:那么问题是:判断是否需妥加锁的依据是什么?答案是:有更新或查询消费组元数据就需妥加锁。比如,在查询消费组元数据的所有消费者元数据时,如果没有对查询进行加锁,可能会有其他线程更新了消费者元数据,从而会导致查询出来的数据不一致、线程不安全。

协调者在返回“加入组响应”给第一个消费者之后释放了同步的锁,消费组的状态也更新为“等待同步”。并且第一个消费者同时也是主消费者,它在收到“加入响应”后,会立即执行分区分配丁工作。如|枣阳-8(上)所示,协调者在处理第一个消费者的“同步组请求”时,不会执行其他消费者的“加入组请求”。如佟15-8(γ)所示,在第一个消费者发送“同步组请求”给协调者之前,新的消费者发送了“加入组请求“,协调者就会处理新消费者的”加入组请求“,因为现在没有其他线程对消费组元数据进行加锁保护了。

    原文作者:water___Wang
    原文地址: https://blog.csdn.net/WANTAWAY314/article/details/115865547
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。