In distributed processing environments, output files typically get generic "part" names such as part-000. In Apache Beam, is it possible to write some kind of extension that renames the individual output files (for example, giving each window's file its own name)?

For this to work, it would presumably have to be possible to assign a name to a window, or to derive the file name from the window's contents. I'd like to know whether such an approach is feasible.

As for whether the solution runs in streaming or batch mode, a streaming-mode example is preferred.
Yes, as jkff suggested, you can achieve this with TextIO.write().to(FilenamePolicy).

Examples follow:
If you want to write the output to a specific local file, you can use:

lines.apply(TextIO.write().to("/path/to/file.txt"));
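Note that TextIO treats the string passed to to() as a filename prefix: with the default policy, each shard gets a suffix such as -00000-of-00001 appended. A minimal sketch (the path here is purely illustrative) of pinning down the suffix and shard count:

lines.apply(TextIO.write()
    .to("/path/to/output")  // filename prefix (illustrative path)
    .withSuffix(".txt")     // appended after the shard numbering
    .withNumShards(1));     // force a single output file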
Below is a simple way to write output using a prefix. This example targets Google Cloud Storage rather than a local or S3 path.
public class MinimalWordCountJava8 {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // In order to run your pipeline, you need to make the following runner-specific changes:
    //
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner or FlinkRunner.
    // CHANGE 2/3: Specify runner-required options.
    //   For BlockingDataflowRunner, set project and temp location as follows:
    //     DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //     dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //     dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //     dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    //   For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
    //   for more details.
    //     options.as(FlinkPipelineOptions.class)
    //         .setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
        .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
        .apply(Filter.by((String word) -> !word.isEmpty()))
        .apply(Count.perElement())
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()))
        // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results.
        .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run().waitUntilFinish();
  }
}
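With this default setup, output files are named by appending shard information to the prefix, e.g. AND_OUTPUT_PREFIX-00000-of-00003; these are the generic "part"-style names the question asks about. Overriding them per window requires a custom FilenamePolicy.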
The following example code gives you more control over how the output is written:
/**
 * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
 * being written. This always includes the shard number and the total number of shards. For
 * windowed writes, it also includes the window and pane index (a sequence number assigned to each
 * trigger firing).
 */
protected static class PerWindowFiles extends FilenamePolicy {

  private final ResourceId prefix;

  public PerWindowFiles(ResourceId prefix) {
    this.prefix = prefix;
  }

  public String filenamePrefixForWindow(IntervalWindow window) {
    String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
    // Note: "formatter" is a date-time formatter defined on the enclosing class,
    // e.g. ISODateTimeFormat.hourMinute() in the Beam example this is taken from.
    return String.format(
        "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
  }

  @Override
  public ResourceId windowedFilename(int shardNumber,
                                     int numShards,
                                     BoundedWindow window,
                                     PaneInfo paneInfo,
                                     OutputFileHints outputFileHints) {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    String filename =
        String.format(
            "%s-%s-of-%s%s",
            filenamePrefixForWindow(intervalWindow),
            shardNumber,
            numShards,
            outputFileHints.getSuggestedFilenameSuffix());
    return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, OutputFileHints outputFileHints) {
    throw new UnsupportedOperationException("Unsupported.");
  }
}

// expand() below belongs to the enclosing PTransform (WriteToText<InputT> in the
// Beam mobile gaming examples) and picks the windowed or unwindowed write path.
@Override
public PDone expand(PCollection<InputT> teamAndScore) {
  if (windowed) {
    teamAndScore
        .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
        .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
  } else {
    teamAndScore
        .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
        .apply(TextIO.write().to(filenamePrefix));
  }
  return PDone.in(teamAndScore.getPipeline());
}
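For completeness, here is a minimal sketch of how a policy like PerWindowFiles can be wired into TextIO for windowed writes. This wiring is not part of the original answer; the "windowedLines" collection name, the output path, and the shard count are assumptions for illustration:

ResourceId prefix =
    FileBasedSink.convertToFileResourceIfPossible("gs://my-bucket/output");  // assumed path
windowedLines.apply(  // "windowedLines" is a hypothetical windowed PCollection<String>
    TextIO.write()
        .to(new PerWindowFiles(prefix))
        .withTempDirectory(prefix.getCurrentDirectory())
        .withWindowedWrites()  // required because the input collection is windowed
        .withNumShards(3));    // assumed shard count

With this in place, each window's files are named by windowedFilename() above rather than by the default shard template.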