JAVA HADOOP MAPREDUCE

Modify the WordCount program so it outputs the wordcount for each distinct word in each file. So the output of this DocWordCount program should be of the form ‘word#####filename count’, where ‘#####’ serves as a delimiter between word and filename and tab serves as a delimiter between filename and count. Submit your source code in a file named DocWordCount.java.

Explanation: Consider two simple files, file1.txt and file2.txt, created with the commands `$ echo "Hadoop is yellow Hadoop" > file1.txt` and `$ echo "yellow Hadoop is an elephant" > file2.txt`. Running ‘DocWordCount.java’ on these two files will give an output similar to that below, where ##### is a delimiter.

Output of DocWordCount.java

yellow#####file2.txt 1

Hadoop#####file2.txt 1

is#####file2.txt 1

elephant#####file2.txt 1

yellow#####file1.txt 1

Hadoop#####file1.txt 2

is#####file1.txt 1

an#####file2.txt 1

Initial code that needs to be modified:

package org.myorg;

import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;


/**
 * Baseline MapReduce word count: writes one {@code (word, total)} pair for every distinct token
 * found in the input.
 *
 * <p>Runs as a Hadoop {@link Tool}. {@code args[0]} is handed to
 * {@code FileInputFormat.addInputPaths} and {@code args[1]} is used as the output path.
 */
public class WordCount extends Configured implements Tool {

  private static final Logger LOG = Logger.getLogger(WordCount.class);

  /**
   * Launches the job through {@link ToolRunner} and exits with the job's status code.
   *
   * @param args command-line arguments: input path(s), then output path
   * @throws Exception if the job cannot be configured or run
   */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new WordCount(), args);
    System.exit(res);
  }

  /**
   * Configures and runs the word-count job.
   *
   * @param args {@code args[0]} = input path(s), {@code args[1]} = output path
   * @return 0 if the job completed successfully, 1 otherwise
   * @throws Exception if job setup or execution fails
   */
  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), " wordcount ");
    job.setJarByClass(this.getClass());

    FileInputFormat.addInputPaths(job, args[0]);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  /**
   * Splits each input line into tokens and emits {@code (token, 1)} for every non-empty token.
   *
   * <p>The type arguments are required: with a raw {@code Mapper}, {@link #map} would merely
   * overload the inherited method and never be invoked by the framework.
   */
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);

    private static final Pattern WORD_BOUNDARY = Pattern.compile("\\s*\\b\\s*");

    /**
     * Emits {@code (token, 1)} for each non-empty token of the line.
     *
     * @param offset position key supplied by the input format (not used)
     * @param lineText one line of input text
     * @param context sink for the emitted pairs
     */
    @Override
    public void map(LongWritable offset, Text lineText, Context context)
        throws IOException, InterruptedException {
      String line = lineText.toString();

      for (String word : WORD_BOUNDARY.split(line)) {
        if (word.isEmpty()) {
          continue; // splitting at a boundary can yield empty fragments
        }
        context.write(new Text(word), one);
      }
    }
  }

  /** Sums the counts emitted for each word and writes {@code (word, sum)}. */
  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * Adds up all counts received for one key.
     *
     * @param word the key being reduced
     * @param counts every value emitted for {@code word}
     * @param context sink for the {@code (word, sum)} result
     */
    @Override
    public void reduce(Text word, Iterable<IntWritable> counts, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : counts) {
        sum += count.get();
      }
      context.write(word, new IntWritable(sum));
    }
  }
}

Answer:

Explanation:

package PackageDemo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

public static void main(String [] args) throws Exception

{

Configuration c=new Configuration();

String[] files=new GenericOptionsParser(c,args).getRemainingArgs();

Path input=new Path(files[0]);

Path output=new Path(files[1]);

Job j=new Job(c,"wordcount");

j.setJarByClass(WordCount.class);

j.setMapperClass(MapForWordCount.class);

j.setReducerClass(ReduceForWordCount.class);

j.setOutputKeyClass(Text.class);

j.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(j, input);

FileOutputFormat.setOutputPath(j, output);

System.exit(j.waitForCompletion(true)?0:1);

}

public static class MapForWordCount extends Mapper<LongWritable, Text, Text, IntWritable>{

public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException

{

String line = value.toString();

String[] words=line.split(",");

for(String word: words )

{

Text outputKey = new Text(word.toUpperCase().trim());

IntWritable outputValue = new IntWritable(1);

con.write(outputKey, outputValue);

}

}

}

public static class ReduceForWordCount extends Reducer<Text, IntWritable, Text, IntWritable>

{

public void reduce(Text word, Iterable<IntWritable> values, Context con) throws IOException, InterruptedException

{

int sum = 0;

for(IntWritable value : values)

{

sum += value.get();

}

con.write(word, new IntWritable(sum));

}

}

}