hadoopmapreducewritable

hadoop reducer not considering two equal custom writable objects as equal


I am trying to write a map reduce program that checks for common friends. I am using a custom writable (FriendPair) as key.

Given the following input

Tom Jerry,John
John Jerry,Sarah,Tom

It should output Jerry as the common friend for Tom and John

[John,Tom]    Jerry
[John,Sarah]    
[John,Jerry]
[Tom,Jerry] 

Instead the map reduce is outputting the following

[John,Tom]  
[John,Sarah]    
[John,Jerry]    
[Tom,John]  
[Tom,Jerry]

The keys [John,Tom] and [Tom,John] are considered unequal.

Below is the code

Custom Writable

    public class FriendPair implements WritableComparable<FriendPair> {
        
        Text friend1;
        Text friend2;
        
        public FriendPair() {
            this.friend1 = new Text("");
            this.friend2 = new Text("");
        }
        
        public FriendPair(Text friend1, Text friend2) {
            this.friend1 = friend1;
            this.friend2 = friend2;
        }
        
        public Text getFriend1() {
            return friend1;
        }
        public void setFriend1(Text friend1) {
            this.friend1 = friend1;
        }
        public Text getFriend2() {
            return friend2;
        }
        public void setFriend2(Text friend2) {
            this.friend2 = friend2;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            friend1.write(out);
            friend2.write(out);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            friend1.readFields(in);
            friend2.readFields(in);
        }
    
        @Override
        public int compareTo(FriendPair pair2) {
            return ((friend1.compareTo(pair2.getFriend2()) == 0 && friend2.compareTo(pair2.getFriend1()) == 0)
                   || (friend1.compareTo(pair2.getFriend1()) == 0 && friend2.compareTo(pair2.getFriend2()) == 0)) ? 0 : -1;
        }
    
        @Override
        public boolean equals(Object o) {
            FriendPair pair2 = (FriendPair) o;
            return (friend1.equals(pair2.getFriend2()) && friend2.equals(pair2.getFriend1()) 
                    || friend1.equals(pair2.getFriend1()) && friend2.equals(pair2.getFriend2()));
        }
        
        @Override
        public String toString() {
            return "[" + friend1 + "," + friend2 + "]";
        }
        
        @Override
        public int hashCode() {
            return friend1.hashCode() + friend2.hashCode();
        }
    
    }

Mapper

public class MutualFriendsMapper extends Mapper<LongWritable, Text, FriendPair, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String[] items = line.split("\t");

        String name = items[0];
        String friendsList = items[1];
        String[] friends = friendsList.split(",");
        for (String friend : friends) {
            FriendPair fp = new FriendPair(new Text(name), new Text(friend));
            FriendPair fp2 = new FriendPair(new Text(friend), new Text(name));
            context.write(fp, new Text(friendsList));
        }
    }
}

Reducer

public class MutualFriendsReducer extends Reducer<FriendPair, Text, FriendPair, FriendArray> {

    @Override
    public void reduce(FriendPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        
        List<String> allFriends = new ArrayList<String>();
        for(Text value : values) {
            String[] valueArray = value.toString().split(",");
            allFriends.addAll(Arrays.asList(valueArray));
        }
        List<Text> commonFriends = new ArrayList<Text>();
        Set<String> uniqueFriendSet = new HashSet<String>(allFriends);
        for(String friend : uniqueFriendSet) {
            int frequency = Collections.frequency(allFriends, friend);
            if(frequency > 1) {
                commonFriends.add(new Text(friend));
            }
        }
        
        context.write(key, new FriendArray(Text.class, commonFriends.toArray(new Text[commonFriends.size()])));
    }
}

FriendArray (Output)

public class FriendArray extends ArrayWritable {

    public FriendArray(Class<? extends Writable> valueClass, Writable[] values) {
        super(valueClass, values);
    }
    
    public FriendArray(Class<? extends Writable> valueClass) {
        super(valueClass);
    }
    
    public FriendArray() {
        super(Text.class);
    }

    @Override
    public Text[] get() {
        return (Text[]) super.get();
    }
    
    @Override
    public void write(DataOutput data) throws IOException {
        for(Text t : get()) {
            t.write(data);
        }
    }
    
    @Override
    public String toString() {
        Text[] friendArray = Arrays.copyOf(get(), get().length, Text[].class);
        String print="";
        
        for(Text f : friendArray) 
            print+=f+",";
        
        return print;
    }
}

Any help would be greatly appreciated.


Solution

  • On the "sort" stage Hadoop doesn't operate on java objects, but only their byte representation (the output of the FriendPair.write() method), thus it can't call FriendPair.equals(). So in order to make Hadoop understand that keys [John,Tom] and [Tom,John] are equal, you have to make sure that their write output is identical. One way to achieve this is to enforce the order of friends in the pair, for example sort them alphabetically (then both pairs would look [John,Tom]).