在许多应用MapReduce的实际情况中,最终的算法最终会成为几个MapReduce步骤.
即Map1,Reduce1,Map2,Reduce2等.
因此,您可以获得下一个映射的输入所需的最后一个reduce的输出.
管道成功完成后,您(通常)不希望保留中间数据.另外,因为这个中间数据通常是一些数据结构(如'map'或'set'),所以你不想在编写和读取这些键值对时花费太多精力.
在Hadoop中推荐的方法是什么?
是否有(简单)示例显示如何以正确的方式处理此中间数据,包括之后的清理?
我认为雅虎开发者网络上的这个教程将帮助你解决这个问题:Chaining Jobs
你用的是JobClient.runJob()
.第一个作业的数据输出路径成为第二个作业的输入路径.这些需要作为参数传递给您的作业,并使用适当的代码来解析它们并设置作业的参数.
我认为上面的方法可能是现在较旧的mapred API的方式,但它应该仍然有用.新的mapreduce API中会有类似的方法,但我不确定它是什么.
至于在作业完成后删除中间数据,您可以在代码中执行此操作.我以前做过的方式是使用类似的东西:
FileSystem.delete(Path f, boolean recursive);
其中路径是数据的HDFS上的位置.一旦没有其他工作需要,您需要确保只删除此数据.
有很多方法可以做到.
(1)级联工作
为第一个作业创建JobConf对象"job1",并将所有参数设置为inputdirectory,将"temp"设置为输出目录.执行这项工作:
JobClient.run(job1).
紧接其下方,为第二个作业创建JobConf对象"job2",并将所有参数设置为"temp"作为inputdirectory并将"output"设置为输出目录.执行这项工作:
JobClient.run(job2).
(2)创建两个JobConf对象并将其中的所有参数设置为(1),除非您不使用JobClient.run.
然后使用jobconfs作为参数创建两个Job对象:
Job job1=new Job(jobconf1);
Job job2=new Job(jobconf2);
使用jobControl对象,指定作业依赖关系,然后运行作业:
JobControl jbcntrl=new JobControl("jbcntrl");
jbcntrl.addJob(job1);
jbcntrl.addJob(job2);
job2.addDependingJob(job1);
jbcntrl.run();
(3)如果你需要一个像Map + |那样的结构 减少| Map*,您可以使用随Hadoop版本0.19及更高版本附带的ChainMapper和ChainReducer类.
干杯
实际上有很多方法可以做到这一点.我会专注于两个.
一个是通过Riffle(http://github.com/cwensel/riffle)一个注释库,用于识别依赖事物并以依赖(拓扑)顺序"执行"它们.
或者您可以在Cascading(http://www.cascading.org/)中使用Cascade(和MapReduceFlow ).未来版本将支持Riffle注释,但现在使用原始MR JobConf作业可以很好地工作.
这方面的一个变体是根本不用手工管理MR作业,而是使用Cascading API开发应用程序.然后通过级联规划器和Flow类在内部处理JobConf和作业链.
这样你就可以花时间专注于你的问题,而不是管理Hadoop工作等的机制.你甚至可以在顶层(如clojure或jruby)分层不同的语言,甚至可以进一步简化你的开发和应用程序.http://www.cascading.org/modules.html
我已经使用JobConf对象一个接一个地进行了作业链接.我把WordCount示例用于链接作业.一项工作计算出一个单词在给定输出中重复多少次.第二个作业将第一个作业输出作为输入,并计算出给定输入中的总单词.下面是需要放在Driver类中的代码.
//First Job - Counts, how many times a word encountered in a given file
JobConf job1 = new JobConf(WordCount.class);
job1.setJobName("WordCount");
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job1.setMapperClass(WordCountMapper.class);
job1.setCombinerClass(WordCountReducer.class);
job1.setReducerClass(WordCountReducer.class);
job1.setInputFormat(TextInputFormat.class);
job1.setOutputFormat(TextOutputFormat.class);
//Ensure that a folder with the "input_data" exists on HDFS and contains the input files
FileInputFormat.setInputPaths(job1, new Path("input_data"));
//"first_job_output" contains data that how many times a word occurred in the given file
//This will be the input to the second job. For second job, input data name should be
//"first_job_output".
FileOutputFormat.setOutputPath(job1, new Path("first_job_output"));
JobClient.runJob(job1);
//Second Job - Counts total number of words in a given file
JobConf job2 = new JobConf(TotalWords.class);
job2.setJobName("TotalWords");
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
job2.setMapperClass(TotalWordsMapper.class);
job2.setCombinerClass(TotalWordsReducer.class);
job2.setReducerClass(TotalWordsReducer.class);
job2.setInputFormat(TextInputFormat.class);
job2.setOutputFormat(TextOutputFormat.class);
//Path name for this job should match first job's output path name
FileInputFormat.setInputPaths(job2, new Path("first_job_output"));
//This will contain the final output. If you want to send this jobs output
//as input to third job, then third jobs input path name should be "second_job_output"
//In this way, jobs can be chained, sending output one to other as input and get the
//final output
FileOutputFormat.setOutputPath(job2, new Path("second_job_output"));
JobClient.runJob(job2);
运行这些作业的命令是:
bin/hadoop jar TotalWords.
我们需要为命令提供最终作业名称.在上面的例子中,它是TotalWords.
您可以按照代码中给出的方式运行MR链.
请注意:仅提供了驱动程序代码
public class WordCountSorting {
// here the word keys shall be sorted
//let us write the wordcount logic first
public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException {
//THE DRIVER CODE FOR MR CHAIN
Configuration conf1=new Configuration();
Job j1=Job.getInstance(conf1);
j1.setJarByClass(WordCountSorting.class);
j1.setMapperClass(MyMapper.class);
j1.setReducerClass(MyReducer.class);
j1.setMapOutputKeyClass(Text.class);
j1.setMapOutputValueClass(IntWritable.class);
j1.setOutputKeyClass(LongWritable.class);
j1.setOutputValueClass(Text.class);
Path outputPath=new Path("FirstMapper");
FileInputFormat.addInputPath(j1,new Path(args[0]));
FileOutputFormat.setOutputPath(j1,outputPath);
outputPath.getFileSystem(conf1).delete(outputPath);
j1.waitForCompletion(true);
Configuration conf2=new Configuration();
Job j2=Job.getInstance(conf2);
j2.setJarByClass(WordCountSorting.class);
j2.setMapperClass(MyMapper2.class);
j2.setNumReduceTasks(0);
j2.setOutputKeyClass(Text.class);
j2.setOutputValueClass(IntWritable.class);
Path outputPath1=new Path(args[1]);
FileInputFormat.addInputPath(j2, outputPath);
FileOutputFormat.setOutputPath(j2, outputPath1);
outputPath1.getFileSystem(conf2).delete(outputPath1, true);
System.exit(j2.waitForCompletion(true)?0:1);
}
}
序列是
(JOB1)MAP-> REDUCE->(JOB2)MAP
这样做是为了对键进行排序,但有更多的方法,例如使用树形图
但是我想把注意力集中在乔布斯被链接的方式上! !
谢谢