我一直在尝试使用Hadoop将N行数发送到单个映射.我不需要拆分线路.
我曾尝试使用NLineInputFormat,但是它会将数据中的N行文本一次一行地发送到每个映射器[在第N行之后放弃].
我试图设置选项,它只需要N行输入一次一行地发送到每个地图:
job.setInt("mapred.line.input.format.linespermap", 10);
我找到了一个邮件列表,建议我覆盖LineRecordReader :: next,但这不是那么简单,因为内部数据成员都是私有的.
我刚检查了NLineInputFormat的源代码,它硬编码LineReader,所以覆盖无济于事.
另外,顺便说一句,我正在使用Hadoop 0.18与Amazon EC2 MapReduce兼容.
您必须实现自己的输入格式.您也可以定义自己的记录阅读器.
不幸的是,你必须定义一个getSplits()方法.在我看来,这将比实现记录阅读器更难:这种方法必须实现一个逻辑来输入数据.
请参阅以下摘录"Hadoop - 权威指南"(我一直推荐的一本好书!):
这是界面:
public interface InputFormat{ InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; }
JobClient调用getSplits()方法,将所需数量的map任务作为numSplits参数传递.此数字被视为提示,因为InputFormat实现可以自由地将不同数量的拆分返回到numSplits中指定的数字.在计算了拆分后,客户端将它们发送到jobtracker,后者使用其存储位置来安排map任务以在tasktrackers上处理它们.
在任务跟踪器上,映射任务将拆分传递给InputFormat上的getRecordReader()方法,以获取该拆分的RecordReader.RecordReader只是记录上的迭代器,map任务使用一个来生成记录键值对,并将其传递给map函数.代码片段(基于MapRunner中的代码)说明了这个想法:
K key = reader.createKey(); V value = reader.createValue(); while (reader.next(key, value)) { mapper.map(key, value, output, reporter); }