6.3.1 延迟操作接口

2021年4月23日 40点热度 0条评论 来源: water___Wang

6.3 延迟操作

Kafka的服务端处理客户端的请求,针对不同的请求,可能不会立即返回响应结果给客户端。比如,生产者设置的应答值等于-I,服务端必须等待ISR所有副本都同步完消息,才会发送生产结果给生产者。消费者或备份副本设置的最小拉取大小等于l字节,服务端必须至少读取到l字节的消息,才会发送拉取结果给消费者或备份副本。

Kafka在处理这种类型的请求时,会将“延迟返回响应结果的请求”即“延迟操作”对象(DelayedOperation)放入“延迟缓存队列”(DelayedOperationPurgatory)。延迟的操作对象有两种方式可以从“延迟缓存队列”中完成,井从缓存队列中移除。

  • 延迟操作对应的外部事件发生时,外部事件会尝试完成延迟缓存中的延迟操作。
  • 如果外部事件仍然没有完成延迟操作,超时时间达到后,会强制完成延迟的操作。

如阁6-53所示,服务端处理客户端请求,返回响应结果给客户端有下面3种情况。

  • 读取或写入主哥~本的本地日志文件后,如果不需要延迟返回,立即返回。
  • 存在限制条件导致无法立即返回响应结果,创建延迟操作。一旦超时时间到了,必须返回。
  • 外部事件发生时,判断是杏可以解除限制条件。一旦满足条件,可以返回。

“延迟操作”对象被创建并加入“延迟缓存”后,外部事件是完成“延迟操作”的主要因素。“延迟生产”和“延迟拉取”两种操作分别对应不同的延迟缓存。尝试完成延迟的生产有下面两种时机:

  • 分区的最高水位增加了,尝试完成延迟的生产;
  • 服务端接收到一个备份副本的拉取请求,尝试完成延迟的生产。

尝试完成延迟的拉取有下面两种时机:

  • 分区的最高水位增加了,尝试完成消费者的延迟拉取;
  • 服务端接收到生产者的生产请求,追加消息集到主副本的本地日志,尝试完成备份副本的延迟拉取。

尝试完成这两种操作的相关代码如下:

“延迟缓存”数据结构(DelayedOperationPurgatory)类似Map,它的键是DelayedOperationKey,值是延迟操作对象(DelayedOperation)。生产者的生产请求、消费者和备份副本的拉取请求都会包含多个分区,延迟缓存的键和分区有关。第5章中“延迟加入缓存”、“延迟心跳缓存”的健和消费组编号、消费者成员编号有关。

6.3.1 延迟操作接-

Kafka服务端用DelayedOperation接- 表示延迟的操作对象,它的实现类有前面分析的延迟加入、延迟心跳、延迟生产、延迟拉取。延迟操作首先必须有一个超时时间(delayMs),延迟操作被创建并加入延迟缓存,表示服务端处理客户端的请求时,不能立即返回响应结果给客户端。当延迟操作加入到延迟缓存后,经过超时时间后仍然没有被完成,服务端就会强制发送响应结果给客户端。

在Java中要表示一个延迟一定时间执行的任务,可以使用具有延迟执行、周期性执行等功能的定时任务线程类(TimerTask)。延迟操作类继承的TimerTask也是一个线程类,它和IJava的TimerTask类似,不过Java的定时任务一旦设置了调度时间,就只能在指定的时间开始执行。而Kafka的延迟操作在外部事件触发下,如果可以完成,会提前完成延迟的操作。如果外部事件一直无法完成延迟的操作,在指定的超时时间后,会强制完成延迟的操作。

注意:“延迟的操作”表示延迟返回响应结果给客户栋。完成延迟的操作,表示可以返回响应结果给客户端。不能完成延迟的操作,表示暂时还不能返回响应结果给客户端。延迟操作接- 定义了下面几个与“完成”相关的方法,不过最终只会调用一次完成延迟的操作方法。

(l)尝试完成CtryComplete)。延迟操作相关的外部事件发生时会尝试完成延迟的操作。该方法返回值如果为true,表示可以完成延迟的操作,接下来会调用强制完成的方法(步骤(2))。如果返回值为false,表示还不能完成延迟的操作。这个方法的主要逻辑会根据不同的请求类型,判断是否可以返回响应结果给客户端。
(2)强制完成(forceComplete)。有两个地方会调用这个方法,一个是调用尝试完成的方法(步骤
(1)),返回值为true的时候;另一个是超时的时候。这个方法在接- 中的实现是调用完成的回调方法(步骤(4))。

