Hadoop: Two Ways to Implement a Join with MapReduce (Part 3)


3. Implementing the Join on the Reduce Side
##### 3.1 Approach
The join key is used as the key of the map output. Records from the two tables that are to be joined are tagged with the name of the file they come from and sent to the same reduce task, where the actual stitching of the records takes place.
Pros and cons: with this approach the join itself is performed in the reduce phase, so the reduce side carries almost all of the processing load while the map nodes do very little work. Resource utilization is therefore poor, and the reduce phase is very prone to data skew.
##### 3.2 Implementation
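The mapper and reducer below exchange a custom `Info` bean (`com.wsk.bigdata.pojo.Info`) that is not shown in the original post. Because it is used as the map output value it has to implement Hadoop's `Writable` interface, and because the reducer copies it with `BeanUtils` it needs JavaBean getters/setters. The following is a minimal sketch only: the product fields (`pid`, `pName`, `price`, `produceArea`) and `flag` can be inferred from the posted code, while the names of the three click/order fields (`uid`, `clickTime`, `clickNum`) are assumptions.

```java
package com.wsk.bigdata.pojo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical reconstruction of the Info bean used by the mapper/reducer below.
public class Info implements Writable {

    private String pid = "";          // product id, also the join key
    private String pName = "";        // product name
    private float price;              // product price
    private String produceArea = "";  // product origin
    private String uid = "";          // assumed: user id from the click/order file
    private String clickTime = "";    // assumed: click/order time
    private String clickNum = "";     // assumed: click/order count
    private String flag = "";         // "1" = product record, "0" = click/order record

    public Info() { }                 // no-arg constructor required by Hadoop and by the reducer

    public Info(String pid, String pName, float price, String produceArea,
                String uid, String clickTime, String clickNum, String flag) {
        this.pid = pid; this.pName = pName; this.price = price; this.produceArea = produceArea;
        this.uid = uid; this.clickTime = clickTime; this.clickNum = clickNum; this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(pid); out.writeUTF(pName); out.writeFloat(price); out.writeUTF(produceArea);
        out.writeUTF(uid); out.writeUTF(clickTime); out.writeUTF(clickNum); out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pid = in.readUTF(); pName = in.readUTF(); price = in.readFloat(); produceArea = in.readUTF();
        uid = in.readUTF(); clickTime = in.readUTF(); clickNum = in.readUTF(); flag = in.readUTF();
    }

    public String getpName() { return pName; }
    public void setpName(String pName) { this.pName = pName; }
    public float getPrice() { return price; }
    public void setPrice(float price) { this.price = price; }
    public String getProduceArea() { return produceArea; }
    public void setProduceArea(String produceArea) { this.produceArea = produceArea; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }
    // Getters/setters for pid, uid, clickTime and clickNum are omitted here for brevity,
    // but BeanUtils.copyProperties in the reducer relies on them being present.

    @Override
    public String toString() {
        return pid + "," + pName + "," + price + "," + produceArea + ","
                + uid + "," + clickTime + "," + clickNum;
    }
}
```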
Mapper implementation, FileReduceJoinMapper.java:
```java
package com.wsk.bigdata.mapreduce.mapper;

import com.wsk.bigdata.pojo.Info;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class FileReduceJoinMapper extends Mapper<LongWritable, Text, Text, Info> {

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        String pid = "";
        Info info = null;
        // Tell which table this record belongs to by the name of its source file
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().getName();
        if (name.startsWith("product")) {
            pid = fields[0];
            info = new Info(pid, fields[1], Float.parseFloat(fields[2]), fields[3], "", "", "", "1");
        } else {
            pid = fields[1];
            info = new Info(pid, "", 0, "", fields[0], fields[2], fields[3], "0");
        }
        if (info == null) {
            return;
        }
        k.set(pid);
        System.out.println("map output " + info.toString());
        context.write(k, info);
    }
}
```
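For reference, the mapper's parsing logic implies two comma-separated input files shaped roughly as follows; the concrete file names and values are made up purely for illustration (only the file whose name starts with `product` is treated as the product table):

```text
# product.txt: pid,pName,price,produceArea          (hypothetical sample)
p0001,xiaomi,1999.0,beijing
p0002,huawei,3999.0,shenzhen

# click.txt: uid,pid,clickTime,clickNum             (hypothetical sample)
u001,p0001,2020-01-01,3
u002,p0002,2020-01-02,1
```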
Reducer implementation, FileReduceJoinReducer.java:
```java
package com.wsk.bigdata.mapreduce.reduce;

import com.wsk.bigdata.pojo.Info;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class FileReduceJoinReducer extends Reducer<Text, Info, Info, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
        Info pInfo = new Info();
        List<Info> clickBeans = new ArrayList<>();
        Iterator<Info> iterator = values.iterator();
        while (iterator.hasNext()) {
            Info bean = iterator.next();
            System.out.println("reduce received " + bean);
            if ("1".equals(bean.getFlag())) { // product record
                try {
                    BeanUtils.copyProperties(pInfo, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            } else { // click/order record: copy it out of the reused Writable before buffering
                Info clickBean = new Info();
                try {
                    BeanUtils.copyProperties(clickBean, bean);
                    clickBeans.add(clickBean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        // Stitch the product fields onto every buffered click/order record to form the final result
        for (Info bean : clickBeans) {
            bean.setpName(pInfo.getpName());
            bean.setPrice(pInfo.getPrice());
            bean.setProduceArea(pInfo.getProduceArea());
            System.out.println("reduce output: " + bean.toString());
            context.write(bean, NullWritable.get());
        }
    }
}
```
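Two practical notes on the reducer: `BeanUtils.copyProperties` here comes from Apache Commons BeanUtils (the `commons-beanutils` artifact), so that library must be on the job's classpath in addition to the Hadoop dependencies. Also, every non-product record for a key is copied into the `clickBeans` list before the product fields are stitched on; for a hot key with many click records this in-memory buffering is exactly where the reduce-side pressure and data skew described in 3.1 show up.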
Driver (program entry point), ReduceJoinDriver.java:
```java
package com.wsk.bigdata.mapreduce.driver;

import com.wsk.bigdata.mapreduce.mapper.FileReduceJoinMapper;
import com.wsk.bigdata.mapreduce.reduce.FileReduceJoinReducer;
import com.wsk.bigdata.pojo.Info;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("please input 2 params: input_path output_path");
            System.exit(0);
        }
        String input = args[0];
        String output = args[1];
        System.setProperty("hadoop.home.dir", "D:\\appanzhuang\\cdh\\hadoop-2.6.0-cdh5.7.0");

        Configuration conf = new Configuration();
        // Delete the output directory if it already exists, otherwise the job would fail
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path(output);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReduceJoinDriver.class);
        job.setMapperClass(FileReduceJoinMapper.class);
        job.setReducerClass(FileReduceJoinReducer.class);

        // Key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Info.class);
        // Key/value types of the reducer (final) output
        job.setOutputKeyClass(Info.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        boolean res = job.waitForCompletion(true);
        if (!res) {
            System.err.println("error: job failed");
        }
    }
}
```
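To run the job on a cluster, package the classes into a jar and submit it with the two path arguments; the jar name and paths below are placeholders. (The hard-coded `hadoop.home.dir` property in the driver is only needed when running locally on Windows.)

```text
hadoop jar mr-join.jar com.wsk.bigdata.mapreduce.driver.ReduceJoinDriver /data/join/input /data/join/output
```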