Tags: java, eclipse, hadoop, mapreduce, cloudera

MapReduce with 2 keys


I am just learning MapReduce. I have finished one part of my assignment, and now I have to change my code to accept a different text file as input; the output has to show the location and year together with the max, min, and average of the amounts. This is an example of one line of my input: Calgary,AB,2009-01-07,604680,12694,2.5207754,0.065721168,0.025668362,0.972051954,0.037000279,0.022319018,,,0.003641149,,,0.002936745,,,0.016723641

and the output should be something like: Calgary 2009 Average is: Max: Min:

and here is my code, which reads a text file and calculates the avg, min, and max:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class AverageMinMax {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Forward every input line under a single key so the
            // combiner/reducer sees all values together.
            context.write(new Text("Map_Output"), value);
        }
    }
    public static class Combiner extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int numberOfValues = 0;
            double sum = 0d;
            // Start min high and max low so the first value replaces both.
            double min = Double.MAX_VALUE;
            double max = -Double.MAX_VALUE;
            for (Text text : values) {
                double value = Double.parseDouble(text.toString());
                if (value < min) {
                    min = value;
                }
                if (value > max) {
                    max = value;
                }
                numberOfValues++;
                sum += value;
            }
            double average = sum / numberOfValues;
            context.write(new Text("Combiner_output"),
                    new Text(average + "," + numberOfValues + "," + min + "," + max));
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int totalNumberOfValues = 0;
            double sum = 0d;
            double min = Double.MAX_VALUE;
            double max = -Double.MAX_VALUE;
            // Each value is "average,count,min,max" as written by the combiner.
            for (Text text : values) {
                String[] parts = text.toString().split(",");
                double average = Double.parseDouble(parts[0]);
                int numberOfValues = Integer.parseInt(parts[1]);
                double minValue = Double.parseDouble(parts[2]);
                double maxValue = Double.parseDouble(parts[3]);
                if (minValue < min) {
                    min = minValue;
                }
                if (maxValue > max) {
                    max = maxValue;
                }
                sum += average * numberOfValues;
                totalNumberOfValues += numberOfValues;
            }
            double average = sum / totalNumberOfValues;
            context.write(new Text("Average and Minimum and Max is"),
                    new Text(average + " and " + min + " and " + max));
        }
    }
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MapReduceAssignment");
        job.setJarByClass(AverageMinMax.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

So, my first problem is that I don't know how to parse the date in the mapper, and I also don't know how to combine two keys (location and year) and show them in the output. How should I rewrite this code?

I appreciate your help.


Solution

  • Your question is not entirely clear, so my assumptions are as follows:

    1. You have a collection of data where each line shows a location, a date, and some double values that you want to process.
    2. The values you want to process start from the first fractional double value (i.e., 2.5207754, ...).
    3. Your average is the average of all the value columns across the entire set of observations for each year (i.e., if you have 5 samples from 2009 and each sample has 5 values, you want the average of all 25 values).
    4. Your min and max values are the min and max over the entire set of observations for the respective year.

    If the assumptions are correct, I suggest that you make use of Prof. Jeremy Lin's custom datatypes. A possible solution would be the following:

    1. Your key will be the location and year combined into Text.

      String line = value.toString();
      String[] tokens = line.split(",");
      String[] date = tokens[2].split("-");
      String year = date[0];
      String location = tokens[0];
      
      Text locationYear = new Text(location + " " + year);
      
    2. Your value would then be an ArrayListOfDoublesWritable, which you can use from the repo I mentioned above.

      ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable();
      for (int i = 5; i < tokens.length; i++)
      {
        readings.add(Double.parseDouble(tokens[i]));
      }
      
    3. Then you can emit your mapper output as Text and ArrayListOfDoublesWritable (a combined sketch follows after this list).

      context.write(locationYear, readings);
      
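    Putting steps 1 to 3 together, a minimal sketch of the whole mapper could look like the one below. The class name LocationYearMapper is just an illustration, and the ArrayListOfDoublesWritable API (an add(double) method) is an assumption based on that repo, so check its actual signatures; if you drop this into your AverageMinMax file, the only extra import you need is ArrayListOfDoublesWritable itself. Note that your sample line contains empty columns (",,"), so blank tokens are skipped to avoid a NumberFormatException:

      // Sketch only: LocationYearMapper is a hypothetical name, and the
      // ArrayListOfDoublesWritable type with an add(double) method is
      // assumed from the datatypes repo mentioned above.
      public static class LocationYearMapper
              extends Mapper<LongWritable, Text, Text, ArrayListOfDoublesWritable> {

          @Override
          public void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
              String[] tokens = value.toString().split(",");
              String year = tokens[2].split("-")[0];   // "2009-01-07" -> "2009"
              Text locationYear = new Text(tokens[0] + " " + year);

              ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable();
              for (int i = 5; i < tokens.length; i++) {
                  if (!tokens[i].isEmpty()) {          // skip the empty ",," columns
                      readings.add(Double.parseDouble(tokens[i]));
                  }
              }
              context.write(locationYear, readings);
          }
      }
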

    From here, you can process these mapper outputs in your reducer and do the calculations (average, min, max), for example by making use of the Collections utility methods for lists.
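
    For example, a reducer along the following lines would produce the output format you described. MinMaxAvgReducer is a hypothetical name, and the size()/get(int) accessors of ArrayListOfDoublesWritable are again assumptions, so adjust them to the repo's actual API:

      // Sketch only: assumes ArrayListOfDoublesWritable exposes size() and
      // get(int); verify against the repo's actual API.
      public static class MinMaxAvgReducer
              extends Reducer<Text, ArrayListOfDoublesWritable, Text, Text> {

          @Override
          public void reduce(Text key, Iterable<ArrayListOfDoublesWritable> values,
                  Context context) throws IOException, InterruptedException {
              double sum = 0d;
              int count = 0;
              double min = Double.MAX_VALUE;
              double max = -Double.MAX_VALUE;
              for (ArrayListOfDoublesWritable readings : values) {
                  for (int i = 0; i < readings.size(); i++) {
                      double v = readings.get(i);
                      sum += v;
                      count++;
                      min = Math.min(min, v);
                      max = Math.max(max, v);
                  }
              }
              // Emits e.g. "Calgary 2009    Average is: ... Max: ... Min: ..."
              context.write(key, new Text("Average is: " + (sum / count)
                      + " Max: " + max + " Min: " + min));
          }
      }

    Since the map output value class is no longer Text, remember to declare it in the driver with job.setMapOutputKeyClass(Text.class) and job.setMapOutputValueClass(ArrayListOfDoublesWritable.class), and note that the Text-based combiner from your question no longer applies as-is.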

    I hope this helps.