map/reduce 是很通用的编程模型,并非只有在 Hadoop 上才能使用,不要被平台限制。处理已排序的列表,在内存和计算的消耗上都是很低的。
Mapper
package mapreduce;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Minimal in-memory map phase: subclasses implement {@link #map} and emit
 * intermediate (K, V) pairs through {@link Context#write}. Results are
 * grouped by key in a TreeMap, so keys come back sorted — mirroring the
 * shuffle/sort guarantee of the Hadoop framework this demo imitates.
 *
 * @param <K0> input key type
 * @param <V0> input value type
 * @param <K>  intermediate (output) key type
 * @param <V>  intermediate (output) value type
 */
public abstract class Mapper<K0, V0, K, V> {

    /** Intermediate results, grouped by key and kept in sorted key order. */
    private final Map<K, List<V>> map = new TreeMap<K, List<V>>();

    /** Returns the grouped intermediate results accumulated so far. */
    public Map<K, List<V>> getResult() {
        return map;
    }

    /**
     * Processes one input record; implementations call
     * {@code context.write(k, v)} zero or more times per record.
     */
    public abstract void map(K0 key, V0 value, Context context)
            throws RuntimeException;

    /** Write channel handed to {@link #map}; appends a value to its key's list. */
    public class Context {
        public void write(K k, V v) {
            List<V> list = map.get(k);
            if (list == null) {
                list = new ArrayList<V>();
                // Register the list only when newly created; the old code
                // re-put the same list on every write, which was redundant.
                map.put(k, list);
            }
            list.add(v);
        }
    }
}
Reducer
package mapreduce;import java.util.Map;
import java.util.TreeMap;

/**
 * Minimal in-memory reduce phase: for each intermediate key the driver calls
 * {@link #reduce} once with all of that key's values, and the subclass emits
 * its final (K1, V1) pair through {@link Context#write}. Output is held in a
 * TreeMap, so final keys are sorted.
 *
 * @param <K>  intermediate key type
 * @param <V>  intermediate value type
 * @param <K1> final output key type
 * @param <V1> final output value type
 */
public abstract class Reducer<K, V, K1, V1> {

    /** Final results, one entry per reduced key, kept in sorted key order. */
    private final TreeMap<K1, V1> results = new TreeMap<K1, V1>();

    /**
     * Folds every value observed for key {@code k} into an output pair;
     * implementations emit through {@code context.write}.
     */
    public abstract void reduce(K k, Iterable<V> list, Context context)
            throws RuntimeException;

    /** Returns the reduced output accumulated so far. */
    public Map<K1, V1> getResult() {
        return results;
    }

    /** Write channel handed to {@link #reduce}; records one final pair. */
    public class Context {
        public void write(K1 k, V1 v) {
            results.put(k, v);
        }
    }
}
Job
// 从数组读入,输出到Log,仅供原理演示
// 满屏的小黄叹号,告诉我们说java的泛型太tm难用了
package mapreduce;import java.util.Iterator;
import java.util.List;
import java.util.Map;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class Job {private static final Logger LOG = LoggerFactory.getLogger(Job.class);private Class<? extends Mapper> map;private Class<? extends Reducer> reduce;public void setMap(Class<? extends Mapper> map) {this.map = map;}public void setReduce(Class<? extends Reducer> reduce) {this.reduce = reduce;}public int run(String[] input) throws RuntimeException {int len = input.length;try {Mapper m = map.newInstance();for (int i = 0; i < len; i++) {m.map(0L + i, input[i], m.new Context());}Map<Object, List<Object>> mapResult = m.getResult();Reducer r = reduce.newInstance();Iterator<Object> it = mapResult.keySet().iterator();while (it.hasNext()) {Object k = it.next();r.reduce(k, mapResult.get(k), r.new Context());}Map<Object, Object> reduceResult = r.getResult();it = reduceResult.keySet().iterator();while (it.hasNext()) {Object k = it.next();LOG.info("{}\t{}", k.toString(), reduceResult.get(k));}} catch (InstantiationException e) {LOG.error("", e);throw new RuntimeException(e);} catch (IllegalAccessException e) {LOG.error("", e);throw new RuntimeException(e);} catch (Exception e) {LOG.error("", e);throw new RuntimeException(e);}return 1;}}
实例 wordcount
package mr.maillog;import java.io.IOException;import mapreduce.Job;
import mapreduce.Mapper;
import mapreduce.Reducer;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class WordCount {private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);public static final class M extends Mapper<Long, String, String, Long> {@Overridepublic void map(Long key, String value, Context context)throws RuntimeException {String tmp = value.toLowerCase();String[] line = tmp.split("[\\s]+");for (String word : line) {context.write(word, 1L);}}}public static final class R extends Reducer<String, Long, String, Long> {@Overridepublic void reduce(String k, Iterable<Long> list, Context context)throws RuntimeException {Long count = 0L;for (Long item : list) {count += item;}context.write(k, count);}}/*** @param args* @throws IOException* @throws InterruptedException*/public static void main(String[] args) throws IOException,InterruptedException {LOG.info("Hi");String[] data = {"What Is Apache Hadoop?","The Apache? Hadoop? project develops open-source software for reliable, scalable, distributed computing.","The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using a simple programming model. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. 
Rather than rely on hardware to deliver high-avaiability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-availabile service on top of a cluster of computers, each of which may be prone to failures.","The project includes these subprojects:","Hadoop Common: The common utilities that support the other Hadoop subprojects.","Hadoop Distributed File System (HDFS?): A distributed file system that provides high-throughput access to application data.","Hadoop MapReduce: A software framework for distributed processing of large data sets on compute clusters.","Other Hadoop-related projects at Apache include:","Avro?: A data serialization system.","Cassandra?: A scalable multi-master database with no single points of failure.","Chukwa?: A data collection system for managing large distributed systems.","HBase?: A scalable, distributed database that supports structured data storage for large tables.","Hive?: A data warehouse infrastructure that provides data summarization and ad hoc querying.","Mahout?: A Scalable machine learning and data mining library.","Pig?: A high-level data-flow language and execution framework for parallel computation.","ZooKeeper?: A high-performance coordination service for distributed applications." };Job job = new Job();job.setMap(M.class);job.setReduce(R.class);job.run(data);}}
然后,输出了
12/07/04 16:05:31 INFO maillog.WordCount: Hi 12/07/04 16:05:31 INFO mapreduce.Job: (hdfs?): 1 12/07/04 16:05:31 INFO mapreduce.Job: a 14 12/07/04 16:05:31 INFO mapreduce.Job: access 1 12/07/04 16:05:31 INFO mapreduce.Job: across 1 12/07/04 16:05:31 INFO mapreduce.Job: ad 1 12/07/04 16:05:31 INFO mapreduce.Job: allows 1 12/07/04 16:05:31 INFO mapreduce.Job: and 5 12/07/04 16:05:31 INFO mapreduce.Job: apache 3 12/07/04 16:05:31 INFO mapreduce.Job: apache? 1 12/07/04 16:05:31 INFO mapreduce.Job: application 2 12/07/04 16:05:31 INFO mapreduce.Job: applications. 1 12/07/04 16:05:31 INFO mapreduce.Job: at 2 12/07/04 16:05:31 INFO mapreduce.Job: avro?: 1 12/07/04 16:05:31 INFO mapreduce.Job: be 1 12/07/04 16:05:31 INFO mapreduce.Job: cassandra?: 1 12/07/04 16:05:31 INFO mapreduce.Job: chukwa?: 1 12/07/04 16:05:31 INFO mapreduce.Job: cluster 1 12/07/04 16:05:31 INFO mapreduce.Job: clusters 1 12/07/04 16:05:31 INFO mapreduce.Job: clusters. 1 12/07/04 16:05:31 INFO mapreduce.Job: collection 1 12/07/04 16:05:31 INFO mapreduce.Job: common 1 12/07/04 16:05:31 INFO mapreduce.Job: common: 1 12/07/04 16:05:31 INFO mapreduce.Job: computation 1 12/07/04 16:05:31 INFO mapreduce.Job: computation. 1 12/07/04 16:05:31 INFO mapreduce.Job: compute 1 12/07/04 16:05:31 INFO mapreduce.Job: computers 1 12/07/04 16:05:31 INFO mapreduce.Job: computers, 1 12/07/04 16:05:31 INFO mapreduce.Job: computing. 1 12/07/04 16:05:31 INFO mapreduce.Job: coordination 1 12/07/04 16:05:31 INFO mapreduce.Job: data 8 12/07/04 16:05:31 INFO mapreduce.Job: data-flow 1 12/07/04 16:05:31 INFO mapreduce.Job: data. 
1 12/07/04 16:05:31 INFO mapreduce.Job: database 2 12/07/04 16:05:31 INFO mapreduce.Job: deliver 1 12/07/04 16:05:31 INFO mapreduce.Job: delivering 1 12/07/04 16:05:31 INFO mapreduce.Job: designed 2 12/07/04 16:05:31 INFO mapreduce.Job: detect 1 12/07/04 16:05:31 INFO mapreduce.Job: develops 1 12/07/04 16:05:31 INFO mapreduce.Job: distributed 8 12/07/04 16:05:31 INFO mapreduce.Job: each 2 12/07/04 16:05:31 INFO mapreduce.Job: execution 1 12/07/04 16:05:31 INFO mapreduce.Job: failure. 1 12/07/04 16:05:31 INFO mapreduce.Job: failures 1 12/07/04 16:05:31 INFO mapreduce.Job: failures. 1 12/07/04 16:05:31 INFO mapreduce.Job: file 2 12/07/04 16:05:31 INFO mapreduce.Job: for 7 12/07/04 16:05:31 INFO mapreduce.Job: framework 3 12/07/04 16:05:31 INFO mapreduce.Job: from 1 12/07/04 16:05:31 INFO mapreduce.Job: hadoop 5 12/07/04 16:05:31 INFO mapreduce.Job: hadoop-related 1 12/07/04 16:05:31 INFO mapreduce.Job: hadoop? 1 12/07/04 16:05:31 INFO mapreduce.Job: hadoop? 1 12/07/04 16:05:31 INFO mapreduce.Job: handle 1 12/07/04 16:05:31 INFO mapreduce.Job: hardware 1 12/07/04 16:05:31 INFO mapreduce.Job: hbase?: 1 12/07/04 16:05:31 INFO mapreduce.Job: high-avaiability, 1 12/07/04 16:05:31 INFO mapreduce.Job: high-level 1 12/07/04 16:05:31 INFO mapreduce.Job: high-performance 1 12/07/04 16:05:31 INFO mapreduce.Job: high-throughput 1 12/07/04 16:05:31 INFO mapreduce.Job: highly-availabile 1 12/07/04 16:05:31 INFO mapreduce.Job: hive?: 1 12/07/04 16:05:31 INFO mapreduce.Job: hoc 1 12/07/04 16:05:31 INFO mapreduce.Job: include: 1 12/07/04 16:05:31 INFO mapreduce.Job: includes 1 12/07/04 16:05:31 INFO mapreduce.Job: infrastructure 1 12/07/04 16:05:31 INFO mapreduce.Job: is 4 12/07/04 16:05:31 INFO mapreduce.Job: it 1 12/07/04 16:05:31 INFO mapreduce.Job: itself 1 12/07/04 16:05:31 INFO mapreduce.Job: language 1 12/07/04 16:05:31 INFO mapreduce.Job: large 4 12/07/04 16:05:31 INFO mapreduce.Job: layer, 1 12/07/04 16:05:31 INFO mapreduce.Job: learning 1 12/07/04 16:05:31 INFO 
mapreduce.Job: library 2 12/07/04 16:05:31 INFO mapreduce.Job: library. 1 12/07/04 16:05:31 INFO mapreduce.Job: local 1 12/07/04 16:05:31 INFO mapreduce.Job: machine 1 12/07/04 16:05:31 INFO mapreduce.Job: machines, 1 12/07/04 16:05:31 INFO mapreduce.Job: mahout?: 1 12/07/04 16:05:31 INFO mapreduce.Job: managing 1 12/07/04 16:05:31 INFO mapreduce.Job: mapreduce: 1 12/07/04 16:05:31 INFO mapreduce.Job: may 1 12/07/04 16:05:31 INFO mapreduce.Job: mining 1 12/07/04 16:05:31 INFO mapreduce.Job: model. 1 12/07/04 16:05:31 INFO mapreduce.Job: multi-master 1 12/07/04 16:05:31 INFO mapreduce.Job: no 1 12/07/04 16:05:31 INFO mapreduce.Job: of 8 12/07/04 16:05:31 INFO mapreduce.Job: offering 1 12/07/04 16:05:31 INFO mapreduce.Job: on 3 12/07/04 16:05:31 INFO mapreduce.Job: open-source 1 12/07/04 16:05:31 INFO mapreduce.Job: other 2 12/07/04 16:05:31 INFO mapreduce.Job: parallel 1 12/07/04 16:05:31 INFO mapreduce.Job: pig?: 1 12/07/04 16:05:31 INFO mapreduce.Job: points 1 12/07/04 16:05:31 INFO mapreduce.Job: processing 2 12/07/04 16:05:31 INFO mapreduce.Job: programming 1 12/07/04 16:05:31 INFO mapreduce.Job: project 2 12/07/04 16:05:31 INFO mapreduce.Job: projects 1 12/07/04 16:05:31 INFO mapreduce.Job: prone 1 12/07/04 16:05:31 INFO mapreduce.Job: provides 2 12/07/04 16:05:31 INFO mapreduce.Job: querying. 
1 12/07/04 16:05:31 INFO mapreduce.Job: rather 1 12/07/04 16:05:31 INFO mapreduce.Job: reliable, 1 12/07/04 16:05:31 INFO mapreduce.Job: rely 1 12/07/04 16:05:31 INFO mapreduce.Job: scalable 2 12/07/04 16:05:31 INFO mapreduce.Job: scalable, 2 12/07/04 16:05:31 INFO mapreduce.Job: scale 1 12/07/04 16:05:31 INFO mapreduce.Job: serialization 1 12/07/04 16:05:31 INFO mapreduce.Job: servers 1 12/07/04 16:05:31 INFO mapreduce.Job: service 2 12/07/04 16:05:31 INFO mapreduce.Job: sets 2 12/07/04 16:05:31 INFO mapreduce.Job: simple 1 12/07/04 16:05:31 INFO mapreduce.Job: single 2 12/07/04 16:05:31 INFO mapreduce.Job: so 1 12/07/04 16:05:31 INFO mapreduce.Job: software 3 12/07/04 16:05:31 INFO mapreduce.Job: storage 1 12/07/04 16:05:31 INFO mapreduce.Job: storage. 1 12/07/04 16:05:31 INFO mapreduce.Job: structured 1 12/07/04 16:05:31 INFO mapreduce.Job: subprojects. 1 12/07/04 16:05:31 INFO mapreduce.Job: subprojects: 1 12/07/04 16:05:31 INFO mapreduce.Job: summarization 1 12/07/04 16:05:31 INFO mapreduce.Job: support 1 12/07/04 16:05:31 INFO mapreduce.Job: supports 1 12/07/04 16:05:31 INFO mapreduce.Job: system 3 12/07/04 16:05:31 INFO mapreduce.Job: system. 1 12/07/04 16:05:31 INFO mapreduce.Job: systems. 1 12/07/04 16:05:31 INFO mapreduce.Job: tables. 1 12/07/04 16:05:31 INFO mapreduce.Job: than 1 12/07/04 16:05:31 INFO mapreduce.Job: that 5 12/07/04 16:05:31 INFO mapreduce.Job: the 8 12/07/04 16:05:31 INFO mapreduce.Job: these 1 12/07/04 16:05:31 INFO mapreduce.Job: thousands 1 12/07/04 16:05:31 INFO mapreduce.Job: to 6 12/07/04 16:05:31 INFO mapreduce.Job: top 1 12/07/04 16:05:31 INFO mapreduce.Job: up 1 12/07/04 16:05:31 INFO mapreduce.Job: using 1 12/07/04 16:05:31 INFO mapreduce.Job: utilities 1 12/07/04 16:05:31 INFO mapreduce.Job: warehouse 1 12/07/04 16:05:31 INFO mapreduce.Job: what 1 12/07/04 16:05:31 INFO mapreduce.Job: which 1 12/07/04 16:05:31 INFO mapreduce.Job: with 1 12/07/04 16:05:31 INFO mapreduce.Job: zookeeper?: 1
p.s.
maven dependency
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.6.5</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.6.5</version></dependency>