当前位置:  开发笔记 > 编程语言 > 正文

Apache Beam是否支持其输出的自定义文件名?

如何解决《ApacheBeam是否支持其输出的自定义文件名?》经验,为你挑选了1个好方法。

在分布式处理环境中,通常使用"部分"文件名,例如"part-000",是否可以编写某种扩展名来重命名各个输出文件名(例如每个窗口文件名) Apache Beam?

为此,可能必须能够为窗口指定名称或根据窗口的内容推断文件名.我想知道这种方法是否可行.

至于解决方案应该是流式还是批量式,流式模式示例是优选的



1> abhijeet dhu..:

是的,正如jkff建议的那样,您可以使用TextIO.write.to(FilenamePolicy)来实现这一点.

示例如下:

如果要将输出写入特定的本地文件,可以使用:

lines.apply(TextIO.write()到( "/路径/到/ file.txt的"));

下面是使用前缀link来编写输出的简单方法.此示例适用于Google存储,而不是使用本地/ s3路径.

public class MinimalWordCountJava8 {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // In order to run your pipeline, you need to make following runner specific changes:
    //
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
    // or FlinkRunner.
    // CHANGE 2/3: Specify runner-required options.
    // For BlockingDataflowRunner, set project and temp location as follows:
    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
    // for more details.
    //   options.as(FlinkPipelineOptions.class)
    //      .setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
     .apply(FlatMapElements
         .into(TypeDescriptors.strings())
         .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
     .apply(Filter.by((String word) -> !word.isEmpty()))
     .apply(Count.perElement())
     .apply(MapElements
         .into(TypeDescriptors.strings())
         .via((KV wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
     // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run().waitUntilFinish();
  }
}

此示例代码将为您提供更多控制写入输出:

 /**
   * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
   * being written. This always includes the shard number and the total number of shards. For
   * windowed writes, it also includes the window and pane index (a sequence number assigned to each
   * trigger firing).
   */
  protected static class PerWindowFiles extends FilenamePolicy {

    private final ResourceId prefix;

    public PerWindowFiles(ResourceId prefix) {
      this.prefix = prefix;
    }

    public String filenamePrefixForWindow(IntervalWindow window) {
      String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
      return String.format(
          "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
    }

    @Override
    public ResourceId windowedFilename(int shardNumber,
                                       int numShards,
                                       BoundedWindow window,
                                       PaneInfo paneInfo,
                                       OutputFileHints outputFileHints) {
      IntervalWindow intervalWindow = (IntervalWindow) window;
      String filename =
          String.format(
              "%s-%s-of-%s%s",
              filenamePrefixForWindow(intervalWindow),
              shardNumber,
              numShards,
              outputFileHints.getSuggestedFilenameSuffix());
      return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        int shardNumber, int numShards, OutputFileHints outputFileHints) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }

  @Override
  public PDone expand(PCollection teamAndScore) {
    if (windowed) {
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
    } else {
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(TextIO.write().to(filenamePrefix));
    }
    return PDone.in(teamAndScore.getPipeline());
  }

推荐阅读
郑小蒜9299_941611_G
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有