例如:
DataSet> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet> sum = input.reduce(new ReduceFunction(),Tuple1>{
public Tuple1 reduce(Tuple1 value1,Tuple1 value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
如果上面的reduce转换不是并行操作,我是否需要使用额外的两个转换'partitionByHash'和'mapPartition',如下所示:
DataSet> input = env.fromElements(1,2,3,4,5,6,7,8,9);
DataSet> sum = input.partitionByHash(0).mapPartition(new MapPartitionFunction(),Tuple1>{
public void map(Iterable> values,Collector> out){
long sum = getSum(values);
out.collect(new Tuple1(sum));
}
}).reduce(new ReduceFunction(),Tuple1>{
public Tuple1 reduce(Tuple1 value1,Tuple1 value2){
return new Tuple1<>(value1.f0 + value2.f0);
}
}
以及为什么reduce转换的结果仍然是DataSet的一个实例而不是一个实例 Tuple1
两者,reduce
并且reduceGroup
是集团为单位的运算和记录组应用.如果未指定使用分组键groupBy
,则数据集的所有记录都属于同一组.因此,只有一个组并且最终结果reduce
并reduceGroup
不能并行计算.
如果reduce转换是可组合的(对于任何ReduceFunction
和所有可组合的GroupReduceFunction
s 都是如此),Flink可以并行应用组合器.
对你的两个问题有两个答案:
法比安给出了一个很好的解释.如果按键应用,则操作是并行的.否则只有预聚合是并行的.
在第二个示例中,通过引入密钥使其并行.您可以简单地编写(Java 8 Style)而不是使用"mapPartition()"的复杂解决方法
DataSet> input = ...; input.groupBy(0).reduce( (a, b) -> new Tuple1<>(a.f0 + b.f0);
但请注意,您的输入数据非常小,无论如何只会有一个并行任务.如果使用较大的输入,则可以看到并行预聚合,例如:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(10); DataSetinput = env.generateSequence(1, 100000000); DataSet sum = input.reduce ( (a, b) -> a + b );
DataSet仍然是群集中X的惰性表示.您可以继续在并行程序中使用该数据,而不会触发某些计算并将结果数据从分布式工作程序提取回驱动程序.这允许您编写更大的程序,这些程序完全在分布式工作程序上运行并且被懒惰地执行.没有数据被提取到客户端并重新分发给并行工作者.
特别是在迭代程序中,这非常强大,因为整个循环在不涉及客户端且需要重新部署运算符的情况下工作.
你总是可以通过调用"dataSet.collext().get(0);"获得"X". - 它明确表示应该执行和获取某些内容.