词频统计 MapReduce 初识+案例( 二 )


4.1.1.2方法
// 在任务开始时调用一次,通常用作创建连接、打开流等获取资源的操作protected void setup(Context context) throws IOException, InterruptedException {// NOTHING}
// 对输入拆分中的每个键/值对调用一次,通常需要重写该方法,这是一个默认的核心方法@SuppressWarnings("unchecked")protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {// map 处理完后将数据写出去context.write((KEYOUT) key, (VALUEOUT) value);}
// 在任务结束时调用一次,用作关闭资源等protected void cleanup(Context context) throws IOException, InterruptedException {// NOTHING}

词频统计  MapReduce 初识+案例

文章插图
// 该方法相当于整合了上面三个方法public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKeyValue()) {map(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {cleanup(context);}}
4.1.2
词频统计如下:
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class Job_WordCountMapper extends Mapper {// 输出Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 将读取的文本每行的数据,分隔成单词String[] words = value.toString().split("\\s+");// 对单词进行处理for (String word : words) {// 转小写 去除特殊字符String w = word.toLowerCase().replace("\\W", "");// 将单词作为输出的 keyk.set(w);// 使用上下文对象 将 mapper 处理的结果以 的方式写到 MapReduce 框架context.write(k, v);}}}
4.24.2.1 窥见源码 4.2.1.1类
类提供了四个泛型,分别是输入参数的键值对类型、输出参数的键值对类型
public class Reducer {...}
负责接收输出的内容,所以 KEYIN、 就对应的输出键值对的类型, 使用 Text, 使用 (防止结果量级太大)
4.2.1.2方法
protected void setup(...){...}protected void cleanup(...){...}public void run(...){...}
// 这个方法对每个键调用一次,通常需要重写该方法,这是一个默认的核心方法protected void reduce(KEYIN key, Iterable values, Context context) throws IOException, InterruptedException {for(VALUEIN value: values) {context.write((KEYOUT) key, (VALUEOUT) value);}}
4.2.2
词频统计如下:
import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class Job_WordCountReducer extends Reducer {@Overrideprotected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {// 声明变量 用于存储聚合完的结果long count = 0;// 遍历相同的 key 获取对应的所有 valuefor (IntWritable value : values) {count += value.get();}// 将聚合完的结果写到 MapReduce 框架context.write(key, new LongWritable(count));}}
4.3
我们需要实例化 Job 对象,并配置相关类(相当于整合了 、) 。