我正在尝试使用Amqp alpakka连接器作为源和接收器。 Source<CommittableReadResult, NotUsed> AMQP_SOURCE -> processAndGetResponse -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 在接收器的操作成功之后,我想确认从Amqp队列获得的消息。像这样: amqp_source(committableReadResult) -> p…

2020年6月4日 0条评论 30点热度 阅读全文

在Akka Streams Doku中,给出了一个Scala示例,其中包含一个空值的列表。该列表转换为Source,并像示例中那样减少。不幸的是,在Java / Kotlin中这不会输出任何内容。 通过推文链接到Scala示例:https://doc.akka.io/docs/akka/current/stream/stream-quickstart.html#first-steps 在这里,我翻译成 Kotlin val system = ActorSystem.create("reactive-tweets")…

2020年4月22日 0条评论 36点热度 阅读全文

这是类(class): import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.model.ContentTypes; import akka.http.javadsl.model.HttpEntities; import akka.http.ja…

2020年3月29日 0条评论 36点热度 阅读全文

我创建了一个源,一个流和一个接收器 源是整数源 流将整数传输到字符串 然后接收器将写入文件 但是我不知道如何将所有这些连接在一起以获得可运行的图 public static void main(String[] args) { ActorSystem system = ActorSystem.create(); Materializer materializer = ActorMaterializer.create(system); Source<Integer, NotUsed> source = S…

2020年3月12日 0条评论 38点热度 阅读全文

我有一个Slick源,它产生多个源: Source myBigSource = Slick.source(slickSession, sqlQueryString, (SlickRow row) -> { return (Source)createNewSource(row, someContext); } ); 如何将createNewSource产生的源连接到一个大源中。 解决方案如下: 使用flatMapConcat或flatMapMerge: Source myBigSource = Slick.so…

2020年2月28日 0条评论 21点热度 阅读全文

我已经将Play框架版本从 2.4.1 更新为 2.5.6 ,但是现在我在Web套接字管理方面遇到了问题。 我有一个Controller类,其中liveUpdate()方法返回WebSocket<String>实例。在这种方法中,我在WebSocket.whenReady()中使用Out<String>来使用HashMap<Out<String>, String>,其中的键是客户端输出流,而值是包含语言信息的String,因为当我需要发送广播消息时,我会迭代HashM…

2020年2月27日 0条评论 27点热度 阅读全文

我仅在Docker容器中运行的应用程序中在AkkaStream中使用XmlDecoder时遇到了问题。 错误描述 java.lang.ClassNotFoundException: com/example/xmldecoder/FileDto Continuing ... java.lang.ClassNotFoundException: com/example/xmldecoder/FileDto Continuing ... java.lang.NoSuchMethodException: <unboun…

2019年12月1日 0条评论 24点热度 阅读全文

嗨,我正在使用Kafka和Akka Streams。在主题Kafka的MyTestTopic中,我有3个分区,数据大约以1000 QPS的高并发率被推送到主题中,并且只会高于该值。 下面是我的Akka Stream Kafka Consumer代码: final ConsumerSettings<String, byte[]> consumerSettings = ConsumerSettings.create(kafkaConfig, new StringDeserializer(), new Byt…

2019年11月26日 0条评论 34点热度 阅读全文

Scala中的Akka流提供了Flow概念。 Java中的等效功能是什么? 例如,在Scala中,存在Flow.take(n) 但是在Java中,有Source.take(n)会返回源而不是流 解决方案如下: 正如documentation清楚表明的那样,Akka库(包括Akka流)都具有Java API和Scala API,因此Flow # take的Java等效项是... Flow # take。

2019年11月13日 0条评论 32点热度 阅读全文

以下来自文档(akka): 交货保证 流裁判将正常的演员消息传递用于他们的体育比赛,因此提供了相同水平的基本交付保证。流引用确实通过需求重新传递和顺序故障检测在某种程度上扩展了语义。换一种说法: messages are sent over actor remoting which relies on TCP (classic remoting or Artery TCP) or Aeron UDP for basic redelivery mechanisms messages are guaranteed to…

2019年6月25日 0条评论 27点热度 阅读全文