HBase MapReduce advanced feature

Background

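Suppose the row keys have the form <salt>:<timestamp>:<data>, as shown below, and you want to build a new table that contains only the rows whose timestamp falls within a given range. Because the field you want to filter on sits in the middle of the row key, a full scan is unavoidable. A full scan from a single client takes too long, so you can instead write a MapReduce job that creates the new table from only the desired row keys.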

hbase:004:0> scan 'testtable'
ROW                                                           COLUMN+CELL
 15g:112:a                                                    column=cf:, timestamp=2023-06-11T01:01:03.539, value=value1
 45g:111:a                                                    column=cf:, timestamp=2023-06-11T01:00:56.308, value=value1
 55g:114:a                                                    column=cf:, timestamp=2023-06-11T01:01:36.731, value=value1
 8xg:123:a                                                    column=cf:, timestamp=2023-06-11T01:00:04.247, value=value1
 95g:113:a                                                    column=cf:, timestamp=2023-06-11T01:01:27.374, value=value1
 a5g:124:a                                                    column=cf:, timestamp=2023-06-11T01:00:17.300, value=value1
 g5g:126:a                                                    column=cf:, timestamp=2023-06-11T01:00:34.410, value=value1
 k5g:127:a                                                    column=cf:, timestamp=2023-06-11T01:00:44.973, value=value1
 z5g:125:a                                                    column=cf:, timestamp=2023-06-11T01:00:24.036, value=value1
9 row(s)
Took 0.0372 seconds
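
The per-row check that the job has to apply can be sketched in plain Java, without HBase. The class and method names here (`RowKeySketch`, `timestampOf`, `filter`) are hypothetical and only illustrate that the timestamp is the second `:`-separated field of the key, which is why a start/stop row on the key prefix cannot select it. The bounds 123 and 125 are the ones used in the mapper later in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original job: it shows the per-row check only.
final class RowKeySketch {
    // Returns the <timestamp> field of a "<salt>:<timestamp>:<data>" row key,
    // or null when the key has fewer than two ':'-separated parts.
    static String timestampOf(String rowKey) {
        String[] parts = rowKey.split(":");
        return parts.length >= 2 ? parts[1] : null;
    }

    // Keeps the keys whose timestamp field lies in [from, to], compared as strings.
    static List<String> filter(List<String> rowKeys, String from, String to) {
        List<String> kept = new ArrayList<>();
        for (String rowKey : rowKeys) {
            String ts = timestampOf(rowKey);
            if (ts != null && from.compareTo(ts) <= 0 && ts.compareTo(to) <= 0) {
                kept.add(rowKey);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Row keys taken from the scan of 'testtable' above.
        List<String> keys = Arrays.asList("15g:112:a", "45g:111:a", "55g:114:a",
                "8xg:123:a", "95g:113:a", "a5g:124:a", "g5g:126:a", "k5g:127:a", "z5g:125:a");
        System.out.println(filter(keys, "123", "125")); // [8xg:123:a, a5g:124:a, z5g:125:a]
    }
}
```

Every key has to be inspected, since the salt prefix scatters the matching rows across the whole key space.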

MapReduce code

Driver

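import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class RowFilteringJob {
    private static final String ZOOKEEPER_QUORUM = "latte01,latte02,latte03";
    private static final String ZOOKEEPER_PORT = "2181";

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Register the HBase-related configuration
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        config.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT);
        // Create the job (Job.getInstance replaces the deprecated Job constructor)
        Job job = Job.getInstance(config, "RowFilteringJob");
        job.setJarByClass(RowFilteringJob.class);
        Scan scan = new Scan();
        // For a full scan driven by MapReduce, configure the Scan as follows:
        // fetch 500 rows per RPC, and do not fill the region server block cache.
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        // Register the table mapper
        TableMapReduceUtil.initTableMapperJob(
                "testtable", // Input table name
                scan,
                RowFilteringMapper.class, // Mapper class
                Text.class,               // Mapper output key type
                IntWritable.class,        // Mapper output value type
                job
        );
        TableMapReduceUtil.initTableReducerJob(
                "output_testtable",    // Output table name; the table is expected to exist already
                RowFilteringReducer.class,
                job
        );

        job.setNumReduceTasks(1);
        // Submit the job and wait for it to finish
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IOException("error with job");
        }
    }
}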

Mapper

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text textKey = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String rowKey = Bytes.toString(key.get());
        // Parse the row key: <salt>:<timestamp>:<data>
        String[] rowKeyParts = rowKey.split(":");
        // Skip keys without a timestamp field; the range check is a string comparison
        if (rowKeyParts.length >= 2
                && "123".compareTo(rowKeyParts[1]) <= 0 && rowKeyParts[1].compareTo("125") <= 0) {
            // Emit the full row key with a count of 1
            textKey.set(rowKey);
            context.write(textKey, ONE);
        }
    }
}
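
One caveat about the range check in the mapper: it compares the timestamp field as a string, which matches numeric order only while every timestamp has the same number of digits. The following is a minimal sketch of the difference, assuming the timestamp field is a decimal number; `TimestampRangeSketch` and its methods are hypothetical names, not part of the original job.

```java
// Hypothetical helper contrasting string and numeric range checks.
final class TimestampRangeSketch {
    // Lexicographic check, the same comparison the mapper performs.
    static boolean inRangeLexicographic(String ts, String from, String to) {
        return from.compareTo(ts) <= 0 && ts.compareTo(to) <= 0;
    }

    // Numeric check; returns false when the field is not a valid decimal number.
    static boolean inRangeNumeric(String ts, long from, long to) {
        try {
            long value = Long.parseLong(ts);
            return from <= value && value <= to;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same width: both checks agree.
        System.out.println(inRangeLexicographic("124", "123", "125")); // true
        System.out.println(inRangeNumeric("124", 123, 125));           // true
        // Different width: "1240" sorts between "123" and "125" as a string.
        System.out.println(inRangeLexicographic("1240", "123", "125")); // true
        System.out.println(inRangeNumeric("1240", 123, 125));           // false
    }
}
```

For the three-digit sample keys in this post the two checks give the same result, so the string comparison is sufficient here.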

Reducer

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;

        // Iterate through the values and calculate the sum
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Create a Put for the row key, storing the sum as a 4-byte int in cf:result
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("result"), Bytes.toBytes(sum));

        // Write the Put to the output table; the output key is not used, so null is passed
        context.write(null, put);
    }
}

Result

As shown below, the job produces a table that contains only the rows with timestamps 123, 124, and 125.

hbase:005:0> scan 'output_testtable'
ROW                                                           COLUMN+CELL
 8xg:123:a                                                    column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
 a5g:124:a                                                    column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
 z5g:125:a
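
The `value=\x00\x00\x00\x01` in the scan output is the integer 1 that the reducer wrote with `Bytes.toBytes(sum)`, which stores an `int` as 4 big-endian bytes. The sketch below reproduces that layout with `java.nio.ByteBuffer` as a stand-in for the HBase `Bytes` utility; `IntEncodingSketch` and its methods are hypothetical names, and `escape` is a simplification that prints every byte as `\xNN`.

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing the 4-byte big-endian layout of an int cell value.
final class IntEncodingSketch {
    // Encodes an int as 4 bytes, most significant byte first (ByteBuffer's default order).
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Decodes the first 4 bytes back into an int.
    static int decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Renders every byte as \xNN (simplified; printable bytes are escaped too).
    static String escape(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("\\x%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] cell = encode(1);
        System.out.println(escape(cell)); // \x00\x00\x00\x01
        System.out.println(decode(cell)); // 1
    }
}
```

When reading the output table from Java, `Bytes.toInt` on the cell value recovers the count in the same way `decode` does here.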

Note that the output table is not idempotent.