如何忽略来自Kafka主题的未提交消息

2020年12月1日 52点热度 0条评论

我想拥有一个能够满足两个要求的Kakfa消费者:

要求时使用所有消息(将其获取)
放弃/忽略所有尚未提交的消息(需要帮助)

为简单起见,我只运行一个主题和一个分区。
这是我设置消费者的方式:

private Consumer<Long, String> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    final Consumer<Long, String> consumer = new KafkaConsumer<>(props);

    consumer.subscribe(Collections.singletonList(topic));
    return consumer;
}

这是我用来完成要求1的方法:

public void write() {
    final ConsumerRecords<Long, String> consumerRecords = transactionsConsumer.poll(1000);
    consumerRecords.forEach(record -> System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                record.key(), record.value(), record.partition(), record.offset()));

    transactionsConsumer.commitAsync(); 
}

它工作正常,但是至于要求2号,我并没有真正获得最好的方法。我知道我可以实现一个使用
write()的类似方法,该方法消耗并且不打印任何内容,但是对于忽略消息而言,这似乎工作太多。另外,我认为,如果我有很多消息要使用,那么这样做可能会很昂贵。我看了
seekToEnd(partition)之类的方法,但是无法使其正常工作。

解决方案如下:

对于已读的已提交消息,必须设置

isolated.level = read_committed

在用户的配置中。

在read_committed模式下,使用者将仅读取已成功提交的那些事务性消息。它将继续像以前一样读取非事务性消息。在read_committed模式下没有客户端缓冲。相反,针对read_committed使用者的分区的结束偏移量将是该分区中属于一个开放事务的第一条消息的偏移量。该偏移量称为“最后稳定偏移量”(LSO)。