I am trying to write a MapReduce program that finds common friends. I am using a custom Writable (FriendPair) as the key.
Given the following input (a name, a tab, then a comma-separated list of friends):
Tom Jerry,John
John Jerry,Sarah,Tom
It should output Jerry as the common friend of Tom and John:
[John,Tom] Jerry
[John,Sarah]
[John,Jerry]
[Tom,Jerry]
Instead, the job outputs the following:
[John,Tom]
[John,Sarah]
[John,Jerry]
[Tom,John]
[Tom,Jerry]
The keys [John,Tom] and [Tom,John] are treated as different keys, even though FriendPair.equals() considers them equal.
Below is the code.
Custom Writable (FriendPair)
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class FriendPair implements WritableComparable<FriendPair> {
    Text friend1;
    Text friend2;

    public FriendPair() {
        this.friend1 = new Text("");
        this.friend2 = new Text("");
    }

    public FriendPair(Text friend1, Text friend2) {
        this.friend1 = friend1;
        this.friend2 = friend2;
    }

    public Text getFriend1() {
        return friend1;
    }

    public void setFriend1(Text friend1) {
        this.friend1 = friend1;
    }

    public Text getFriend2() {
        return friend2;
    }

    public void setFriend2(Text friend2) {
        this.friend2 = friend2;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        friend1.write(out);
        friend2.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        friend1.readFields(in);
        friend2.readFields(in);
    }

    // Returns 0 if the pairs match in either order, otherwise always -1
    @Override
    public int compareTo(FriendPair pair2) {
        return ((friend1.compareTo(pair2.getFriend2()) == 0 && friend2.compareTo(pair2.getFriend1()) == 0)
                || (friend1.compareTo(pair2.getFriend1()) == 0 && friend2.compareTo(pair2.getFriend2()) == 0)) ? 0 : -1;
    }

    // Order-insensitive: [a,b] equals [b,a]
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FriendPair)) {
            return false;
        }
        FriendPair pair2 = (FriendPair) o;
        return (friend1.equals(pair2.getFriend2()) && friend2.equals(pair2.getFriend1()))
                || (friend1.equals(pair2.getFriend1()) && friend2.equals(pair2.getFriend2()));
    }

    @Override
    public String toString() {
        return "[" + friend1 + "," + friend2 + "]";
    }

    // A sum is order-insensitive, so [a,b] and [b,a] get the same hash
    @Override
    public int hashCode() {
        return friend1.hashCode() + friend2.hashCode();
    }
}
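To make the compareTo() behaviour concrete, here is a hypothetical, Hadoop-free copy of its logic (PairSketch and ContractCheck are illustrative names, and plain String replaces Text so it runs without Hadoop on the classpath). Comparable requires sgn(a.compareTo(b)) == -sgn(b.compareTo(a)), but two unrelated pairs each report themselves as smaller than the other:

```java
// Hypothetical stand-in for FriendPair: String replaces Hadoop's Text,
// compareTo() keeps the same logic as the question's version.
class PairSketch implements Comparable<PairSketch> {
    final String friend1;
    final String friend2;

    PairSketch(String friend1, String friend2) {
        this.friend1 = friend1;
        this.friend2 = friend2;
    }

    @Override
    public int compareTo(PairSketch other) {
        boolean swapped = friend1.equals(other.friend2) && friend2.equals(other.friend1);
        boolean same = friend1.equals(other.friend1) && friend2.equals(other.friend2);
        // 0 for "equal" pairs in either order, -1 for everything else
        return (swapped || same) ? 0 : -1;
    }

    @Override
    public String toString() {
        return "[" + friend1 + "," + friend2 + "]";
    }
}

class ContractCheck {
    // Comparable's contract: sgn(a.compareTo(b)) must equal -sgn(b.compareTo(a))
    static boolean isAntisymmetric(PairSketch a, PairSketch b) {
        return Integer.signum(a.compareTo(b)) == -Integer.signum(b.compareTo(a));
    }

    public static void main(String[] args) {
        PairSketch a = new PairSketch("John", "Sarah");
        PairSketch b = new PairSketch("Tom", "Jerry");
        System.out.println(a.compareTo(b) + " " + b.compareTo(a)); // prints "-1 -1"
        System.out.println(isAntisymmetric(a, b)); // prints "false"
    }
}
```

A sort driven by such a comparator has no consistent order to follow, so where [Tom,John] lands relative to [John,Tom] among the other keys is effectively arbitrary.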
Mapper
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MutualFriendsMapper extends Mapper<LongWritable, Text, FriendPair, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input line: name<TAB>friend1,friend2,...
        String line = value.toString();
        String[] items = line.split("\t");
        String name = items[0];
        String friendsList = items[1];
        String[] friends = friendsList.split(",");
        for (String friend : friends) {
            // Emit (name, friend) -> name's full friend list
            FriendPair fp = new FriendPair(new Text(name), new Text(friend));
            FriendPair fp2 = new FriendPair(new Text(friend), new Text(name)); // built but never written
            context.write(fp, new Text(friendsList));
        }
    }
}
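For the sample input, the mapper's output can be traced with a hypothetical plain-Java mirror of map() (MapperSketch is an illustrative name; it assumes tab-separated lines, as split("\t") implies, and builds the key strings the way FriendPair.toString() prints them):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java mirror of MutualFriendsMapper.map(): no Hadoop types,
// each emitted (key, value) is returned as a two-element String array.
class MapperSketch {
    static List<String[]> emit(String line) {
        List<String[]> out = new ArrayList<>();
        String[] items = line.split("\t");
        String name = items[0];
        String friendsList = items[1];
        for (String friend : friendsList.split(",")) {
            // Same key shape as FriendPair.toString(): owner first, friend second
            String key = "[" + name + "," + friend + "]";
            out.add(new String[] {key, friendsList});
        }
        return out;
    }

    public static void main(String[] args) {
        for (String line : new String[] {"Tom\tJerry,John", "John\tJerry,Sarah,Tom"}) {
            for (String[] kv : emit(line)) {
                System.out.println(kv[0] + " -> " + kv[1]);
            }
        }
    }
}
```

It prints one line per emitted key: [Tom,Jerry] and [Tom,John] carrying Jerry,John, then [John,Jerry], [John,Sarah] and [John,Tom] carrying Jerry,Sarah,Tom. The Tom/John friendship therefore reaches the shuffle under two keys, and each key carries only one of the two lists.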
Reducer
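import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MutualFriendsReducer extends Reducer<FriendPair, Text, FriendPair, FriendArray> {
    @Override
    public void reduce(FriendPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Collect every name from every friend list received for this key
        List<String> allFriends = new ArrayList<String>();
        for (Text value : values) {
            String[] valueArray = value.toString().split(",");
            allFriends.addAll(Arrays.asList(valueArray));
        }
        // A name that appears in more than one list is a common friend
        List<Text> commonFriends = new ArrayList<Text>();
        Set<String> uniqueFriendSet = new HashSet<String>(allFriends);
        for (String friend : uniqueFriendSet) {
            int frequency = Collections.frequency(allFriends, friend);
            if (frequency > 1) {
                commonFriends.add(new Text(friend));
            }
        }
        context.write(key, new FriendArray(Text.class, commonFriends.toArray(new Text[commonFriends.size()])));
    }
}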
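The reducer's common-friend test can be checked in isolation with this hypothetical plain-Java mirror (ReducerSketch is an illustrative name; TreeSet replaces the HashSet only so the result order is deterministic). A name counts as common only when it appears in more than one of the lists received for a key:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical plain-Java mirror of MutualFriendsReducer.reduce(): the values
// are the comma-separated friend lists that arrived for one key.
class ReducerSketch {
    static List<String> commonFriends(List<String> values) {
        List<String> allFriends = new ArrayList<>();
        for (String value : values) {
            allFriends.addAll(Arrays.asList(value.split(",")));
        }
        List<String> common = new ArrayList<>();
        // TreeSet only fixes the iteration order; the counting is the same as the original
        for (String friend : new TreeSet<>(allFriends)) {
            if (Collections.frequency(allFriends, friend) > 1) {
                common.add(friend);
            }
        }
        return common;
    }

    public static void main(String[] args) {
        // Both lists in one call: what should happen for the Tom/John key
        System.out.println(commonFriends(Arrays.asList("Jerry,John", "Jerry,Sarah,Tom"))); // prints "[Jerry]"
        // One list per call: what happens when [Tom,John] and [John,Tom] are reduced separately
        System.out.println(commonFriends(Arrays.asList("Jerry,John"))); // prints "[]"
    }
}
```

With both lists in one call it returns [Jerry]; with a single list it returns [], which matches the empty lines in the actual output when [John,Tom] and [Tom,John] are reduced separately.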
FriendArray (Output)
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class FriendArray extends ArrayWritable {
    public FriendArray(Class<? extends Writable> valueClass, Writable[] values) {
        super(valueClass, values);
    }

    public FriendArray(Class<? extends Writable> valueClass) {
        super(valueClass);
    }

    public FriendArray() {
        super(Text.class);
    }

    @Override
    public Text[] get() {
        return (Text[]) super.get();
    }

    @Override
    public void write(DataOutput data) throws IOException {
        for (Text t : get()) {
            t.write(data);
        }
    }

    // Joins the friends as "a,b," (with a trailing comma)
    @Override
    public String toString() {
        Text[] friendArray = Arrays.copyOf(get(), get().length, Text[].class);
        String print = "";
        for (Text f : friendArray)
            print += f + ",";
        return print;
    }
}
Any help would be greatly appreciated.
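During the sort stage Hadoop never calls FriendPair.equals(). Map output keys are serialized with FriendPair.write(), sorted with a comparator (for a WritableComparable with no registered raw comparator, the default WritableComparator deserializes both keys and calls compareTo()), and a new reduce() call starts whenever two adjacent keys do not compare as 0. Your compareTo() returns -1 for every pair it does not consider equal, which is not a consistent ordering, so [John,Tom] and [Tom,John] are not guaranteed to end up next to each other and get reduced separately.
So in order to make Hadoop treat the keys [John,Tom] and [Tom,John] as equal, make sure that their write() output is identical. One way to achieve this is to enforce the order of friends in the pair, for example by sorting them alphabetically (then both pairs would look like [John,Tom]), and to let compareTo() compare friend1 first and friend2 second.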
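A minimal sketch of that normalization, written without Hadoop so it runs standalone: OrderedPair is a hypothetical name, String with writeUTF/readUTF stands in for Text, and write()/readFields()/compareTo() only mirror the shape of a WritableComparable. The constructor puts the alphabetically smaller name first, so both orderings serialize to the same bytes, and compareTo() is a proper first-then-second ordering:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical Hadoop-free sketch: String + writeUTF stand in for Text.
class OrderedPair implements Comparable<OrderedPair> {
    private String first;
    private String second;

    OrderedPair(String a, String b) {
        // Normalize once: the alphabetically smaller name goes first,
        // so (Tom, John) and (John, Tom) become the same key
        if (a.compareTo(b) <= 0) {
            first = a;
            second = b;
        } else {
            first = b;
            second = a;
        }
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeUTF(second);
    }

    void readFields(DataInput in) throws IOException {
        first = in.readUTF();
        second = in.readUTF();
    }

    // Consistent total order: compare the first name, then the second
    @Override
    public int compareTo(OrderedPair other) {
        int c = first.compareTo(other.first);
        return c != 0 ? c : second.compareTo(other.second);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof OrderedPair)) {
            return false;
        }
        OrderedPair p = (OrderedPair) o;
        return first.equals(p.first) && second.equals(p.second);
    }

    @Override
    public int hashCode() {
        return 31 * first.hashCode() + second.hashCode();
    }

    @Override
    public String toString() {
        return "[" + first + "," + second + "]";
    }

    // Serializes the pair the way a map output key would be written
    static byte[] bytes(OrderedPair p) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try {
            DataOutputStream out = new DataOutputStream(buf);
            p.write(out);
            out.flush();
        } catch (IOException e) {
            // ByteArrayOutputStream does not throw; this only satisfies the checked signature
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) {
        OrderedPair fromTom = new OrderedPair("Tom", "John");
        OrderedPair fromJohn = new OrderedPair("John", "Tom");
        System.out.println(fromTom + " " + fromJohn); // prints "[John,Tom] [John,Tom]"
        System.out.println(Arrays.equals(bytes(fromTom), bytes(fromJohn))); // prints "true"
        System.out.println(fromTom.compareTo(fromJohn)); // prints "0"
    }
}
```

In the real FriendPair the same idea would mean swapping friend1 and friend2 in the constructor (and in the setters) when they are out of order, and having compareTo() compare friend1, then friend2. hashCode() should be based on the same normalized fields, since the default HashPartitioner uses it to choose the reducer. With alphabetical order the other keys print as [Jerry,John], [John,Sarah] and [Jerry,Tom] rather than as in the expected output above.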