5.3.2 延迟操作和延迟缓存

2021年4月21日 47点热度 0条评论 来源: water___Wang

5.3.2 延迟操作和延迟缓存

Kafka服务端在处理客户端的一些请求时,如果不能及时返回响应结果给客户端,会在服务端创建一个延迟操作对象(DelayedOperatlon),并放在延迟缓存中(DelayedOperati.onPurgatory)。Kafka的延迟操作有多种:延迟的生产、延迟的响应、延迟的加入、延迟的心跳。关于延迟操作和延迟缓存相关的流程,会在下一章详细分析,这里先给出一些延迟操作相关的结论。

  • 延迟操作需要指定一个超时时间,表示在指定时间内没有完成时会被强制完成。
  • 延迟操作加入到延迟缓存中,会指定一个键。比如,和消费组相关的延迟加入,键是消费组编号。
  • 服务端创建延迟操作后,通常会有“尝试完成延迟操作”的动作(延迟操作如果能够尽早完成是最好的)。尝试完成延迟操作的外部事件会有多种情况,而且因为延迟操作有依赖条件,所以任何可能改变依赖条件的事件,都应该执行“尝试完成延迟操作”。比如,协调者因为依赖了“等待消费者发送加入组请求”这个条件才会创建“延迟的加入组”对象。如果有消费者发送了加入组请求,就应该尝试完成“延迟的加入组”对象。
  • 当外部事件尝试完成延迟操作时,怎么判断延迟操作能不能完成?不同的延迟操作类型因为依赖条件不同,应该自定义可以完成延迟操作的条件判断。

创建延迟操作的最终目的是让操作不再被延迟,延迟操作对象有下面几个跟完成操作相关的方法。

  • tryComplete()尝试完成,如果不能完成,返回false,表示延迟操作还不能完成。
  • onComplete()延迟操作完成时的回调方法,完成有两种:正常主动完成和超时被动完成。
  • onExp1.rat1.on()延迟操作超时的回调方法,如果之前一直调用尝试完成都不能完成,在指定的坦时时间过去后就会强制完成。资l用这个回调方法,一定会再调用onComplete()方法。

延迟缓存保存了延迟操作对象。将延迟操作放入延迟缓存时,要将延迟操作和键进行关联。每个延迟操作都对应唯一的键,这样可以通过键来获取延迟缓存中的延迟操作对象。延迟缓存有下面两个相关的方法。

  • tryCompleteElseWatch(operati.on,key)。尝试完成延迟的操作,如果不能完成就以指定的键监控这个延迟操作。创建完延迟操作对象后,可以立即尝试完成,不一定只能由其他事件尝试完成。
  • checkAndComplete(key)。检查并尝试完成指定键的延迟操作,在上一个方法中,如果延迟操作没有完成,会被加入到延迟缓存中。延迟缓存的两个方法都会调用延迟操作的tryCo叩lete()方法:tryCompleteElseWatch()方法会直接调用,因为它的参数中有延迟的操作对象;checkAndComplete()方法只有键,需要先从缓存中根据键取出延迟操作,再尝试完成。

下面以协调者在处理第一个消费者的“加入组请求”,创建了“延迟的加入组”对象为例。每种类型的延迟操作都有对应的状态数据,因为延迟操作的相关完成方法,都需要根据状态数据判断是否可以完成。

  • “延迟心跳操作”的状态数据有:消费组元数据、消费者元数据。消费者元数据会被协调者用来判断这个消费者是否及时发送了心跳。消费组元数据会用在:当消费者没有及时发送心跳,需要将对应的消费者元数据从消费组元数据中移除时。另外,消费组元数据对象还会用在加锁同步代码块上。
  • “延迟加入操作”的状态数据有:消费组元数据。因为协调者判断延迟加入操作是否能够完成的依据是:消费’组中的所有消费者成员是否都发送或重新发送了“加入组请求”。

延迟操作对象都会有一个超时时间,“延迟加入操作”的超时时间是所有消费者最大的会话超时时间,因为“延迟加入操作”是针对一个消费组级别的。“延迟心跳操作”的超时时间是消费者的会话超时时间,因为协调者针对每个消费者,都会创建一个“延迟心跳操作”。这也是延迟心跳操作的状态数据需要“消费者成员元数据”的一个原因,否则无法标识是哪个消费者的心跳延迟了。相关代码如下:

下面来看“延迟加入操作”和完成相关的方法,主要是tryComplete()尝试完成延迟的加入操作。

    原文作者:water___Wang
    原文地址: https://blog.csdn.net/WANTAWAY314/article/details/115865525
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。