I am just learning the map-reduce job. I have done one thing with my assignment and I have to change my code to accept another text file as an input and the output has to show location with year with max, min, and average of amount. This is the example of one line of my input:
Calgary,AB,2009-01-07,604680,12694,2.5207754,0.065721168,0.025668362,0.972051954,0.037000279,0.022319018,,,0.003641149,,,0.002936745,,,0.016723641
and the output should be something like:
Calgary 2009 Average is: Max: Min:
and here is my code which gives the txt file and calculate avg, min, and max:
public class AverageMinMax {
public static class Map extends Mapper<LongWritable,Date,Text,Text> {
//private static final FloatWritable rep= new FloatWritable(1);
public void map(LongWritable key,Text value,Context context)
throws IOException, InterruptedException {
context.write(new Text("Map_Output"), value);
};
}
public static class Combiner extends Reducer<Text,Text,Text,Text>
{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
{
Integer NumberOfValues=0;
double sum=0D;
double min=0D;
double max=0D;
//double min=values.get(0);
Iterator<Text> itr = values.iterator();
//convertString=values(0);
while(itr.hasNext())
{
String TexttoString = itr.next().toString();
Double value = Double.parseDouble(TexttoString);
if(value<min)
{
min=value;
}
if(value>max)
{
max=value;
}
NumberOfValues++;
sum+=value;
}
Double average = sum/NumberOfValues;
context.write(new Text("Combiner_output"), new Text(average + "," + NumberOfValues+","+min+","+max));
};
}
public static class Reduce extends
Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
Integer totalNumberOfValues= 0;
Double sum=0.00;
Double min=0D;
Double max=0D;
Iterator<Text> itr = values.iterator();
while(itr.hasNext())
{
String TexttoString = itr.next().toString();
String[] split_String = TexttoString.split(",");
Double average = Double.parseDouble(split_String[0]);
Integer NumberOfValues = Integer.parseInt(split_String[1]);
Double minValue=Double.parseDouble(split_String[2]);
Double maxValue=Double.parseDouble(split_String[3]);
if(minValue<min)
{
min=minValue;
}
if(maxValue>max)
{
max=maxValue;
}
sum+=(average*NumberOfValues);
totalNumberOfValues+=NumberOfValues;
}
Double average= sum/totalNumberOfValues;
context.write(new Text("Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));
};
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job=new Job(conf,"AverageMinMax.class");
job.setJarByClass(AverageMinMax.class);
job.setJobName("MapReduceAssignment");
//JobConf conf = new JobConf(Hadoop_map_reduce.class);
//conf.setJobName("Hadoop_assignment");
// Configuration conf = new Configuration();
//Job job = new Job(conf, "maxmin");
//job.setJarByClass(Hadoop_map_reduce.class);
// FileSystem fs = FileSystem.get(conf);
/* if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]), true);
}*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setNumReduceTasks(1);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Combiner.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
//FileInputFormat.addInputPath(job, new Path("/home/cloudera/Desktop/assign2"));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// FileOutputFormat.setOutputPath(job, new Path(" user/cloudera/output"));
job.waitForCompletion(true);
}
}
So, my first problem is I don't know how to convert the date in the mapper and the how to find 2 keys and show in output. I mean how to rewrite this code!
I appreciate your help
Your question is not entirely clear. So, my assumptions are as follows:
If the assumptions are correct, I suggest that you make use of Prof. Jeremy Lin's custom datatypes. A possible solution would be the following:
Your key will be the location and year combined into Text.
String line = value.toString();
String[] tokens = line.split(",");
String[] date = tokens[2].split("-");
String year = date[0];
String location = tokens[0];
Text locationYear = new Text(location + " " + year);
Your value would then be an ArrayListOfDoublesWritable which you can make use from the repo I mentioned above.
ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable()
for(int i = 5; i < tokens.length(); i++)
{
readings.add(Double.parseDouble(tokens[i]));
}
Then you can emit your mapper output as Text and ArrayListOfDoublesWritable.
context.write(locationYear, readings);
From here, you can manipulate your mapper outputs in your reducers with the calculations (average, min, max) by making use of the Collections method for Array List.
I hope this helps.