MapReduce Application Example -- Simple Data Deduplication

1. Design Approach

  The key to deduplication is that no matter how many times a record appears in the input files, it must be written to the output exactly once. This fits the reduce phase naturally: reduce input has the form <key, value list>, and all records with the same key are grouped together during the shuffle phase. So it is enough to emit each record to be deduplicated as the key in the map phase; the reducer then outputs each distinct key once, as the trace below illustrates.
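  For instance, the lines "wangkun 13" and "amei 12" appear in both of the test files shown further down. The values are empty placeholders and only the keys matter, so the data flows roughly like this:

map output:      <"wangkun 13", "">  <"amei 12", "">  <"wangkun 13", "">  <"amei 12", "">
after shuffle:   <"wangkun 13", ["", ""]>  <"amei 12", ["", ""]>
reduce output:   wangkun 13   amei 12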

2. Implementation


package moverepeat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 
 * @author Amei Removes duplicate data records
 */

public class Remove {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context output)
                throws java.io.IOException, InterruptedException {
            // Emit the whole input line as the map output key; the value is an empty placeholder
            output.write(value, new Text(""));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context output)
                throws java.io.IOException, InterruptedException {
            // After the shuffle phase the reduce input is <key, value list>, so each
            // distinct key arrives exactly once; writing it out once removes the duplicates
            output.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "remove");
        job.setJarByClass(Remove.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(
                "/user/hadoop_admin/removein"));
        FileOutputFormat.setOutputPath(job, new Path(
                "/user/hadoop_admin/removeout"));
        System.exit((job.waitForCompletion(true) ? 0 : 1));
    }
}
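  One possible way to run the job (a sketch: it assumes a configured Hadoop client, and that the class above has been packaged into a jar named remove.jar, which is a placeholder name) is to upload the test files to the hard-coded input directory and then submit the jar:

hdfs dfs -mkdir -p /user/hadoop_admin/removein
hdfs dfs -put file01 file02 /user/hadoop_admin/removein
hadoop jar remove.jar moverepeat.Remove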

   Test case

  file01

wangkun 12
wangkun 13
wangkun 15
amei 12
amei 13

  file02


wangkun 11
wangkun 13
wangkun 16
amei 12
amei 13

 Deduplication result:


amei 12
amei 13
wangkun 11
wangkun 12
wangkun 13
wangkun 15
wangkun 16
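  With a single reducer the deduplicated result ends up in one part file under the output directory, so it can be inspected directly from HDFS (assuming the default output file naming):

hdfs dfs -cat /user/hadoop_admin/removeout/part-r-00000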