我有一个简单的问题。可以说我正在读取一个实木复合地板文件,该文件每行生成一个avro GenericRecord对象,如下所示。 {"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j1"} {"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j2"} {"name"…

2020年8月2日 0条评论 17点热度 阅读全文

我有一个Apache Beam作业,该作业会破坏PubSub中的数据,然后加载到BigQuery中,我用字段将PubSub消息转换为pojo ID, 名字,数 计数是指单个摄取中非唯一元素的计数。 如果我从PubSub 3元素(其中两个相同)加载,那么我需要加载到BigQuery 2元素中,其中一个元素的计数为2。 我想知道如何在Apache Beam中轻松实现它。我试图使其通过DoFn或MapElements,但在那里我只能处理单个元素。我也试图将元素转换为KV,然后计数,但是我没有确定性编码器。 在通常的Jav…

2020年7月13日 0条评论 35点热度 阅读全文

我正在尝试使用Beam-sdks-java-io-hadoop-file-file-system v2.0.0和Spark作为运行程序从Beam应用程序中的AWS EMR集群中读取S3。我可以在 yarn 日志中看到管道可以检测到S3中存在的文件,但无法读取该文件。请查看下面的日志。 17/06/27 03:29:25 INFO FileBasedSource: Filepattern s3a://xxx/test-folder/* matched 1 files with total size 3410584 1…

2020年7月10日 0条评论 92点热度 阅读全文

首先让我描述一下情况。 步骤1.我必须逐行读取文件。该文件是.json文件,每行具有以下格式: { "schema":{Several keys that are to be deleted}, "payload":{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"USD","key5":"100"} } 步骤2.删除模式对象并结束(为后续步骤添加了更多示例): {"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"US…

2020年6月4日 0条评论 35点热度 阅读全文

似乎org.apache.beam.sdk.io.parquet.ParquetIO.readFiles方法需要传递一个架构。 有没有一种方法可以避免传递模式?Parquet文件中不包含该模式吗?如果我试图读取具有不同架构的多个Parquet文件怎么办? 解决方案如下: 请在网上找到我的回复 有没有一种方法可以避免传递模式?当前没有机制来避免传递实木复合地板文件的架构Parquet文件中不包含该模式吗?是的,这是正确的,标头中的元数据是文件的架构定义。请参考BEAM-8344,这是一个开放功能请求,以支持推理模式如…

2020年5月16日 0条评论 50点热度 阅读全文

我正在做一个项目,其中我收到大约10个文件,每个文件的大小为200GB。我的项目要求是从每个文件中提取数据,并与其他文件合并并提取数据。 例如,例如,我有1.txt文件,其中有帐户ID,而我有2.txt文件,其中有帐户ID和帐户名。根据第一个文件的帐户ID,我需要从第二个文件中提取数据。 这样,我需要对10个文件中的每个文件执行操作并创建最终输出文件。 我目前正在Java中执行此操作,这确实需要时间。大约需要4到5个小时。 我可以通过任何改变来提高表现吗?是否可以与Java集成并提高性能的任何技术,工具或框架? 我…

2020年4月19日 0条评论 18点热度 阅读全文

我在Java中有一个Apache Beam管道,看起来像这样: pipeline .apply("Read pubsub",PubsubIO.readStrings() .fromTopic(inputTopic) ) .apply(window) .apply(ParDo.of(new OrderCodeKey())) .apply(GroupByKey.<String, RawBsonDocument>create()) .apply(ParDo.of(new GetLastOrder())) .a…

2020年4月4日 0条评论 41点热度 阅读全文

尝试为Apache Beam Cassandra JAR编译和使用快照。似乎该构建未在JAR中打包Guava依赖项。当其他代码使用JAR时,这将导致编译失败-请参见以下异常: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/beam/vendor/guava/v20_0/com/google/common/base/Preconditions at org.apache.beam.sdk.io.cassandra.Cassa…

2020年3月1日 0条评论 57点热度 阅读全文

如何在Java中编写以下代码?如果我有Java中的记录/字典列表,如何编写波束代码,将其写入tfrecord中,其中tf.train.Examples已序列化。有很多使用python的示例,以下是python的一个示例,如何在Java中编写相同的逻辑? import tensorflow as tf import apache_beam as beam from apache_beam.runners.interactive import interactive_runner from apache_beam.co…

2020年2月26日 0条评论 51点热度 阅读全文

在Apache Beam步骤中,我的PCollection为KV<String, Iterable<KV<Long, GenericRecord>>>>。我想将可迭代的所有记录写到同一个Parquet文件中。我的代码段如下 p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord> .apply(ParDo.of(new Map…

2020年2月3日 0条评论 43点热度 阅读全文