我刚刚开始使用Java SDK Apache Beam。由于需要写文件时不能在元素上折行,因此我正在尝试找到一种方法。 从下面看,我有点类似,但仍然找不到等效的选项。beam.io.WriteToText add new line after each value - can it be removed? 我正在尝试的是这个。 org.apache.beam.sdk.io.TextIO.write() 可能吗? 提前致谢。 解决方案如下: 扩展了CombineFn<InputT, AccumT, Output…

2020年11月29日 0条评论 57点热度 阅读全文

我试图为Apache Beam创建一个模板,以将数据索引到elasticsearch。正在创建模板,但是在调用模板时,管道失败,没有协议(protocol)错误。由于错误与URL对象有关,因此看起来很奇怪。 public class StarterPipeline { private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); public interface IndexToEsOptions extends P…

2020年10月19日 0条评论 53点热度 阅读全文

我正在尝试使用Beam-sdks-java-io-hadoop-file-file-system v2.0.0和Spark作为运行程序从Beam应用程序中的AWS EMR集群中读取S3。我可以在 yarn 日志中看到管道可以检测到S3中存在的文件,但无法读取该文件。请查看下面的日志。 17/06/27 03:29:25 INFO FileBasedSource: Filepattern s3a://xxx/test-folder/* matched 1 files with total size 3410584 1…

2020年7月10日 0条评论 91点热度 阅读全文

首先让我描述一下情况。 步骤1.我必须逐行读取文件。该文件是.json文件,每行具有以下格式: { "schema":{Several keys that are to be deleted}, "payload":{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"USD","key5":"100"} } 步骤2.删除模式对象并结束(为后续步骤添加了更多示例): {"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"US…

2020年6月4日 0条评论 35点热度 阅读全文

似乎org.apache.beam.sdk.io.parquet.ParquetIO.readFiles方法需要传递一个架构。 有没有一种方法可以避免传递模式?Parquet文件中不包含该模式吗?如果我试图读取具有不同架构的多个Parquet文件怎么办? 解决方案如下: 请在网上找到我的回复 有没有一种方法可以避免传递模式?当前没有机制来避免传递实木复合地板文件的架构Parquet文件中不包含该模式吗?是的,这是正确的,标头中的元数据是文件的架构定义。请参考BEAM-8344,这是一个开放功能请求,以支持推理模式如…

2020年5月16日 0条评论 50点热度 阅读全文

如何在Java中编写以下代码?如果我有Java中的记录/字典列表,如何编写波束代码,将其写入tfrecord中,其中tf.train.Examples已序列化。有很多使用python的示例,以下是python的一个示例,如何在Java中编写相同的逻辑? import tensorflow as tf import apache_beam as beam from apache_beam.runners.interactive import interactive_runner from apache_beam.co…

2020年2月26日 0条评论 51点热度 阅读全文

我正在尝试使用Apache Beam 2.16.0构建管道来处理大量XML文件。平均计数为每24小时7000万,在高峰负载时,它可以增加到10亿。文件大小从〜1 kb到200 kb不等(有时可能更大,例如30 mb) 文件经过各种转换,最终目标是BigQuery表以供进一步分析。因此,首先我阅读xml文件,然后反序列化为POJO(在Jackson的帮助下),然后应用所有必需的转换。转换速度非常快,在我的机器上,根据文件大小,我每秒能够获得约40000个转换。 我主要关心的是文件读取速度。我感到所有的阅读工作只能由一…

2020年1月12日 0条评论 43点热度 阅读全文

首先让我描述一下情况。 步骤1.我必须逐行读取文件。该文件是.json文件,每行具有以下格式: { "schema":{Several keys that are to be deleted}, "payload":{"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"USD","key5":"100"} } 步骤2.删除模式对象并结束(为后续步骤添加了更多示例): {"key1":20001,"key2":"aaaa","key3":"bbbb","key4":"US…

2020年1月5日 0条评论 42点热度 阅读全文

问题陈述: 我正在尝试使用直接运行器读取和打印Beam中xml文件的内容 这是代码片段: public class BookStore{ public static void main (string args[]){ BookOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BookOptions .class); Pipeline pipeline = Pipeline.create(options); PCo…

2019年12月3日 0条评论 41点热度 阅读全文

我正在尝试使用Apache Beam 2.16.0构建管道来处理大量XML文件。平均计数为每24小时7000万,在高峰负载时,它可以增加到10亿。文件大小从〜1 kb到200 kb不等(有时可能更大,例如30 mb) 文件经过各种转换,最终目标是BigQuery表以供进一步分析。因此,首先我阅读xml文件,然后反序列化为POJO(在Jackson的帮助下),然后应用所有必需的转换。转换速度非常快,在我的机器上,根据文件大小,我每秒能够获得约40000个转换。 我主要关心的是文件读取速度。我感到所有的阅读工作只能由一…

2019年3月20日 0条评论 103点热度 阅读全文