I am trying to create a SetWritable in Hadoop. I have just started with MapReduce and I can't figure out how exactly I should implement this. I wrote the code below, but it doesn't work.
Custom Writable (which needs to be a set):
public class TextPair implements Writable {
    private Text first;
    public HashSet<String> valueSet = new HashSet<String>();

    public TextPair() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(valueSet.size());
        Iterator<String> it = valueSet.iterator();
        while (it.hasNext()) {
            this.first = new Text(it.next());
            first.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        Iterator<String> it = valueSet.iterator();
        while (it.hasNext()) {
            this.first = new Text(it.next());
            first.readFields(in);
        }
    }
}
Mapper code:
public class TokenizerMapper extends Mapper<Object, Text, Text, TextPair> {
    ArrayList<String> al = new ArrayList<String>();
    TextPair tp = new TextPair();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] val = value.toString().substring(2, value.toString().length()).split(" ");
        for (String v : val) {
            tp.valueSet.add(v);
        }
        String[] vals = value.toString().split(" ");
        for (int i = 0; i < vals.length - 1; i++) {
            setKey(vals[0], vals[i + 1]);
            System.out.println(getKey());
            context.write(new Text(getKey()), tp);
        }
    }

    public void setKey(String first, String second) {
        al.clear();
        al.add(first);
        al.add(second);
        java.util.Collections.sort(al);
    }

    public String getKey() {
        String tp = al.get(0) + al.get(1);
        return tp;
    }
}
I am basically trying to emit a SetWritable as the value from the Mapper. Please suggest what changes I need to make. Thanks!
I would say the problem is in how you read and write. In readFields you iterate over valueSet, but on a freshly created object that set is empty, so the loop body never runs and nothing is ever read from the stream. You need to write out how large the Set is, and then use that count to read back the correct number of Text objects.
I changed your version to be a Set of Text objects since they can be read and written easily.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class TextWritable implements Writable {
    private Set<Text> values;

    public TextWritable() {
        values = new HashSet<Text>();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Write out the size of the Set
        out.writeInt(values.size());
        // Write out each Text object
        for (Text t : values) {
            t.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Make sure we have a HashSet to fill up
        values = new HashSet<Text>();
        // Get the number of elements in the set
        int size = in.readInt();
        // Read the correct number of Text objects
        for (int i = 0; i < size; i++) {
            Text t = new Text();
            t.readFields(in);
            values.add(t);
        }
    }
}
You should add some helper methods to this for adding elements to the Set and clearing it, as sketched below.
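Something like this, added inside the TextWritable class above (the names add, clear and get are my own choice here, not anything Hadoop requires):

    public void add(Text t) {
        // Add a single element to the underlying Set
        values.add(t);
    }

    public void clear() {
        // Empty the Set so the same object can be reused across map calls
        values.clear();
    }

    public Set<Text> get() {
        // Expose the underlying Set, e.g. for iterating in a reducer
        return values;
    }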
I also can't see where you clear the Set in the map method. If you don't clear it, it will potentially keep growing larger and larger each time the map method is called.
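For illustration, here is a rough sketch of a map method that reuses a single TextWritable and clears it on every call. It assumes the add and clear helpers above, and just uses the first token as the key; your setKey/getKey pairing logic would slot in where indicated:

    public class TokenizerMapper extends Mapper<Object, Text, Text, TextWritable> {
        private TextWritable tw = new TextWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Start from an empty set on every call, so values from
            // previous records don't leak into this record's output
            tw.clear();
            String[] tokens = value.toString().split(" ");
            for (String t : tokens) {
                tw.add(new Text(t));
            }
            // Your key-building logic goes here; the first token is
            // used purely for illustration
            context.write(new Text(tokens[0]), tw);
        }
    }

Reusing one object like this is safe because the framework serializes the value when context.write is called.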
See Hadoop's built-in ArrayWritable for a reference implementation of the same pattern.
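One more thing worth checking (an assumption on my part, since your driver code isn't shown): the custom Writable has to be declared as the map output value class when you configure the job, otherwise you will get a type mismatch error at runtime:

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TextWritable.class);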