详解HDFS多文件Join操作的实例
最近在做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,
下面是一个简单的例子:采用两个文件(表)来做 Left Join,其数据结构如下:
A 文件:
a|1|b|2|c
B文件:
a|b|1|2|c
即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键
代码如下:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cn.eshore.traffic.hadoop.util.CommUtil; import cn.eshore.traffic.hadoop.util.StringUtil; /** * @ClassName: DataJoin * @Description: HDFS JOIN操作 * @author hadoop * @date 2012-12-18 下午5:51:32 */ public class InstallJoin extends Configured implements Tool { private String static enSplitCode = "\\|"; private String static splitCode = "|"; // 自定义Reducer public static class ReduceClass extends DataJoinReducerBase { @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { String joinedStr = ""; //该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】 //去掉则为All Join if (tags.length == 1 && tags[0].toString().contains("install")) { return null; } Mapmap = new HashMap (); for (int i = 0; i < values.length; i++) { TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(enSplitCode, 8); String groupValue = tokens[6]; String type = tokens[7]; map.put(type, groupValue); } joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30")); TaggedWritable retv = new 
TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } // 自定义Mapper public static class MapClass extends DataJoinMapperBase { //自定义Key【类似数据库中的主键/外键】 @Override protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(CommUtil.enSplitCode); String key = ""; String type = tokens[7]; //由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据 if ("7".equals(type)) { key = tokens[0]+"|"+tokens[1]; }else if ("30".equals(type)) { key = tokens[0]+"|"+tokens[2]; } return new Text(key); } @Override protected Text generateInputTag(String inputFile) { return new Text(inputFile); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } public static class TaggedWritable extends TaggedMapOutput { private Writable data; // 自定义 public TaggedWritable() { this.tag = new Text(""); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public Writable getData() { return data; } @Override public void write(DataOutput out) throws IOException { this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); String dataClz = in.readUTF(); if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(dataClz), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } /** * job运行 */ @Override public int run(String[] paths) throws Exception { int no = 0; try { Configuration conf = getConf(); JobConf job = new JobConf(conf, InstallJoin.class); FileInputFormat.setInputPaths(job, new Path(paths[0])); FileOutputFormat.setOutputPath(job, new Path(paths[1])); 
job.setJobName("join_data_test"); job.setMapperClass(MapClass.class); job.setReducerClass(ReduceClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", CommUtil.splitCode); JobClient.runJob(job); no = 1; } catch (Exception e) { throw new Exception(); } return no; } //测试 public static void main(String[] args) { String[] paths = { "hdfs://master...:9000/home/hadoop/traffic/join/newtype", "hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" } int res = 0; try { res = ToolRunner.run(new Configuration(), new InstallJoin(), paths); } catch (Exception e) { e.printStackTrace(); } System.exit(res); } }
如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!