HBase MapReduce advanced feature
Background
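Suppose the row keys have the form <salt>:<timestamp>:<data>, as in the table below, and you want to copy only the rows whose timestamp (the middle field of the key, not HBase's cell timestamp) falls within a given range into a new table. Because that field sits in the middle of the row key rather than at its start, a prefix or start/stop-row scan cannot select those rows, so a full table scan is unavoidable. A full scan from a single client takes too long, so instead you can write a MapReduce job that builds the new table from the matching row keys. The job still reads every row, but it splits the scan by region and processes the regions in parallel.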
hbase:004:0> scan 'testtable'
ROW COLUMN+CELL
15g:112:a column=cf:, timestamp=2023-06-11T01:01:03.539, value=value1
45g:111:a column=cf:, timestamp=2023-06-11T01:00:56.308, value=value1
55g:114:a column=cf:, timestamp=2023-06-11T01:01:36.731, value=value1
8xg:123:a column=cf:, timestamp=2023-06-11T01:00:04.247, value=value1
95g:113:a column=cf:, timestamp=2023-06-11T01:01:27.374, value=value1
a5g:124:a column=cf:, timestamp=2023-06-11T01:00:17.300, value=value1
g5g:126:a column=cf:, timestamp=2023-06-11T01:00:34.410, value=value1
k5g:127:a column=cf:, timestamp=2023-06-11T01:00:44.973, value=value1
z5g:125:a column=cf:, timestamp=2023-06-11T01:00:24.036, value=value1
9 row(s)
Took 0.0372 seconds
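Why a start/stop-row range cannot help can be checked on the nine row keys above without a cluster. HBase keeps rows sorted by row-key bytes, which for these ASCII keys is the same as Java's natural String order; the sketch below (class and method names are made up for the example) reports where the rows with timestamps 123 to 125 land in that order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: the nine row keys from the scan output above.
final class RowKeyOrderDemo {
    static final String[] ROW_KEYS = {
        "15g:112:a", "45g:111:a", "55g:114:a", "8xg:123:a", "95g:113:a",
        "a5g:124:a", "g5g:126:a", "k5g:127:a", "z5g:125:a"
    };

    // Positions, in sorted row-key order, of rows whose middle field is in [lo, hi]
    static List<Integer> matchingPositions(int lo, int hi) {
        String[] sorted = ROW_KEYS.clone();
        Arrays.sort(sorted); // natural String order == byte order for ASCII keys
        List<Integer> positions = new ArrayList<>();
        for (int i = 0; i < sorted.length; i++) {
            int ts = Integer.parseInt(sorted[i].split(":")[1]);
            if (lo <= ts && ts <= hi) {
                positions.add(i);
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        // The matches are scattered, so no single [startRow, stopRow) range covers only them
        System.out.println(matchingPositions(123, 125)); // [3, 5, 8]
    }
}
```

The salt prefix spreads consecutive timestamps across the key space, which is exactly what makes writes evenly distributed and range reads on the timestamp impossible.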
MapReduce code
Driver
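import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class RowFilteringJob {
    private static final String ZOOKEEPER_QUORUM = "latte01,latte02,latte03";
    private static final String ZOOKEEPER_PORT = "2181";

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Register the HBase connection settings
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM);
        config.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT);

        // Create the job (Job.getInstance replaces the deprecated new Job(...) constructor)
        Job job = Job.getInstance(config, "RowFilteringJob");
        job.setJarByClass(RowFilteringJob.class);

        // For a full-table scan in MapReduce: fetch more rows per RPC,
        // and keep the scanned blocks out of the region servers' block cache
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Register the TableMapper that reads the source table
        TableMapReduceUtil.initTableMapperJob(
            "testtable",              // source table name
            scan,
            RowFilteringMapper.class, // mapper class
            Text.class,               // mapper output key class
            IntWritable.class,        // mapper output value class
            job
        );
        // Register the TableReducer that writes Puts to the output table
        TableMapReduceUtil.initTableReducerJob(
            "output_testtable",       // output table name (must already exist)
            RowFilteringReducer.class,
            job
        );
        job.setNumReduceTasks(1);

        // Submit the job and wait for it to finish
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IOException("error with job");
        }
    }
}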
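The job above ships every row to the mappers and filters there. An option the original job does not use is to filter on the region servers by attaching an HBase `RowFilter` with a `RegexStringComparator` to the scan, e.g. `scan.setFilter(new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^[^:]*:12[345]:")))` on HBase 2.x. The region servers still read every row, but non-matching rows are no longer sent to the mappers. The pattern is hypothetical, written only for this example's 123 to 125 range; the sketch below checks it against the sample keys with plain `java.util.regex`, since `RegexStringComparator` uses Java regular expressions by default and tests each row key with `find()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Sanity check for a hypothetical server-side row-key pattern:
// any salt without ':', then a middle field of exactly 123, 124 or 125.
final class RowKeyRegexCheck {
    static final Pattern TS_123_TO_125 = Pattern.compile("^[^:]*:12[345]:");

    static List<String> keep(List<String> rowKeys) {
        List<String> kept = new ArrayList<>();
        for (String rowKey : rowKeys) {
            // find() mirrors how RegexStringComparator tests a row key
            if (TS_123_TO_125.matcher(rowKey).find()) {
                kept.add(rowKey);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
            "15g:112:a", "45g:111:a", "55g:114:a", "8xg:123:a", "95g:113:a",
            "a5g:124:a", "g5g:126:a", "k5g:127:a", "z5g:125:a");
        System.out.println(keep(sample)); // [8xg:123:a, a5g:124:a, z5g:125:a]
    }
}
```

A regex only expresses ranges awkwardly, so this fits narrow, fixed ranges; for arbitrary ranges the mapper-side check is simpler to maintain.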
Mapper
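import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringMapper extends TableMapper<Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text textKey = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Decode only the valid slice of the backing byte array
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());
        // Parse the row key: <salt>:<timestamp>:<data>
        String[] rowKeyParts = rowKey.split(":");
        if (rowKeyParts.length < 2) {
            return; // skip malformed row keys instead of throwing
        }
        String ts = rowKeyParts[1];
        // Keep rows with 123 <= timestamp <= 125 (string comparison)
        if ("123".compareTo(ts) <= 0 && ts.compareTo("125") <= 0) {
            textKey.set(rowKey);
            // Emit (row key, 1); the reducer turns it into a Put
            context.write(textKey, ONE);
        }
    }
}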
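The mapper compares timestamps as strings, which matches numeric order only while every timestamp has the same number of digits (three in this sample). The sketch below, with hypothetical helper names, shows a longer value slipping through the string check and a numeric alternative that assumes the middle field always parses as a base-10 `long`.

```java
// Compares the mapper's string-based range check with a numeric one.
final class TimestampRangeCheck {
    // Same test as the mapper: lexicographic comparison of the strings
    static boolean inRangeLexicographic(String ts, String lo, String hi) {
        return lo.compareTo(ts) <= 0 && ts.compareTo(hi) <= 0;
    }

    // Numeric comparison; throws NumberFormatException if ts is not an integer
    static boolean inRangeNumeric(String ts, long lo, long hi) {
        long value = Long.parseLong(ts);
        return lo <= value && value <= hi;
    }

    public static void main(String[] args) {
        // "1234" sorts between "123" and "125" as a string, but 1234 > 125 as a number
        System.out.println(inRangeLexicographic("1234", "123", "125")); // true
        System.out.println(inRangeNumeric("1234", 123, 125));           // false
        System.out.println(inRangeNumeric("124", 123, 125));            // true
    }
}
```

If the real timestamps are fixed-width (for example epoch milliseconds or zero-padded values), the string comparison is correct and avoids parsing every key.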
Reducer
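import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class RowFilteringReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the 1s emitted for this row key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Reuse the source row key as the output row key and store the sum in cf:result
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey);
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("result"), Bytes.toBytes(sum));
        // TableOutputFormat writes the Put; the output key is not used for the write
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}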
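Because HBase row keys are unique, every reduce call receives exactly one `ONE`, so `sum` is always 1 here; the counter would only matter if several mapper outputs shared a key. The value shows up as `\x00\x00\x00\x01` in the output table because `Bytes.toBytes(int)` writes an int as 4 big-endian bytes. The HBase-free sketch below reproduces that with `java.nio.ByteBuffer`; the class and method names are made up for the example.

```java
import java.nio.ByteBuffer;

// Reproduces the 4-byte big-endian int encoding and prints it in the
// \xNN style the HBase shell uses for non-printable bytes.
final class SumEncodingDemo {
    static byte[] encodeInt(int v) {
        return ByteBuffer.allocate(4).putInt(v).array(); // ByteBuffer defaults to big-endian
    }

    // Escapes every byte; the shell leaves printable ASCII as-is,
    // which makes no difference for the small values here
    static String shellStyle(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("\\x%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int sum = 1; // one ONE per unique row key
        System.out.println(shellStyle(encodeInt(sum))); // \x00\x00\x00\x01
    }
}
```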
Result
The job produces a table that contains only the rows with timestamps 123, 124, and 125:
hbase:005:0> scan 'output_testtable'
ROW COLUMN+CELL
8xg:123:a column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
a5g:124:a column=cf:result, timestamp=2023-06-11T01:33:07.352, value=\x00\x00\x00\x01
z5g:125:a
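Note that the output table is not idempotent: every run writes fresh Puts with new cell timestamps, and rows written by earlier runs are never deleted, so rerunning the job (for example with a different range) does not leave the table in the same state.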