我正在尝试使用Amqp alpakka连接器作为源和接收器。 Source<CommittableReadResult, NotUsed> AMQP_SOURCE -> processAndGetResponse -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 在接收器的操作成功之后,我想确认从Amqp队列获得的消息。像这样: amqp_source(committableReadResult) -> p…

2020年6月4日 0条评论 25点热度 阅读全文

我有一个SchemaRegistry和KafkaBroker,可以使用Avro v1.8.1从中提取数据。对于反序列化,我一直使用Confluent的KafkaAvroDeserializer。现在,我打算重构代码以使用Alpakka提供的Elasticsearch API,但是不幸的是,这破坏了反序列化,因为它导致NullPointerExceptions: Exception in thread "main" org.apache.kafka.common.errors.SerializationExcepti…

2019年4月27日 0条评论 25点热度 阅读全文