With the default TextInputFormat, the key a Mapper receives is the byte offset of each line within the input file, and the value is the content of that line.
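For example, given an input file with the two lines shown below (the file contents and byte offsets are purely illustrative), the default TextInputFormat would deliver the following (key, value) pairs to map():

// input file:
//   hello hadoop
//   hello world
//
// pairs passed to map():
//   (0,  "hello hadoop")   // key: LongWritable byte offset, value: Text line contents
//   (13, "hello world")    // 13 = 12 characters of the first line plus the newline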
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map.Entry;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.*;
public class DistributionCacheTest extends Configured implements Tool {

    // Load the cluster configuration files as default resources so that their
    // properties show up when the merged configuration is printed in run().
    static {
        Configuration.addDefaultResource("hdfs-default.xml");
        Configuration.addDefaultResource("hdfs-site.xml");
        Configuration.addDefaultResource("yarn-default.xml");
        Configuration.addDefaultResource("yarn-site.xml");
        Configuration.addDefaultResource("mapred-default.xml");
        Configuration.addDefaultResource("mapred-site.xml");
    }

    static class TestMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // "1" is a symlink in the task's working directory, pointing at a
            // file that was shipped to the node through the distributed cache.
            File f = new File("1");
            BufferedReader in = null;
            try {
                in = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
                String line;
                while ((line = in.readLine()) != null) {
                    // Emit each cached line concatenated with the byte offset
                    // of the current input line.
                    context.write(new Text(line + key.toString()), new IntWritable(1));
                }
            } finally {
                IOUtils.closeStream(in);
            }
        }
    }

    static class TestReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            context.write(key, new IntWritable(1));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Print every property of the merged configuration.
        for (Entry<String, String> entry : conf) {
            System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
        }

        Job job = Job.getInstance(conf, "DistributionCacheTest");
        if (job == null) {
            return -1;
        }
        job.setJarByClass(DistributionCacheTest.class);
        job.setMapperClass(TestMapper.class);
        //job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(TestReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DistributionCacheTest(), args);
        System.exit(exitCode);
    }
}
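Note that map() opens a plain local file named "1", so that file must be placed in each task's working directory through the distributed cache before the job runs. A minimal sketch of one way to do this, assuming a local file words.txt and a jar named distcachetest.jar (both names are hypothetical): because the class is launched through ToolRunner, the generic -files option is available, and the #1 fragment makes the cached file appear as a symlink named 1:

hadoop jar distcachetest.jar DistributionCacheTest \
    -files /local/path/words.txt#1 \
    input/sample.txt output

Alternatively, the file could be added programmatically in run() before the job is submitted, e.g. job.addCacheFile(new java.net.URI("/user/hadoop/words.txt#1")), where the path is assumed to already exist on HDFS.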