我的程序遵循迭代的map/reduce方法.如果满足某些条件,它需要停止.无论如何,我可以设置一个全局变量,可以分布在所有map/reduce任务中,并检查全局变量是否达到完成条件.
像这样的东西.
While(Condition != true){ Configuration conf = getConf(); Job job = new Job(conf, "Dijkstra Graph Search"); job.setJarByClass(GraphSearch.class); job.setMapperClass(DijkstraMap.class); job.setReducerClass(DijkstraReduce.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); }
where条件是在每个map/reduce执行期间/之后修改的全局变量.
每次运行map-reduce作业时,都可以检查输出的状态,计数器中包含的值等,并在控制迭代的节点上决定是否要再进行一次迭代.我想我不明白在你的场景中对全局状态的需求来自何处.
更一般地说 - 在执行节点之间共享状态有两种主要方式(尽管应该注意,最好避免共享状态,因为它限制了可伸缩性).
将文件写入其他节点可以读取的HDFS(确保在作业退出时清理文件,并且推测执行不会导致奇怪的故障).
使用ZooKeeper将一些数据存储在专用的ZK树节点中.
您可以使用Configuration.set(String name,String value)设置您可以在Mappers/Reducers/etc中访问的值:
在你的司机:
conf.set("my.dijkstra.parameter", "value");
例如在你的映射器中:
public void configure(JobConf job) { myParam = job.get("my.dijkstra.parameter"); }
但这不太可能帮助您查看以前作业的输出,以决定是否再开始一次迭代.也就是说,在执行作业后,该值不会被推回.
您还可以使用Hadoop的DistributedCache来存储将在所有节点之间分发的文件.如果你要通过这种方式传递的值很小,这比仅仅在HDFS上存储一些东西要好一些.
当然,计数器也可用于此目的.但是,为了在算法中做出决策,它们看起来不太可靠.看起来在某些情况下,它们可以递增两次(如果某个任务执行了一次以上,例如在失败或投机执行的情况下) - 我不确定.