我正在尝试为我的管道构建单元测试。 该管道从pubsub读取,执行转换并将结果再次写入pubsub。 为了进一步简化单元测试,直到它起作用为止,该单元测试将只接收一个字符串作为输入,并测试输出是否是某个字符串。 该代码如下所示: @RunWith(JUnit4.class) public class TesterPipeline { // Our static output data, which is the expected data that the final PCollection must match.…

2020年12月1日 0条评论 62点热度 阅读全文

我刚刚开始使用Java SDK Apache Beam。由于需要写文件时不能在元素上折行,因此我正在尝试找到一种方法。 从下面看,我有点类似,但仍然找不到等效的选项。beam.io.WriteToText add new line after each value - can it be removed? 我正在尝试的是这个。 org.apache.beam.sdk.io.TextIO.write() 可能吗? 提前致谢。 解决方案如下: 扩展了CombineFn<InputT, AccumT, Output…

2020年11月29日 0条评论 54点热度 阅读全文

我正在尝试使用RedisIO向Redis服务器查询集合。 Redis服务器正常并且仅在批处理管道(无流)时响应良好。 但是,使用流输入数据(来自文件)是这样的: PCollection<String> stream = pipeline.apply("ReadMyFile", TextIO.read().from("/home/out/**") .watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never…

2020年11月9日 0条评论 51点热度 阅读全文

我是我的管道(Java),已使用订阅或主题设置了从pubsub读取的选项: PCollection<PubsubMessage> messages = null; if (options.getUseSubscription()) { messages = pipeline.apply("ReadPubSubSubscription", PubsubIO.readMessagesWithAttributes() .fromSubscription(options.getInputSubscription…

2020年10月25日 0条评论 54点热度 阅读全文

我试图为Apache Beam创建一个模板,以将数据索引到elasticsearch。正在创建模板,但是在调用模板时,管道失败,没有协议(protocol)错误。由于错误与URL对象有关,因此看起来很奇怪。 public class StarterPipeline { private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); public interface IndexToEsOptions extends P…

2020年10月19日 0条评论 51点热度 阅读全文

我们目前正在尝试使OpenCV在Google Cloud Dataflow的Java作业中运行。不幸的是,我们无法将Dataflow使用的Docker容器替换为已安装OpenCV的容器。 (See other question)如果我们使用Python SDK,则可以使用一个选项来指定setup.py文件,该文件可用于调用apt-get。使用Java SDK创建的作业是否有类似的东西? 谢谢您的帮助! 解决方案如下: 我想出了一个解决方案,但是可能会有更优雅的方法来做到这一点。 @Setup public void…

2020年10月12日 0条评论 53点热度 阅读全文

我试图为同一密钥输出SUM和COUNT。例如。给定具有数百万个飞机延误事件的.csv。我想使用Apache Beam(Java)求和每个平面的延迟时间,并计算每个平面有多少延迟。 每行都有plane_id, delay_duration, date,依此类推。 我正在尝试创建两个PCollections,并希望在输出之前进行合并。 PCollection<KV<String, Integer>> sum = eventInfo.apply(MapElements.into(TypeDescr…

2020年9月26日 0条评论 34点热度 阅读全文

我现在遇到的核心问题是,当我运行部署到Google Cloud Dataflow的Dataflow管道时,出现错误: java.lang.IllegalStateException:名称为[DEFAULT]的FirebaseApp不存在。 如果我在本地运行相同的管道,则一切正常。因此,我怀疑是身份验证问题还是环境问题。 代码位: DEPLOY和REAL变量用于控制是否推送到Cloud(或在本地运行)以及是否使用我的发布/订阅源或使用Moc'd数据。在Moc'd数据和pub / sub数据之间进行切换似乎根本不影响F…

2020年9月20日 0条评论 42点热度 阅读全文

这两个注释之间有什么区别? DoFn.Setup 用于准备要处理元素束的实例的方法的注释。 使用单词“bundle”,参数为零。 DoFn.StartBundle 用于准备要处理一批元素的实例的方法的注释。 使用单词“batch”,接受零个或一个参数( StartBundleContext ,一种访问PipelineOptions的方法)。 我想做什么 我需要在DoFn实例中初始化一个库,然后对“批处理”或“捆绑销售”中的每个元素使用该库。我通常不会用这两个词来梳头,但是在管道中,可能会有一些区别吗? 解决方案如下…

2020年9月14日 0条评论 68点热度 阅读全文

我目前正在使用Google Cloud Dataflow和Apache Beam来消耗来自Kafka主题的消息,该主题存在于两个不同的Kafka集群中,两个集群均包含相同的主题名称,但主题中的数据不同。Kafka群集是分开的,因为它们包含来自不同区域的数据。 我只是想知道是否可以通过在单个KafkaIO.read Dataflow管道步骤中列出两个集群的所有引导服务器来消耗两个集群的数据? .withBootstrapServers("CLUSTER1_SERVER:PORT,CLUSTER2_SERVER:POR…

2020年9月12日 0条评论 50点热度 阅读全文