(3)线程运行(run)。延迟操作超时的时候,会调用线程的运行方法。因为超时只会发生一次,所以线程的运行方法也只会调用一次。超时后,会先调用强制完成的方法(步骤(2)),如果返回值为true,则会继续调用超时的回调方法(步骤(5))。
(4)完成的回调方法ConComplete)。延迟操作的请求类型不用,它们的回调逻辑也不同。
(5)超时的回调方法ConExpiration)。和完成的回调方法类似,延迟操作的超时回调也不同。

延迟操作接- 的相关代码如下:

延迟操作对象有两种完成的方式:外部事件触发完成,或者超时完成。它们都会调用forceCo叩lete()方法,并调用。nCo叩lete()。因为onComplete()方法只会调用一次,所以forceComplete()也只会执行一次。在多钱程环境下,用原子变量(completed)来控制只有一个线程会调用到onComplete()方法。注意:延迟操作只会调用一次的方法有:onComplete()和run()。如果不考虑多线程,forceComplete()也只会调用一次。不过,tryComplete()方法由外部事件触发,而事件不止一个,并且一个事件触发后可能还不能完成延迟的操作,tryCo叩lete()有可能会被调用多次。

以延边的生产为例,服务端处理备份副本的拉取请求会尝试完成延迟的生产。假设分区的主副本是RI,ISR集合有[Rl,R2,R3]这3个副本。服务端将消息集写入主副本RI后,会创建一个延迟的生产请求,并第一次尝试完成延迟的生产,但不能完成。然后,备份副本R2发送拉取请求,服务揣处理胆的拉取请求,会第二次尝试完成延迟的生产,还不能完成。最后,备份副本R3发送拉取请求,服务端处理R3的拉取请求,会第三次尝试完成延迟的生产,可以完成。三次尝试完成操作都会调用tryComplete()方法,前两次返回false,最后一次返回true。最后一次还会调用一次forceComplete()、onComplete()方法。

下面会分别分析“延迟生产”和“延迟拉取”完成时的回调方法、尝试完成的延迟操作。

  1. 完成延迟的操作
    副本管理器在创建延迟操作时,会把回调方法传给延迟操作对象。当延迟操作完成时,在onComplete()方法中会调用回调方法,返回响应结果给客户端,具体步骤如下。
(2)服务端由于不能立即返回响应结果给客户端,它创建一个延迟的操作对象。
(3)当外部事件尝试完成延迟操作,或者超时后完成延迟的操作。
(4)服务端在延迟操作中调用回调方法,返回响应结果给客户端。

如表6-9所示,创建延迟操作对象时需要提供请求对应的元数据(第三列)。延迟生产元数据的内容是分区的生产结果(PartitionResponse,第二行第四列)。延迟:fV.取元数据的内容是分区的拉取信息(PartitionFetchinfo,第三行第二列)。

“创建延迟的生产对象”之前,将消息集写入分区的主副本巾,每个分区的生产结果会作为“延迟生产的元数据”。“创建延迟的拉取对象”之前,从分区的主目lj本巾读取消息集,但并不会使用分区的拉取结果作为“延迟拉取的元数据”。这是因为“延迟生产”返回给客户挤的响应结果,可以直接从分区的生产结果中获取,而“延迟拉取”返回给客户端的响应结果不能直接从分区的拉取结果中获取。相关代码如下:


元数据可以包含返回结果的条件是:从“创建延迟操作对象”再到“完成延迟操作对象”,元数据的含义不会改变。对于延迟的生产,服务端写λ消息集到主副本返回的结果(即分区的生产结果)是确定的,但因为ISR集合中的备份副本还没有发送应答给主副本,所以才需要“创建延迟的生产操作对象”。服务端在处理备份副本的拉取请求时,不会改变分区的生产结果。最后在“完成延迟操作对象”时,服务端就可以把“创建延迟操作对象”时传递给它的分区生产结果直接返回给生产者。

“创始延迟的拉取对象”之前,读取了主副本的本地日志,但因为消息数量不够,所以“创建延迟的投l仅对象”。延迟拉取的元数据是分区的拉取信息,并不是分区的拉取结果。在“完成延迟的投取操作对象”时,会再次读取主副本的木地日志,第二次的读取有可能是消息数量已经足够或者超时触发的。前符合返回足够的消息给客户端,后者返回给客户端的消息可能不够。

注意:一旦创建延迟的拉取操作对象,就一定会读取两次主副本的本地日志,因为第一次读取的消息数量不够,所以并不需要把第一次读取产生的拉取结采,作为元数据传给延迟的拉取对象。第一次读取和第二次读取使用的拉取信息都是一样的,但是这两次读取本地日志的拉取结果则不一样。

