
MapReduce Basic Example: Secondary Sort with a Custom Data Type


Secondary Sort

Starting from the introductory MapReduce word-frequency example (wordcount), perform a secondary sort on the words and their counts: frequency in descending order, and, where frequencies are equal, words in ascending lexicographic order.

Approach

Method 1

Put the elements to be sorted into an in-memory data structure such as a TreeSet, configured with a Comparator that encodes the required ordering. As each record is inserted, the TreeSet keeps the elements sorted according to the comparator, and the sorted result can then simply be written out, as sketched below.
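A minimal sketch of this idea, assuming the word/count pairs have already been collected into an in-memory Map (the class and method names here are illustrative, not from the original post):

import java.util.Map;
import java.util.TreeSet;

public class TreeSetSort {
    public static void printSorted(Map<String, Integer> freq) {
        // Comparator: count descending; on ties, word ascending.
        TreeSet<Map.Entry<String, Integer>> sorted = new TreeSet<>((a, b) -> {
            int c = Integer.compare(b.getValue(), a.getValue());
            return c != 0 ? c : a.getKey().compareTo(b.getKey());
        });
        sorted.addAll(freq.entrySet());  // every record is held in memory at once
        for (Map.Entry<String, Integer> e : sorted) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}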

Drawback

The data structure lives entirely in memory, so its capacity is limited; you can roughly estimate how much memory it will need from the input size. When the data volume is too large, this approach easily runs out of memory.

Method 2

Define a custom data type that implements the WritableComparable interface and override its compareTo method with the required ordering. Use this type as the map output key: during the shuffle phase, keys are automatically sorted according to compareTo.

This post implements Method 2, the custom data type approach.

Example

Input (wordcount output)        Output (after secondary sort)

am      3                       i       5
are     1                       am      3
i       5                       u       3
iron    1                       man     2
like    1                       you     2
liking  1           ==>         are     1
love    1                       iron    1
loving  1                       like    1
man     2                       liking  1
spider  1                       love    1
u       3                       loving  1
you     2                       spider  1

Code

MyWritable.java


import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Custom key type. WritableComparable already extends Writable,
// so implementing Writable separately would be redundant.
public class MyWritable implements WritableComparable<MyWritable> {

    private String word;
    private int count;

    // Ordering applied by the shuffle phase: count descending first,
    // then word ascending on ties.
    @Override
    public int compareTo(MyWritable o) {
        if (o.getCount() != this.count) {
            // Integer.compare avoids the overflow risk of "o.getCount() - this.count".
            return Integer.compare(o.getCount(), this.count);
        }
        return this.word.compareTo(o.getWord());
    }

    // Serialize the fields in a fixed order...
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.word);
        out.writeInt(count);
    }

    // ...and deserialize them in the same order.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readInt();
    }

    @Override
    public String toString() {
        return word + '\t' + count;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}
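As a quick sanity check outside Hadoop (this test class is hypothetical and not part of the original post), the compareTo ordering can be exercised locally:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical local check of MyWritable's ordering; expects MyWritable on the classpath.
public class MyWritableTest {
    public static void main(String[] args) {
        String[] words  = {"am", "i", "u", "you"};
        int[]    counts = {3,    5,   3,   2};
        List<MyWritable> keys = new ArrayList<>();
        for (int j = 0; j < words.length; j++) {
            MyWritable w = new MyWritable();
            w.setWord(words[j]);
            w.setCount(counts[j]);
            keys.add(w);
        }
        Collections.sort(keys);             // uses MyWritable.compareTo
        keys.forEach(System.out::println);  // prints: i 5, am 3, u 3, you 2
    }
}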

WordCountSort.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountSort {

    public static class MyMapper extends Mapper<Object, Text, MyWritable, NullWritable> {

        private final MyWritable k = new MyWritable();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "word<TAB>count", as produced by the wordcount job.
            String row = value.toString();
            String[] str = row.split("\t");
            k.setWord(str[0]);
            k.setCount(Integer.parseInt(str[1]));
            // Emit the whole record as the key; the shuffle sorts by MyWritable.compareTo.
            context.write(k, NullWritable.get());
        }
    }

    public static class MyReducer extends Reducer<MyWritable, NullWritable, MyWritable, NullWritable> {

        @Override
        protected void reduce(MyWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive already sorted; just write them out.
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException {
        try {
            // Create the job
            Job job = Job.getInstance(new Configuration(), "myWordCountSort");
            // Set the main class
            job.setJarByClass(WordCountSort.class);
            // Configure the map side
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(MyWritable.class);
            job.setMapOutputValueClass(NullWritable.class);
            // Configure the reduce side
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(MyWritable.class);
            job.setOutputValueClass(NullWritable.class);
            // Set the job's input and output paths
            FileInputFormat.addInputPath(job, new Path("/output/mr/wordcount/wc/part-r-00000"));
            FileOutputFormat.setOutputPath(job, new Path("/output/mr/wordcount/wcsort/"));
            // Submit the job and exit with its status
            int success = job.waitForCompletion(true) ? 0 : 1;
            System.exit(success);
        } catch (InterruptedException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
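To run the job, package both classes into a jar and submit it with hadoop jar, for example: hadoop jar wcsort.jar WordCountSort (the jar name is an assumption). The input path must already contain the wordcount result, and the output directory /output/mr/wordcount/wcsort/ must not exist beforehand, or FileOutputFormat will fail the job. The sorted result can then be inspected with hdfs dfs -cat /output/mr/wordcount/wcsort/part-r-00000.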

