Hadoop: Two Ways to Implement a Join with MapReduce (Part 2)


Mapper implementation class: FileMapJoinMapper.java
package com.wsk.bigdata.mapreduce.mapper;

import com.wsk.bigdata.pojo.Info;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

/**
 * Map-side join between two files.
 */
public class FileMapJoinMapper extends Mapper<LongWritable, Text, Info, NullWritable> {

    /**
     * Product info lookup table: key = product ID, value = product info.
     */
    private Map<String, Info> infos = new HashMap<>();

    /**
     * setup() is called once before any map() call; we use it to
     * load the product info file into memory.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("-------- MAP setup: loading product info into memory --------");
        BufferedReader br = new BufferedReader(new InputStreamReader(
                new FileInputStream(System.getProperty("product.info.dir"))));
        String line;
        while (StringUtils.isNotEmpty(line = br.readLine())) {
            String[] fields = line.split(",");
            if (fields.length == 4) {
                Info info = new Info(fields[0], fields[1], Float.parseFloat(fields[2]), fields[3],
                        "", "", "", "1");
                infos.put(fields[0], info);
            }
        }
        br.close();
        System.out.println("-------- MAP setup: loaded " + infos.size() + " product info records --------");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        if (fields.length == 4) {
            String pid = fields[1];
            Info produceInfo = infos.get(pid);
            // Skip click records whose product ID has no match in the product table
            if (produceInfo == null) {
                return;
            }
            Info info = new Info(produceInfo.getpId(), produceInfo.getpName(), produceInfo.getPrice(),
                    produceInfo.getProduceArea(), fields[0], fields[2], fields[3], null);
            context.write(info, NullWritable.get());
        }
    }
}
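The Info POJO (com.wsk.bigdata.pojo.Info) is not shown in this part. Since it is declared as the map output key, it should at least implement Hadoop's Writable and provide a toString() that TextOutputFormat can emit. Below is a minimal sketch; the first four field names follow the getters used above, while the last four (userId, clickTime, pageId, flag) are assumed names inferred from how the constructor is called:

package com.wsk.bigdata.pojo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Minimal sketch of the Info POJO; userId/clickTime/pageId/flag are
// assumed names for the click-log fields, not confirmed by the source.
public class Info implements Writable {
    private String pId;
    private String pName;
    private float price;
    private String produceArea;
    private String userId;    // fields[0] of the click log (assumed meaning)
    private String clickTime; // fields[2] of the click log (assumed meaning)
    private String pageId;    // fields[3] of the click log (assumed meaning)
    private String flag;      // "1" for product records, null for joined records

    public Info() { }

    public Info(String pId, String pName, float price, String produceArea,
                String userId, String clickTime, String pageId, String flag) {
        this.pId = pId;
        this.pName = pName;
        this.price = price;
        this.produceArea = produceArea;
        this.userId = userId;
        this.clickTime = clickTime;
        this.pageId = pageId;
        this.flag = flag;
    }

    public String getpId() { return pId; }
    public String getpName() { return pName; }
    public float getPrice() { return price; }
    public String getProduceArea() { return produceArea; }

    // flag is deliberately not serialized: it can be null, and writeUTF(null) throws
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(pId);
        out.writeUTF(pName);
        out.writeFloat(price);
        out.writeUTF(produceArea);
        out.writeUTF(userId);
        out.writeUTF(clickTime);
        out.writeUTF(pageId);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        pId = in.readUTF();
        pName = in.readUTF();
        price = in.readFloat();
        produceArea = in.readUTF();
        userId = in.readUTF();
        clickTime = in.readUTF();
        pageId = in.readUTF();
    }

    @Override
    public String toString() {
        return pId + "," + pName + "," + price + "," + produceArea + ","
                + userId + "," + clickTime + "," + pageId;
    }
}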
Driver (program entry point) class: MapJoinDriver.java
package com.wsk.bigdata.mapreduce.driver;

import com.wsk.bigdata.mapreduce.mapper.FileMapJoinMapper;
import com.wsk.bigdata.pojo.Info;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MapJoinDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 3) {
            System.err.println("please input 3 params: product_file page_click_file output_directory");
            System.exit(1);
        }
        String productInfo = args[0];
        String input = args[1];
        String output = args[2];

        System.setProperty("hadoop.home.dir", "D:\\appanzhuang\\cdh\\hadoop-2.6.0-cdh5.7.0");
        System.setProperty("product.info.dir", productInfo);

        Configuration conf = new Configuration();

        // Validate the input file and clear the output directory before submitting the job
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path(output);
        if (!fs.exists(new Path(productInfo))) {
            System.err.println("not found file " + productInfo);
            System.exit(1);
        }
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(FileMapJoinMapper.class);
        // Declare the mapper's output key/value types
        job.setMapOutputKeyClass(Info.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, outputPath);

        // A map-side join needs no reduce phase, so set the number of reduce tasks to 0
        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
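One caveat: passing the product file path through System.setProperty only works when the mapper runs in the same JVM as the driver, i.e. in local mode. On a real cluster, the usual way to ship the small table to every mapper is the distributed cache. A sketch of the two changes that would be needed (not part of the original code):

// In the driver, register the product file in the distributed cache:
job.addCacheFile(new java.net.URI(productInfo));

// In FileMapJoinMapper.setup(), the cached file is symlinked into the
// task's working directory under its file name:
java.net.URI[] cacheFiles = context.getCacheFiles();
BufferedReader br = new BufferedReader(new InputStreamReader(
        new FileInputStream(new Path(cacheFiles[0]).getName())));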
The program takes three arguments: the product info file path, the page-click log path, and the output directory.
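For example, a run might look like the following (the jar name and all paths are illustrative, not from the original article):

hadoop jar mapjoin.jar com.wsk.bigdata.mapreduce.driver.MapJoinDriver \
    /data/product_info.txt /data/page_click.log /output/mapjoin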