外部非件发生时,服务端会尝试完成延迟的操作对象。延迟生产的外部事件是“备份副本发送了拉取请求”。延迟拉取分两种场景,如果是备份副本的延迟拉取,它的外部事件是“消息集追加到主副本”;如果是消费者的延迟N:取,它的外部事件是“增加主副本的最高水位”。下面分析这两种延迟操作的完成方法。

  1. 尝试完成延迟的生产
    服务端处理生产者客户端的生产请求,将消息集追加到对应主副本的本地日志后,会等待ISR中所有的备份刚本都向主副本发送应答。生产请求包括多个分区的消息集,每个分区都有对应的ISR集合。句所有分区的JSR副本者IS向对应分区的主副本发送了应答,生产请求的处理才算完成。

虽然也产请求有多个分区,但是延迟的生产对象只会创建一个。如果“追加消息集到分区的主副本”没有发生错误,初始化延迟的生产对象时,每个分区的acksPending值等于true,表示分区正在等待应答,即应答正在进行中。当这个分区的ISR备份副本都同步了写入主副本的消息集,才会更改acksPending为false。并且,只有当所有分区的acksPending都等于false,才说明生产请求中所有分区的消息集都同步完成,可以完成延迟的生产请求,即返回需要结果给生产者。相关代码如下:


注意:分布式系统将“应答”作为数据同步是否完成的判断条件。Kafka.生产请求:的一批消息分成多个分区,只有每个分区都成功应答了,才表示这一批消息都同步完成。需妥说明的是,在写入消息集的过程中,只要有一个分区出现错误,就应该立即返回响应结果给生产者,即延迟操作可以立即完成。

判断分区的ISR副本是否都已经向主副本发送了应答,需要检查JSR中所有备份副本的偏移盘(LEO)是否达到了元数据的指定偏移量(requiredOffset)。因为分区的消息集追加到本地日志返回的下一个偏移量就是requiredOffset,所以ISRE斤有副本的偏移盘只要等于requiredOffset,就表示备份副本向主副本发送了应答。

当备份副本向主副本发送拉取请求,服务端读取日志后,会更新对应备份副本的偏移量数据。如图6-54所示,分区凹的主副本是Rl,ISR等于[R1,R2,R3]。下面举例了分区Pl的备份副本发送应答给主副本的过程,具体步骤如下。

(2)备份副本R2第一次拉取时,只读了2条消息,它的偏移量小于6,还不能发送应答给主副本。
(3)备份副本R3第一次拉取时,就读了5条消息,它的偏移量等于6,可以发送应答给主副本。
(4)备份副本R2第二二次拉取时,读了3条消息,它的偏移盘等于6,可以发送应答给主副本。
(5)备份副本R2和备份副本R3都拉取到主副本Rl偏移量等于6的位置。分区的acksPending等于false,表示分区的应答已经结束,即分区中ISR的所有副本都同步了5条消息。

在具体的实现上,备份副本并不需要真正发送一种类型为应答的请求给主副本。因为分区对象已经记录了所有副本的信息,所以在尝试完成延迟的生产时,根据副本的偏移量就可以判断备份副本是否发送了应答。实际上,检查分区是否有足够的副本赶上指定偏移量,只需要判断主副本的最高水位是否等于指定偏移盘。以上面的例子为例,在步骤。)过后,主副本的最高水位等于6,因为最高水位等于requ"iredOffset,就表示分区的ISR所有备份副本都向主副本发送了应答。相关代码如下:

总结下服务端创建了延迟的操作对象,在尝试完成时根据主副本的最高水位判断,具体步骤如下。

(2)服务端返回追加消息集的下一个偏移量,并且创建一个延迟的操作对象。
(3)服务端处理备份副本的拉取请求,首先会读取主副本的本地日志。
(4)服务端返回读取消息集的偏移盐,并更新备份副本的偏移量。
(5)更新主副本的最高水位,选择ISR中所有备份副本中最小的偏移盘。
(6)如果主副本的最高水位超过指定的偏移量,则完成延迟的生产操作。

服务端尝试完成延迟生产的外部事件是:备份副本发送拉取请求,同步主副本的消息。类似地,服务端尝试完成延迟拉取的外部事件是:服务端处理生产请求,追加消息集到主剧本的本地日志。

  1. 尝试完成延迟的拉取

