Running Hadoop MapReduce WordCount

Overview

When you install Hadoop, you get YARN (the ResourceManager and NodeManager) alongside HDFS. The MapReduce framework (see the MapReduce paper) is a very elegant way to run computations over data stored in HDFS: you write only a Mapper and a Reducer and submit a Job, and YARN takes care of everything else. Word Count, which counts how many times each word appears in a document, is a good example for learning MapReduce. The official site has a wordcount-tutorial, but there is no Korean version, so this post walks through it.

Preparing environment variables

As shown below, add JAVA_HOME/bin to your PATH and set HADOOP_CLASSPATH to $JAVA_HOME/lib/tools.jar. A JRE may not ship lib/tools.jar, so make sure JAVA_HOME points to a JDK.

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
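
These exports live only in the current shell session. To keep them across logins, you can append the same lines to ~/.bashrc (the JDK path assumes a stock Ubuntu OpenJDK 8 install; adjust it to your environment):

echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' >> ~/.bashrc
echo 'export PATH=${JAVA_HOME}/bin:${PATH}' >> ~/.bashrc
echo 'export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar' >> ~/.bashrc
source ~/.bashrc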

Compiling WordCount.java

WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: splits each input line into whitespace-delimited tokens
  // and emits a (word, 1) pair for every token.
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer (also reused as the combiner): sums all counts for each word.
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // Summing is associative and commutative, so the reducer can
    // safely double as a combiner to shrink the map output.
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Compile the WordCount.java file above with the hadoop com.sun.tools.javac.Main WordCount.java command, then bundle the resulting class files into a jar.

root@ubuntu01:~# hadoop com.sun.tools.javac.Main WordCount.java
root@ubuntu01:~# ls
    'WordCount$IntSumReducer.class'  'WordCount$TokenizerMapper.class'   WordCount.class   WordCount.java
root@ubuntu01:~# jar cf wc.jar WordCount*.class
root@ubuntu01:~# ls
    wc.jar  'WordCount$IntSumReducer.class'  'WordCount$TokenizerMapper.class'   WordCount.class   WordCount.java
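
If you want to double-check what was packaged, jar tf lists the archive contents; you should see the three WordCount class files plus the manifest that jar cf generates automatically:

jar tf wc.jar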

Preparing the input text

Prepare a text file to run the word count against. I took a passage from the official documentation.

sample.txt
cat sample.txt
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

Next, use the commands below to create the input directory and copy the local file onto HDFS.

hdfs dfs -mkdir /tmp/input
hdfs dfs -put sample.txt /tmp/input
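
Before running the job, you can verify that the file landed where you expect:

hdfs dfs -ls /tmp/input
hdfs dfs -cat /tmp/input/sample.txt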

With the wc.jar built above and sample.txt uploaded to HDFS, everything is ready to count the words used in the file.

Running WordCount

Running hadoop jar wc.jar WordCount /tmp/input /tmp/output submits the job.
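
Note that FileOutputFormat refuses to write into an existing directory, so re-running the job against the same output path fails with FileAlreadyExistsException. If you want to run it again, remove the old output first:

hdfs dfs -rm -r /tmp/output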

root@ubuntu01:~# hadoop jar wc.jar WordCount /tmp/input /tmp/output
2022-11-20 14:57:56,435 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ubuntu01/192.168.219.101:8040
2022-11-20 14:57:56,648 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2022-11-20 14:57:56,675 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1668956245532_0001
2022-11-20 14:57:56,827 INFO input.FileInputFormat: Total input files to process : 1
2022-11-20 14:57:56,902 INFO mapreduce.JobSubmitter: number of splits:1
2022-11-20 14:57:57,010 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1668956245532_0001
2022-11-20 14:57:57,011 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-11-20 14:57:57,134 INFO conf.Configuration: resource-types.xml not found
2022-11-20 14:57:57,134 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-11-20 14:57:57,456 INFO impl.YarnClientImpl: Submitted application application_1668956245532_0001
2022-11-20 14:57:57,483 INFO mapreduce.Job: The url to track the job: http://ubuntu01:8088/proxy/application_1668956245532_0001/
2022-11-20 14:57:57,484 INFO mapreduce.Job: Running job: job_1668956245532_0001
2022-11-20 14:58:02,539 INFO mapreduce.Job: Job job_1668956245532_0001 running in uber mode : false
2022-11-20 14:58:02,539 INFO mapreduce.Job:  map 0% reduce 0%
2022-11-20 14:58:07,605 INFO mapreduce.Job:  map 100% reduce 0%
2022-11-20 14:58:11,634 INFO mapreduce.Job:  map 100% reduce 100%
2022-11-20 14:58:12,652 INFO mapreduce.Job: Job job_1668956245532_0001 completed successfully
2022-11-20 14:58:12,716 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=1358
		FILE: Number of bytes written=554613
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1142
		HDFS: Number of bytes written=949
		HDFS: Number of read operations=8
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=2072
		Total time spent by all reduces in occupied slots (ms)=2033
		Total time spent by all map tasks (ms)=2072
		Total time spent by all reduce tasks (ms)=2033
		Total vcore-milliseconds taken by all map tasks=2072
		Total vcore-milliseconds taken by all reduce tasks=2033
		Total megabyte-milliseconds taken by all map tasks=2121728
		Total megabyte-milliseconds taken by all reduce tasks=2081792
	Map-Reduce Framework
		Map input records=5
		Map output records=161
		Map output bytes=1678
		Map output materialized bytes=1358
		Input split bytes=106
		Combine input records=161
		Combine output records=101
		Reduce input groups=101
		Reduce shuffle bytes=1358
		Reduce input records=101
		Reduce output records=101
		Spilled Records=202
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=77
		CPU time spent (ms)=790
		Physical memory (bytes) snapshot=583352320
		Virtual memory (bytes) snapshot=5094989824
		Total committed heap usage (bytes)=501219328
		Peak Map Physical memory (bytes)=337264640
		Peak Map Virtual memory (bytes)=2544484352
		Peak Reduce Physical memory (bytes)=246087680
		Peak Reduce Virtual memory (bytes)=2550505472
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=1036
	File Output Format Counters
		Bytes Written=949
root@ubuntu01:~# hdfs dfs -cat /tmp/output/part-r-00000
(multi-terabyte	1
(see	1
(thousands	1
A	1
Architecture	1
Distributed	1
File	1
Guide)	1
HDFS	1
Hadoop	2
MapReduce	3
System	1
The	2
This	1
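
The results are sorted by key, which is why the capitalized words appear first. One last note: the job log above warns "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this." Below is a minimal sketch of what that change could look like. WordCountTool is a hypothetical class name for illustration; it reuses the Mapper and Reducer from WordCount above, and ToolRunner strips generic options such as -D key=value out of args before run() sees them.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical ToolRunner-based driver; the job wiring mirrors WordCount.main().
public class WordCountTool extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // getConf() already reflects any -D overrides parsed by ToolRunner.
    Job job = Job.getInstance(getConf(), "word count");
    job.setJarByClass(WordCountTool.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setCombinerClass(WordCount.IntSumReducer.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner parses generic options, then invokes run() with the remainder.
    System.exit(ToolRunner.run(new WordCountTool(), args));
  }
}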