I am trying to distribute a calculation using Hadoop.
I am using sequence input and output files, and custom Writables.
The input is a list of triangles; the file is at most 2 MB, but it can also be as small as about 50 KB. The intermediate values and the output are a map(int, double) held in the custom Writable. Is this the bottleneck?
The problem is that the calculation is much slower than the version without Hadoop. Also, increasing the number of nodes from 2 to 10 doesn't speed up the process.
One possibility is that I don't get enough mappers because of the small input size. I ran tests changing mapreduce.input.fileinputformat.split.maxsize, but it only got worse, not better.
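For reference, a setting like the following is what I mean (the exact way I set it, the class name, and the 64 KB value here are only illustrative; I tried several sizes):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class SplitSizeExample {
        public static Job configureJob() throws Exception {
            Configuration conf = new Configuration();
            // Lower the maximum split size so the small sequence file is divided
            // into more input splits (and therefore more mappers).
            // 64 KB is only an example value.
            conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024);
            return Job.getInstance(conf, "triangle calculation");
        }
    }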
I am using Hadoop 2.2.0 locally and on Amazon Elastic MapReduce.
Did I overlook something, or is this just the kind of task that should be done without Hadoop? (It's my first time using MapReduce.)
Here are the relevant code parts.
Thank you.
public void map(IntWritable triangleIndex, TriangleWritable triangle, Context context)
        throws IOException, InterruptedException {
    StationWritable[] stations = kernel.newton(triangle.getPoints());
    if (stations != null) {
        for (StationWritable station : stations) {
            context.write(new IntWritable(station.getId()), station);
        }
    }
}
class TriangleWritable implements Writable {

    private final float[] points = new float[9];

    @Override
    public void write(DataOutput d) throws IOException {
        for (int i = 0; i < 9; i++) {
            d.writeFloat(points[i]);
        }
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        for (int i = 0; i < 9; i++) {
            points[i] = di.readFloat();
        }
    }
}
public class StationWritable implements Writable {

    private int id;
    private final TIntDoubleHashMap values = new TIntDoubleHashMap();

    // No-arg constructor so Hadoop can instantiate the Writable during deserialization.
    StationWritable() {
    }

    StationWritable(int iz) {
        this.id = iz;
    }

    @Override
    public void write(DataOutput d) throws IOException {
        d.writeInt(id);
        d.writeInt(values.size());
        TIntDoubleIterator iterator = values.iterator();
        while (iterator.hasNext()) {
            iterator.advance();
            d.writeInt(iterator.key());
            d.writeDouble(iterator.value());
        }
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        // Clear any previous contents, since Hadoop may reuse Writable instances.
        values.clear();
        id = di.readInt();
        int count = di.readInt();
        for (int i = 0; i < count; i++) {
            values.put(di.readInt(), di.readDouble());
        }
    }
}
If the processing is really that complex, you should be able to realize a benefit from using Hadoop.
The common issue with small files is that Hadoop runs a single Java process per file, which creates overhead from starting many processes and slows down the job. In your case this does not sound like it applies. More likely you have the opposite problem: only one mapper is processing your input, and at that point it doesn't matter how big your cluster is. Adjusting the input split size sounds like the right approach, but because your use case is specialized and deviates significantly from the norm, you may need to tweak a number of components to get the best performance.
So you should be able to get the benefits you are seeking from Hadoop MapReduce, but it will probably take significant tuning and custom input handling.
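For example, one concrete thing you could try (just a sketch, not something I have benchmarked against your data; the class and method names are made up) is to pre-split the input into several sequence files, so that the framework naturally schedules at least one mapper per file:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;

    import java.io.IOException;
    import java.util.List;

    public class InputSharder {

        // Spread the triangles round-robin over 'shards' sequence files so that
        // Hadoop can schedule (at least) one mapper per file.
        public static void writeShards(Configuration conf, List<TriangleWritable> triangles,
                                       Path outputDir, int shards) throws IOException {
            SequenceFile.Writer[] writers = new SequenceFile.Writer[shards];
            try {
                for (int s = 0; s < shards; s++) {
                    writers[s] = SequenceFile.createWriter(conf,
                            SequenceFile.Writer.file(new Path(outputDir, "triangles-" + s + ".seq")),
                            SequenceFile.Writer.keyClass(IntWritable.class),
                            SequenceFile.Writer.valueClass(TriangleWritable.class));
                }
                IntWritable key = new IntWritable();
                for (int i = 0; i < triangles.size(); i++) {
                    key.set(i);
                    writers[i % shards].append(key, triangles.get(i));
                }
            } finally {
                for (SequenceFile.Writer w : writers) {
                    if (w != null) {
                        w.close();
                    }
                }
            }
        }
    }

Combined with a lower maximum split size, that should at least get more than one mapper working on the input; whether it pays off depends on how expensive the per-triangle computation really is.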
That said, seldom (if ever) will MapReduce be faster than a purpose-built solution. It is a generic tool whose value is that it can distribute and solve many diverse problems without the need to write a purpose-built solution for each.