The HBase MapReduce integration classes live in org.apache.hadoop.hbase.mapreduce: mappers extend TableMapper and reducers extend TableReducer.
One region of the source table corresponds to one map task.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseMR {

    // Mapper/Reducer classes used by MapReduce must be static nested (or top-level) classes.
    // Reads every row of the source table and emits (age, rowkey).
    public static class MyMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // key is the rowkey of the current row
            Text k = new Text(Bytes.toString(key.get()));
            // read the basicinfo:age cell of the current row
            Text v = new Text(Bytes.toString(value.getValue(Bytes.toBytes("basicinfo"), Bytes.toBytes("age"))));
            // swap key and value so rows are grouped by age in the reducer
            context.write(v, k);
        }
    }

    // For each age, writes one row to the sink table: the rowkey is the age and
    // the f1 columns hold the rowkeys of the students with that age.
    public static class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Put put = new Put(Bytes.toBytes(key.toString()));
            for (Text value : values) {
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes(value.toString()), Bytes.toBytes(value.toString()));
            }
            // TableOutputFormat ignores the key; the Put itself carries the target rowkey
            context.write(null, put);
        }
    }

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        try {
            Job job = Job.getInstance(conf, "mapreduce on hbase");
            job.setJarByClass(HbaseMR.class);

            Scan scan = new Scan();
            scan.setCaching(1000);      // fetch 1000 rows per RPC to speed up the full-table scan
            scan.setCacheBlocks(false); // don't pollute the block cache with a one-off scan

            // source table "students", map output types Text/Text
            TableMapReduceUtil.initTableMapperJob("students", scan, MyMapper.class, Text.class, Text.class, job);
            // sink table "student-age"
            TableMapReduceUtil.initTableReducerJob("student-age", MyReducer.class, job);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
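Both tables must exist before the job is submitted: the source table students with a basicinfo family holding an age qualifier, and the sink table student-age with the f1 family the reducer writes to. Below is a minimal sketch for pre-creating the sink table with the HBase Admin API; the table and family names are simply the ones assumed by the job above, and the class name is illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Sketch: create the "student-age" sink table with family "f1" if it does not exist yet.
public class CreateSinkTable {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("student-age");
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("f1"));
                admin.createTable(desc);
            }
        }
    }
}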