服务端处理客户端(消费者或备份副本)的拉取请求,如果创建了“延迟的拉取对象”,一般是“客户端的消费进度能够一直赶上主副本”。以备份副本同步主副本的数据为例,备份剧本如果一直赶上主副本,当主副本有新消息写入时,备份副本就要及时地同步数据。主副本写了一条消息,备份副
本就要同步一条消息;主副本写了一批消息,备份副本也会一次’性同步一批消息。但针对备份副本已经消费到主副本的最新位置,而主副本并没有新消息写入时,服务端处理备份副本的拉取请求,有下面两种方式。

(2)服务端没有立即返回空的拉取结果给备份副本,而是创建一个“延迟的投取对象”。如果有新消息写入主副本,服务端会等到收集够“拉取请求设置的最少字节数”时,才返回拉取结果给备份副本。如果有新消息写入,但写人的消息数量还不满足最少的字节数。在延迟的拉取对象超时后,服务端也会读取出新写入主副本的消息,返回拉取结果给备份副本。,

注意:如果客户端没有赶上主副本,读取主副本的本地日志时,一般会大于拉取请求设直的最少字节数,服务端就不会创建“延迟的拉取对象”,而是会立即返回拉取结果给客户端。

消费者或备份副本向主副本发起拉取请求时,会指定拉取数据的起始位置(fetchOffset),表示从日志文件的哪个位置开始读取消息。服务端处理拉取请求,会先读取一次日志文件,如果读取出来的消息数量不足fetchMinBytes,就会创建一个“延迟的拉取操作对象”(DelayedFetch)。当完成“延迟的拉取”时,服务端还会再一次读取主副本的本地日志,返回新读取出来的消息集。相关代码如下:

客户端的拉取请求包含多个分区,服务端判断拉取的消息大小时,会收集拉取请求涉及的所有分区。只要消息的总大小超过拉取请求设置的最少字节数,就会调用forceComplete()方法完成延迟的拉取。如图6-55所示,假设客户端扣,取请求设置的最少字节数等于5字节,2个分区[Pl,P2]的主副本都在第一个消息代理节点上。主题设置了2个副本,每个分区都有l个主副本和l个备份副本。分区Pl的备份副本在第二个消息代理节点上,分区凹的备份副本在第三个消息代理节点上。这2个备份副本发送拉取请求给主副本,服务端的具体处理步骤如下。

(2)生产者往Pl和凹的主副本追加第一批消息,Pl写4条消息,P2写3条消息。2个分区的主副本偏移量都增加,服务端尝试完成延迟缓存中的2个延迟拉取操作。由于主副本的偏移量和拉取偏移量差距不足5字节,所以服务端还不能完成延迟的拉取操作。
(3)生产者继续往分区Pl和分区P2追加第二批消息,Pl写2条消息,P2写3条消息。
(4)分区的主副本偏移量增加了,服务端再次尝试完成延迟的拉取操作。由于主副本的偏移量和拉取偏移量的差距超过5字节,所以服务端可以完成延迟的拉取操作。
(5)服务端再次读取主副本的本地日志,并将拉取结果返回给备份副本。备份副本接收到消息后,会写入自己的本地日志,并更新日志的偏移量。备份副本下次发送拉取请求时,使用新的偏移量作为拉取偏移量。

服务端在尝试完成延迟的生产和延迟的拉取时都是根据主副本的相关偏移量信息,判断是否可以完成延迟的操作对象。如表6-10所示,外部事件尝试完成延迟的生产对象时,根据主副本的最高水位判断是否超过指定的偏移量(requlredOffset)。类似地,外部事件尝试完成延迟的拉取对象时,根据主副本的偏移量(或最高水位),判断它与拉取偏移量(fetchOffset)的差距是否超过fetchMlnBytes。

注意:对于备份副本的延迟拉取,主副本的结束偏移量是它的最新偏移量(LEO );对于消费者的延迟拉耳叉,主副本的结束偏移量是它的最高水位(HW)。 这是因为备份副本要时刻保持与主副本的数据同步,而消费者拉取的消息最多只到主副本的最高水位。

本节主要从业务层面上分析了服务端处理延迟生产和延迟拉取的几种操作,主要内容如下。

(2)延迟操作的外部事件发生时,服务端调用tryComplete()方法尝试完成对应的延迟操作。
(3)当可以完成延迟的操作或者超时,服务端调用onComplete()方法执行完成延迟操作的回调方法,即返回响应结果给客户端。

下一节的内容和业务层面无关,我们主要分析延迟缓存、延迟操作底层数据结构的设计。

    原文作者:water___Wang
    原文地址: https://blog.csdn.net/WANTAWAY314/article/details/116021891
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。