Tags: java, hadoop, mapreduce, shingles

Hadoop MapReduce to generate substrings of different lengths


Using Hadoop MapReduce, I am writing code to get substrings of different lengths. For example, given the string "ZYXCBA" and the length 3 (through a text file I give the input as "3 ZYXCBA"), my code has to return all possible substrings of length 3 ("ZYX", "YXC", "XCB", "CBA"), of length 4 ("ZYXC", "YXCB", "XCBA"), and finally of length 5 ("ZYXCB", "YXCBA").

In the map phase I did the following:

key = length of substrings I want

value = "ZYXCBA".

So the mapper output is:

3,"ZYXCBA"
4,"ZYXCBA"
5,"ZYXCBA"

In reduce I take the string ("ZYXCBA") and the key 3 to get all substrings of length 3. The same occurs for 4 and 5. The results are concatenated into a single string, so the output of reduce should be:

3 "ZYX YXC XCB CBA"
4 "ZYXC YXCB XCBA"
5 "ZYXCB YXCBA" 

I am running my code using the following command:

hduser@Ganesh:~/Documents$ hadoop jar Saishingles.jar hadoopshingles.Saishingles Behara/Shingles/input Behara/Shingles/output

My code is as shown below:

package hadoopshingles;

import java.io.IOException;
//import java.util.ArrayList;

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class Saishingles {

    public static class shinglesmapper extends Mapper<Object, Text, IntWritable, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            String str = value.toString();
            String[] list = str.split(" ");
            int x = Integer.parseInt(list[0]);   // shortest substring length requested
            String val = list[1];                // the input string itself
            int M = val.length();
            int X = M - 1;

            // emit the whole string once for every requested length x .. M-1
            for (int z = x; z <= X; z++) {
                context.write(new IntWritable(z), new Text(val));
            }
        }
    }

    public static class shinglesreducer extends Reducer<IntWritable, Text, IntWritable, Text> {

        public void reduce(IntWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            int z = key.get();
            String str = value.toString();
            int M = str.length();
            int Tz = M - z;
            String newvalue = "";
            // collect every substring of length z, separated by spaces
            for (int position = 0; position <= Tz; position++) {
                newvalue = newvalue + " " + str.substring(position, position + z);
            }

            context.write(new IntWritable(z), new Text(newvalue));
        }
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(args);
        Configuration conf = parser.getConfiguration();
        String[] otherArgs = parser.getRemainingArgs();

        if (otherArgs.length != 2) {
            System.err.println("Usage: Saishingles <inputFile> <outputDir>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Saishingles");
        job.setJarByClass(hadoopshingles.Saishingles.class);
        job.setMapperClass(shinglesmapper.class);
        //job.setCombinerClass(shinglesreducer.class);
        job.setReducerClass(shinglesreducer.class);
        //job.setMapOutputKeyClass(IntWritable.class);
        //job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The reduce step, instead of returning

3 "ZYX YXC XCB CBA"
4 "ZYXC YXCB XCBA"
5 "ZYXCB YXCBA" 

is returning

3 "ZYXCBA"
4 "ZYXCBA"
5 "ZYXCBA"

i.e., it's giving the same output as the mapper. I don't know why this is happening. Please help me resolve this, and thanks in advance for helping ;) :) :)


Solution

  • You can achieve this without even running the reducer; your map/reduce logic is wrong. The transformation should be done in the Mapper.

    From the Hadoop documentation: "Reduce - In this phase the reduce(WritableComparable, Iterator, OutputCollector, Reporter) method is called for each <key, (list of values)> pair in the grouped inputs."

    Your reduce signature public void reduce(IntWritable key, Text value, Context context)

    should be public void reduce(IntWritable key, Iterable<Text> values, Context context)

    Because the second parameter is Text instead of Iterable<Text>, your method overloads rather than overrides Reducer.reduce. Hadoop therefore calls the inherited default implementation, which is an identity function that writes each (key, value) pair through unchanged; that is exactly why you see the mapper output again.

    Also, change the last line of the reduce method from context.write(new IntWritable(z), new Text(newvalue)); to context.write(key, new Text(newvalue)); you already have the IntWritable key from the mapper, so I wouldn't create a new one.
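
    As a sketch of the fix, keeping the substring logic in the reducer as in your original design (the @Override annotation makes the compiler reject a method that does not actually override Reducer.reduce):

    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int z = key.get();
        StringBuilder newvalue = new StringBuilder();
        for (Text value : values) {              // the grouped values for this key
            String str = value.toString();
            // valid start positions for a window of size z are 0 .. str.length() - z
            for (int pos = 0; pos + z <= str.length(); pos++) {
                if (newvalue.length() > 0) newvalue.append(' ');
                newvalue.append(str, pos, pos + z);
            }
        }
        context.write(key, new Text(newvalue.toString()));
    }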

    With the given input:

    3 "ZYXCBA"
    4 "ZYXCBA"
    5 "ZYXCBA"
    

    the MapReduce job will output (the order of substrings within a line is not guaranteed, because Hadoop does not sort a key's values):

    3   "ZYX YXC XCB CBA"
    4   "ZYXC YXCB XCBA"
    5   "ZYXCB YXCBA"
    

    The complete MapReduce job:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SubStrings{
    
        public static class SubStringsMapper extends Mapper<Object, Text, IntWritable, Text> {
    
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    
                String [] values = value.toString().split(" ");
                int len = Integer.parseInt(values[0].trim());
                String str = values[1].replaceAll("\"", "").trim();
    
                // slide a window of size len across the string; valid start
                // positions are 0 .. str.length() - len, so the final window
                // (e.g. "CBA" for length 3) is included
                for (int i = 0; i + len <= str.length(); i++) {
                    context.write(new IntWritable(len), new Text(str.substring(i, i + len)));
                }
    
            }   
        }
    
        public static class SubStringsReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

            @Override // fails to compile if the signature does not match Reducer.reduce
            public void reduce(IntWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {

                String str = "\""; // opening quote
                for (Text value : values)
                    str += " " + value;

                // drop the space after the opening quote, then append the closing quote
                str = str.replace("\" ", "\"") + "\"";
                context.write(key, new Text(str));
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "get-possible-strings-by-length");
    
            job.setJarByClass(SubStrings.class);
            job.setMapperClass(SubStringsMapper.class); 
            job.setReducerClass(SubStringsReducer.class);
    
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            FileSystem fs = null;
            Path dstFilePath = new Path(args[1]);
            try {
                fs = dstFilePath.getFileSystem(conf);
                if (fs.exists(dstFilePath))
                    fs.delete(dstFilePath, true);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
    
            job.waitForCompletion(true);
        } 
    }
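
    Assuming the class above is packaged into a jar named SubStrings.jar (the jar name and the paths below are placeholders mirroring the question's command), the job is launched the same way:

    hadoop jar SubStrings.jar SubStrings Behara/Shingles/input Behara/Shingles/output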