Here is my source code:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {

    public static final String MAGIC_STRING = ">>>>";

    boolean overwrite = true;

    PageRank(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public static class TextPair implements WritableComparable<TextPair> {
        Text x;
        int ordering;

        public TextPair() {
            x = new Text();
            ordering = 1;
        }

        public void setText(Text t, int o) {
            x = t;
            ordering = o;
        }

        public void setText(String t, int o) {
            x.set(t);
            ordering = o;
        }

        public void readFields(DataInput in) throws IOException {
            x.readFields(in);
            ordering = in.readInt();
        }

        public void write(DataOutput out) throws IOException {
            x.write(out);
            out.writeInt(ordering);
        }

        public int hashCode() {
            return x.hashCode();
        }

        public int compareTo(TextPair o) {
            int cmp = this.x.compareTo(o.x);
            if (cmp == 0)
                return ordering - o.ordering;
            else
                return cmp;
        }
    }

    public static class MapperA extends Mapper<LongWritable, Text, TextPair, Text> {
        private Text word = new Text();
        Text title = new Text();
        Text link = new Text();
        TextPair textpair = new TextPair();
        boolean start = false;
        String currentTitle = "";
        private Pattern linkPattern = Pattern.compile("\\[\\[\\s*(.+?)\\s*\\]\\]");
        // "<title>"/"</title>" restored here; the angle-bracket tags were
        // swallowed when the code was posted (compare pagePattern below)
        private Pattern titlePattern = Pattern.compile("<title>\\s*(.+?)\\s*</title>");
        private Pattern pagePattern = Pattern.compile("<page>\\s*(.+?)\\s*</page>");

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            int startPage = line.lastIndexOf("<page>"); // "<page>" likewise restored
            if (startPage < 0) {
                Matcher matcher = linkPattern.matcher(line);
                int n = 0;
                title.set(currentTitle);
                while (matcher.find()) {
                    textpair.setText(matcher.group(1), 1);
                    context.write(textpair, title);
                }
                link.set(MAGIC_STRING);
                textpair.setText(title.toString(), 0);
                context.write(textpair, link);
            } else {
                String result = line.trim();
                Matcher titleMatcher = titlePattern.matcher(result);
                if (titleMatcher.find()) {
                    currentTitle = titleMatcher.group(1);
                } else {
                    currentTitle = result;
                }
            }
        }
    }

    public static class ReducerA extends Reducer<TextPair, Text, Text, Text> {
        Text aw = new Text();
        boolean valid = false;
        String last = "";

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            while (context.nextKeyValue()) {
                TextPair key = context.getCurrentKey();
                Text value = context.getCurrentValue();
                if (key.ordering == 0) {
                    last = key.x.toString();
                } else if (key.x.toString().equals(last)) {
                    context.write(key.x, value);
                }
            }
            cleanup(context);
        }
    }

    public static class MapperB extends Mapper<Text, Text, Text, Text> {
        Text t = new Text();

        public void map(Text key, Text value, Context context)
                throws InterruptedException, IOException {
            context.write(value, key);
        }
    }

    public static class ReducerB extends Reducer<Text, Text, Text, PageRankRecord> {
        ArrayList<String> q = new ArrayList<String>();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws InterruptedException, IOException {
            q.clear();
            for (Text value : values) {
                q.add(value.toString());
            }
            PageRankRecord prr = new PageRankRecord();
            prr.setPageRank(1.0);
            if (q.size() > 0) {
                String[] a = new String[q.size()];
                q.toArray(a);
                prr.setlinks(a);
            }
            context.write(key, prr);
        }
    }

    public boolean roundA(Configuration conf, String inputPath, String outputPath, boolean overwrite)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (FileSystem.get(conf).exists(new Path(outputPath))) {
            if (overwrite) {
                FileSystem.get(conf).delete(new Path(outputPath), true);
                System.err.println("The target file is dirty, overwriting!");
            } else
                return true;
        }
        Job job = new Job(conf, "closure graph build round A");
        //job.setJarByClass(GraphBuilder.class);
        job.setMapperClass(MapperA.class);
        //job.setCombinerClass(RankCombiner.class);
        job.setReducerClass(ReducerA.class);
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setNumReduceTasks(30);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
        return job.waitForCompletion(true);
    }

    public boolean roundB(Configuration conf, String inputPath, String outputPath)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (FileSystem.get(conf).exists(new Path(outputPath))) {
            if (overwrite) {
                FileSystem.get(conf).delete(new Path(outputPath), true);
                System.err.println("The target file is dirty, overwriting!");
            } else
                return true;
        }
        Job job = new Job(conf, "closure graph build round B");
        //job.setJarByClass(PageRank.class);
        job.setMapperClass(MapperB.class);
        //job.setCombinerClass(RankCombiner.class);
        job.setReducerClass(ReducerB.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PageRankRecord.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setNumReduceTasks(30);
        SequenceFileInputFormat.addInputPath(job, new Path(inputPath));
        SequenceFileOutputFormat.setOutputPath(job, new Path(outputPath));
        return job.waitForCompletion(true);
    }

    public boolean build(Configuration conf, String inputPath, String outputPath)
            throws IOException, InterruptedException, ClassNotFoundException {
        System.err.println(inputPath);
        if (roundA(conf, inputPath, "cgb", true)) {
            return roundB(conf, "cgb", outputPath);
        } else
            return false;
    }

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        //PageRanking.banner("ClosureGraphBuilder");
        PageRank cgb = new PageRank(true);
        cgb.build(conf, args[0], args[1]);
    }
}
Here is how I compile and run it:
javac -classpath hadoop-0.20.1-core.jar -d pagerank_classes PageRank.java PageRankRecord.java
jar -cvf pagerank.jar -C pagerank_classes/ .
bin/hadoop jar pagerank.jar PageRank pagerank result
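To check that the nested classes actually made it into the jar (they should, given the commands above), the jar contents can be listed; expected entries shown as comments:

jar tf pagerank.jar | grep PageRank
# PageRank.class
# PageRank$TextPair.class
# PageRank$MapperA.class
# PageRank$ReducerA.class
# PageRank$MapperB.class
# PageRank$ReducerB.class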
But I get the following error:
INFO mapred.JobClient: Task Id : attempt_201001012025_0009_m_000001_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: PageRank$MapperA
Can anyone tell me what is wrong? Thanks.
If you're using Hadoop 0.20 (i.e. you want to use the non-deprecated classes), you can do:
public int run(String[] args) throws Exception {
    Job job = new Job();
    job.setJarByClass(YourMapReduceClass.class); // <-- omitting this causes above error
    job.setMapperClass(MyMapper.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
    return 0;
}
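Applied to the code in the question, that means enabling the commented-out setJarByClass call in roundA (and adding the matching one in roundB), pointing it at the enclosing class; a minimal sketch:

Job job = new Job(conf, "closure graph build round A");
job.setJarByClass(PageRank.class); // lets the task JVMs locate PageRank$MapperA and the other nested classes
job.setMapperClass(MapperA.class);
job.setReducerClass(ReducerA.class);

Without setJarByClass, the job client never records which jar to ship to the task nodes, so the child JVMs running the map tasks cannot load the nested mapper class and fail with exactly the ClassNotFoundException shown above.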