Secondary Sort
Building on the introductory MapReduce word-count example (wordcount), this post applies a secondary sort to the words and their frequencies: descending by frequency, and, where frequencies are equal, ascending by the word's dictionary order.
Approach
Method 1
Put the elements to be sorted into an in-memory data structure such as a TreeSet, together with a comparator that implements the required ordering. Each record inserted into the TreeSet is sorted automatically according to the comparator, and the set can then be written out in order (see the sketch after the drawback below).
Drawback
The data structure lives entirely in memory, and its approximate size can be estimated from the data volume. When the data set is too large, it can easily exhaust the heap.
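To make Method 1 concrete, here is a minimal sketch (the TreeSetSortSketch and WordCount names and the sample data are illustrative assumptions, not part of the original post): a TreeSet built with a comparator that orders by frequency descending and, on ties, by word ascending.

import java.util.Comparator;
import java.util.TreeSet;

// Minimal sketch of Method 1: an in-memory TreeSet plus a comparator.
// The WordCount class and sample data are illustrative assumptions.
public class TreeSetSortSketch {

    static class WordCount {
        final String word;
        final int count;
        WordCount(String word, int count) { this.word = word; this.count = count; }
    }

    public static void main(String[] args) {
        // Frequency descending; ties broken by word in ascending order.
        TreeSet<WordCount> set = new TreeSet<>(
                Comparator.comparingInt((WordCount w) -> w.count).reversed()
                          .thenComparing(w -> w.word));
        set.add(new WordCount("am", 3));
        set.add(new WordCount("i", 5));
        set.add(new WordCount("u", 3));
        set.add(new WordCount("you", 2));
        // Prints: i 5, am 3, u 3, you 2
        set.forEach(w -> System.out.println(w.word + "\t" + w.count));
    }
}

Everything here lives on the heap, which is exactly the limitation described above.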
Method 2
Define a custom data type that implements the WritableComparable interface and override its compareTo method with the required ordering. Records wrapped in this type are used as the map output key, so during the shuffle phase the framework automatically sorts them according to compareTo.
This post implements Method 2, using a custom data type.
Example
Input (output of the wordcount job):
am 3
are 1
i 5
iron 1
like 1
liking 1
love 1
loving 1
man 2
spider 1
u 3
you 2

Output (after the secondary sort):
i 5
am 3
u 3
man 2
you 2
are 1
iron 1
like 1
liking 1
love 1
loving 1
spider 1
Code
MyWritable.java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Custom key type. WritableComparable already extends Writable,
// so implementing it alone is sufficient.
public class MyWritable implements WritableComparable<MyWritable> {

    private String word;
    private int count;

    // Sort by count descending; break ties by word in ascending
    // dictionary order. Integer.compare avoids the overflow risk
    // of the subtraction idiom.
    @Override
    public int compareTo(MyWritable o) {
        if (o.getCount() != this.count) {
            return Integer.compare(o.getCount(), this.count);
        }
        return this.word.compareTo(o.getWord());
    }

    // Serialization: field order must match readFields().
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.word);
        out.writeInt(this.count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readInt();
    }

    @Override
    public String toString() {
        return word + '\t' + count;
    }

    public int getCount() { return count; }

    public void setCount(int count) { this.count = count; }

    public String getWord() { return word; }

    public void setWord(String word) { this.word = word; }
}
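As a quick local sanity check of compareTo outside of Hadoop, MyWritable instances can be sorted with plain Java collections. This small test class is an assumption for illustration, not part of the original job:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical local check: WritableComparable extends Comparable,
// so Collections.sort uses MyWritable.compareTo directly.
public class MyWritableOrderTest {
    public static void main(String[] args) {
        List<MyWritable> list = new ArrayList<>();
        String[] words = {"am", "i", "u", "you"};
        int[] counts = {3, 5, 3, 2};
        for (int i = 0; i < words.length; i++) {
            MyWritable w = new MyWritable();
            w.setWord(words[i]);
            w.setCount(counts[i]);
            list.add(w);
        }
        Collections.sort(list);
        // Prints: i 5, am 3, u 3, you 2
        list.forEach(System.out::println);
    }
}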
WordCountSort.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountSort {

    // Map: parse each "word\tcount" line of the wordcount output and emit
    // it as a MyWritable key; the shuffle then sorts the keys according to
    // MyWritable.compareTo.
    public static class MyMapper extends Mapper<Object, Text, MyWritable, NullWritable> {
        private final MyWritable k = new MyWritable();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] str = value.toString().split("\t");
            k.setWord(str[0]);
            k.setCount(Integer.parseInt(str[1]));
            context.write(k, NullWritable.get());
        }
    }

    // Reduce: keys arrive already sorted, so just write them out.
    public static class MyReducer extends Reducer<MyWritable, NullWritable, MyWritable, NullWritable> {
        @Override
        protected void reduce(MyWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        // Create the job
        Job job = Job.getInstance(new Configuration(), "myWordCountSort");
        // Set the main class
        job.setJarByClass(WordCountSort.class);
        // Configure the map side
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(MyWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Configure the reduce side
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(MyWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // Input is the wordcount job's output; the output directory must not exist yet
        FileInputFormat.addInputPath(job, new Path("/output/mr/wordcount/wc/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("/output/mr/wordcount/wcsort/"));
        // Submit the job and exit with its status
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
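To run the job, package the two classes into a jar and submit it with the standard hadoop jar command, e.g. hadoop jar wordcount-sort.jar WordCountSort (the jar name here is just an example). Note that the input path points at the part-r-00000 file produced by the earlier wordcount job, and the output directory must not already exist, otherwise FileOutputFormat will fail the job.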