我正在尝试使用Amqp alpakka连接器作为源和接收器。 Source<CommittableReadResult, NotUsed> AMQP_SOURCE -> processAndGetResponse -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 在接收器的操作成功之后,我想确认从Amqp队列获得的消息。像这样: amqp_source(committableReadResult) -> p…
我正在尝试使用Amqp alpakka连接器作为源和接收器。 Source<CommittableReadResult, NotUsed> AMQP_SOURCE -> processAndGetResponse -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 在接收器的操作成功之后,我想确认从Amqp队列获得的消息。像这样: amqp_source(committableReadResult) -> p…
我有一个SchemaRegistry和KafkaBroker,可以使用Avro v1.8.1从中提取数据。对于反序列化,我一直使用Confluent的KafkaAvroDeserializer。现在,我打算重构代码以使用Alpakka提供的Elasticsearch API,但是不幸的是,这破坏了反序列化,因为它导致NullPointerExceptions: Exception in thread "main" org.apache.kafka.common.errors.SerializationExcepti…