如何在Scalding中存储输出

2019年7月29日 17点热度 0条评论

我正在尝试将管道输出到不同的目录中,以便每个目录的输出将基于某些ID进行存储。
因此,在普通 map 精简代码中,我将使用MultipleOutputs类,并在精简器中执行类似的操作。

protected void reduce(final SomeKey key,
      final Iterable<SomeValue> values,
      final Context context) {

   ...
   for (SomeValue value: values) {
     String bucketId = computeBucketIdFrom(...);
     multipleOutputs.write(key, value, folderName + "/" + bucketId);
   ...

所以我想一个人可以在烫伤中做到这一点

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 1 until numberOfBuckets) {
    somePipe
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
    .write(Csv(out + "/bucket" + i ,
      writeHeader = true,
      separator = "\t"))
  }

但是我认为您最终会多次重做同一个管道,这会影响整体性能。

还有其他选择吗?

谢谢

解决方案如下:

是的,使用TemplatedTsv当然是更好的方法。

因此,您的上述代码可以编写如下,

val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
    .read
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))

这会将来自'some_id的所有记录放入out / some_ids文件夹下的单独文件夹中。

但是,您也可以创建整数存储桶。只需更改最后几行,

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))

这将创建两个数字文件夹,如out / dd /。您还可以检查templatedTsv API
here.

使用templatedTsv可能会出现小问题,即reducer可能会生成许多小文件,这可能会对使用您的结果进行下一个作业不利。因此,最好在写入磁盘之前对模板字段进行排序。我写了一个关于它的博客
here.