当前位置:  开发笔记 > 编程语言 > 正文

Mapfile作为MapReduce作业的输入

如何解决《Mapfile作为MapReduce作业的输入》经验,为你挑选了1个好方法。

我最近开始使用Hadoop,我在使用Mapfile作为MapReduce作业的输入时遇到了问题.

下面的工作代码在hdfs中编写了一个名为"TestMap"的简单MapFile,其中有三个Text类型的键和三个类型为BytesWritable的值.

这里是TestMap的内容:

$ hadoop fs  -text /user/hadoop/TestMap/data
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor
A    01
B    02
C    03

以下是创建TestMap Mapfile的程序:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;

public class CreateMap {

    public static void main(String[] args) throws IOException{

        Configuration conf = new Configuration();
        FileSystem hdfs  = FileSystem.get(conf);

        Text key = new Text();
        BytesWritable value = new BytesWritable();
        byte[] data = {1, 2, 3};
        String[] strs = {"A", "B", "C"};
        int bytesRead;
        MapFile.Writer writer = null;

        writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass());
        try {
            for (int i = 0; i < 3; i++) {
                key.set(strs[i]);
                value.set(data, i, 1);
                writer.append(key, value);
                System.out.println(strs[i] + ":" + data[i] + " added.");
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
             IOUtils.closeStream(writer);
        }
    }
}

后面的简单MapReduce作业尝试将mapfile的值递增1:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.BytesWritable;


public class AddOne extends Configured implements Tool {

    public static class MapClass extends MapReduceBase

        implements Mapper {

        public void map(Text key, BytesWritable value,
                        OutputCollector output,
                        Reporter reporter) throws IOException {


            byte[] data = value.getBytes();
            data[0] += 1;
            value.set(data, 0, 1);
            output.collect(key, new Text(value.toString()));
        }
    }

    public static class Reduce extends MapReduceBase
        implements Reducer {

        public void reduce(Text key, Iterator values,
                           OutputCollector output,
                           Reporter reporter) throws IOException {

            output.collect(key, values.next());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, AddOne.class);

        Path in = new Path("TestMap");
        Path out = new Path("output");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("AddOne");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ":");


        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AddOne(), args);

        System.exit(res);
    }
}

我得到的运行时异常是:

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
    at AddOne$MapClass.map(AddOne.java:32)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

我不明白为什么hadoop试图强制转换LongWritable,因为在我的代码中我正确地定义了Mapper接口(Mapper).

有人能帮帮我吗?

非常感谢你

卢卡



1> Bkkbrad..:

你的问题来自于一个事实是,尽管什么名字告诉你,一个映射文件是不是一个文件.

MapFile实际上是一个由两个文件组成的目录:有一个"数据"文件,它是一个包含你写入的键和值的SequenceFile ; 但是,还有一个"索引"文件,它是一个不同的SequenceFile,包含键的子序列及其作为LongWritables的偏移量; MapFile.Reader将此索引加载到内存中,以便您快速进行二进制搜索,以便在数据文件中查找具有随机访问时所需数据的偏移量.

您正在使用SequenceFileInputFormat的旧"org.apache.hadoop.mapred"版本.当你告诉它将MapFile看作输入时,知道只看数据文件是不够聪明的.相反,它实际上尝试将数据文件索引文件用作常规输入文件.数据文件将正常工作,因为类与您指定的内容一致,但索引文件将抛出ClassCastException,因为索引文件值都是LongWritables.

您有两个选择:您可以开始使用SequenceFileInputFormat的"org.apache.hadoop.mapreduce"版本(从而更改代码的其他部分),它对MapFiles有足够的了解,只需查看数据文件; 或者,您可以明确地将数据文件作为您想要输入的文件.

推荐阅读
mobiledu2402851323
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有