十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
本篇文章为大家展示了利用HDFS怎么实现多文件Join操作,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
鄞州网站制作公司哪家好,找成都创新互联公司!从网页设计、网站建设、微信开发、APP开发、成都响应式网站建设公司等网站项目制作,到程序开发,运营维护。成都创新互联公司于2013年成立到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选成都创新互联公司。
详解HDFS多文件Join操作的实例
最近在做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,
下面是个简单的例子;采用两个表来做left join其中数据结构如下:
A 文件:
a|1b|2|c
B文件:
a|b|1|2|c
即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键
代码如下:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import cn.eshore.traffic.hadoop.util.CommUtil; import cn.eshore.traffic.hadoop.util.StringUtil; /** * @ClassName: DataJoin * @Description: HDFS JOIN操作 * @author hadoop * @date 2012-12-18 下午5:51:32 */ public class InstallJoin extends Configured implements Tool { private String static enSplitCode = "\\|"; private String static splitCode = "|"; // 自定义Reducer public static class ReduceClass extends DataJoinReducerBase { @Override protected TaggedMapOutput combine(Object[] tags, Object[] values) { String joinedStr = ""; //该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】 //去掉则为All Join if (tags.length == 1 && tags[0].toString().contains("install")) { return null; } Mapmap = new HashMap (); for (int i = 0; i < values.length; i++) { TaggedWritable tw = (TaggedWritable) values[i]; String line = ((Text) tw.getData()).toString(); String[] tokens = line.split(enSplitCode, 8); String groupValue = tokens[6]; String type = tokens[7]; map.put(type, groupValue); } joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30")); TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); retv.setTag((Text) tags[0]); return retv; } } // 自定义Mapper public static class MapClass extends DataJoinMapperBase { //自定义Key【类似数据库中的主键/外键】 @Override protected Text generateGroupKey(TaggedMapOutput aRecord) { String line = ((Text) aRecord.getData()).toString(); String[] tokens = line.split(CommUtil.enSplitCode); String key = ""; String type = tokens[7]; //由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据 if ("7".equals(type)) { key = tokens[0]+"|"+tokens[1]; }else if ("30".equals(type)) { key = tokens[0]+"|"+tokens[2]; } return new Text(key); } @Override protected Text generateInputTag(String inputFile) { return new Text(inputFile); } @Override protected TaggedMapOutput generateTaggedMapOutput(Object value) { TaggedWritable retv = new TaggedWritable((Text) value); retv.setTag(this.inputTag); return retv; } } public static class TaggedWritable extends TaggedMapOutput { private Writable data; // 自定义 public TaggedWritable() { this.tag = new Text(""); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } @Override public Writable getData() { return data; } @Override public void write(DataOutput out) throws IOException { this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } @Override public void readFields(DataInput in) throws IOException { this.tag.readFields(in); String dataClz = in.readUTF(); if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { try { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(dataClz), null); } catch (ClassNotFoundException e) { e.printStackTrace(); } } this.data.readFields(in); } } /** * job运行 */ @Override public int run(String[] paths) throws Exception { int no = 0; try { Configuration conf = getConf(); JobConf job = new JobConf(conf, InstallJoin.class); FileInputFormat.setInputPaths(job, new Path(paths[0])); FileOutputFormat.setOutputPath(job, new Path(paths[1])); job.setJobName("join_data_test"); job.setMapperClass(MapClass.class); job.setReducerClass(ReduceClass.class); job.setInputFormat(TextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TaggedWritable.class); job.set("mapred.textoutputformat.separator", CommUtil.splitCode); JobClient.runJob(job); no = 1; } catch (Exception e) { throw new Exception(); } return no; } //测试 public static void main(String[] args) { String[] paths = { "hdfs://master...:9000/home/hadoop/traffic/join/newtype", "hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" } int res = 0; try { res = ToolRunner.run(new Configuration(), new InstallJoin(), paths); } catch (Exception e) { e.printStackTrace(); } System.exit(res); } }
上述内容就是利用HDFS怎么实现多文件Join操作,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。