I'm trying to implement the Spinner graph partitioning algorithm in Giraph.
In the first steps, my program adds edges to the input graph so that it becomes undirected, and every vertex chooses a random partition (this partition integer is stored in the `VertexValue`). At the end of this initialization step, each vertex sends a message along all of its outgoing edges containing its vertex ID (a `LongWritable`) and the partition it chose.
This all works fine. The step I'm having trouble with comes next: each vertex iterates over the received messages and saves the received partition in the `EdgeValue` of the corresponding edge. (`VertexValue` is `V` in `Vertex<I,V,E>`, and `EdgeValue` is `E` in `Edge<I,E>`.)
Here are the important parts of my code.

Wrapper classes:
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class EdgeValue implements Writable {
    private int weight;
    private int partition;

    // Getters and setters for weight and partition

    public EdgeValue() {
        this.weight = -2;
        this.partition = -1;
    }

    // Constructors taking 1 and 2 ints and setting weight/partition to the given value

    @Override
    public void readFields(DataInput in) throws IOException {
        this.weight = in.readInt();
        this.partition = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.weight);
        out.writeInt(this.partition);
    }
}
```
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.io.Writable;

public class SpinnerMessage implements Writable, Configurable {
    private long senderId;
    private int updatePartition;

    public SpinnerMessage() {
        this.senderId = -1;
        this.updatePartition = -1;
    }

    // Constructors taking int and/or LongWritable and setting the fields
    // Getters and setters for senderId and updatePartition

    @Override
    public void readFields(DataInput in) throws IOException {
        this.senderId = in.readLong();
        this.updatePartition = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.senderId);
        out.writeInt(this.updatePartition);
    }
}
```
The `compute` method in the step before (`ran` is a `Random` object):
```java
public void compute(Vertex<LongWritable, VertexValue, EdgeValue> vertex, Iterable<LongWritable> messages) {
    // Pick a random initial partition and store it in the vertex value
    int initialPartition = this.ran.nextInt(GlobalInformation.numberOfPartitions);
    vertex.getValue().setPartition(initialPartition);
    // Tell every neighbor which partition this vertex chose
    sendMessageToAllEdges(vertex, new SpinnerMessage(vertex.getId(), initialPartition));
}
```
The `compute` method in the step in which the bug happens:
```java
public void compute(Vertex<LongWritable, VertexValue, EdgeValue> vertex, Iterable<SpinnerMessage> messages) throws IOException {
    // Store each neighbor's reported partition on the edge pointing to it
    for (SpinnerMessage m : messages) {
        vertex.getEdgeValue(new LongWritable(m.getSenderWritable().get())).setPartition(m.getUpdatePartition());
    }
    // ... some other code, e.g. initializing the amountOfNeighbors array.
    // Here I get an ArrayIndexOutOfBoundsException since the partition is -1:
    for (Edge<LongWritable, EdgeValue> edge : vertex.getEdges()) {
        EdgeValue curValue = edge.getValue();
        amountOfNeighbors[curValue.getPartition()] += curValue.getWeight();
    }
}
```
However, when I later read the edges back, either with `for (Edge<LongWritable, EdgeValue> e : vertex.getEdges())` or with `vertex.getEdgeValue(someVertex)`, the returned `EdgeValue` has weight -2 and partition -1. These are the default values from the no-argument constructor.
My thoughts on what could cause the bug:

- `getEdgeValue(new LongWritable(someLong))` might not work because it creates a different object than another `new LongWritable(someLong)` with the same value. However, I've seen this pattern in Giraph code, so it seems not to be a problem. Only the long stored inside the `LongWritable` seems to matter.
- (The most likely cause) Hadoop serialization and deserialization is somehow changing my `EdgeValue` objects. Giraph runs on Hadoop and is meant for very big graphs, so the graph might not fit into RAM. This is why `VertexValue` and `EdgeValue` have to implement `Writable`. After checking some Giraph code online, however, I implemented `readFields()` and `write()` in a way that seems correct to me: both write and read the important fields in the same order. I still believe this is somehow connected to the problem, because the `EdgeValue` returned on the second call has the field values of the default constructor.
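You can test the second suspicion in isolation by round-tripping an `EdgeValue` through its own `write()`/`readFields()` pair with plain `java.io` streams. This is a minimal sketch. It assumes a stand-in copy of the question's `EdgeValue` that drops `implements Writable`, so only the JDK is needed, and fills in a two-int constructor and getters. The names `RoundTripCheck` and `roundTrip` are hypothetical. If the round trip returns the original weight and partition, the serialization code alone cannot explain the reset fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class RoundTripCheck {
    public static void main(String[] args) throws IOException {
        EdgeValue copy = roundTrip(new EdgeValue(3, 7));
        System.out.println(copy.getWeight() + " " + copy.getPartition()); // prints "3 7"
    }

    // Writes the value with write() and reads it into a fresh object with
    // readFields(), the two methods of Hadoop's Writable contract.
    static EdgeValue roundTrip(EdgeValue value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        value.write(new DataOutputStream(bytes));
        EdgeValue copy = new EdgeValue(); // starts with the defaults -2 / -1
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }
}

// Hypothetical stand-in for the question's EdgeValue: same fields and the same
// write()/readFields() order, but without implements Writable.
class EdgeValue {
    private int weight;
    private int partition;

    EdgeValue() {
        this(-2, -1);
    }

    EdgeValue(int weight, int partition) {
        this.weight = weight;
        this.partition = partition;
    }

    int getWeight() { return weight; }
    int getPartition() { return partition; }

    void readFields(DataInput in) throws IOException {
        this.weight = in.readInt();
        this.partition = in.readInt();
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(this.weight);
        out.writeInt(this.partition);
    }
}
```

Because `roundTrip(new EdgeValue(3, 7))` returns weight 3 and partition 7, a matching write/read order is working as intended. The defaults must come from somewhere else.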
I also read this in the documentation for `E getEdgeValue(I targetVertexId)`:

> Return the value of the first edge with the given target vertex id, or null if there is no such edge. Note: edge value objects returned by this method may be invalidated by the next call. Thus, keeping a reference to an edge value almost always leads to undesired behavior.
However, this doesn't apply to me, since I only have one `EdgeValue` variable, right?
Thank you in advance to everyone who takes the time to help me. (I'm using Hadoop 1.2.1 and Giraph 1.2.0.)
After looking at some more Giraph code examples, I found the solution. `Vertex.getEdgeValue()` basically returns a copy of the vertex's `EdgeValue`. If you change the object it returns, the changes are not written back to the stored edges. To save information in an `EdgeValue` or `VertexValue`, you have to pass the modified object back. Use `setEdgeValue()` for edges (e.g. `vertex.setEdgeValue(targetId, value)`) and `setValue()` for the vertex value.
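A toy model can illustrate this explanation. This is a hypothetical sketch, not Giraph's implementation. `ToyEdgeStore`, `WriteBackDemo`, and the `long` target IDs are made up for illustration. The model also assumes that edge values are kept only in serialized form, as in a byte-array-backed edge store. Here `getEdgeValue()` deserializes into one reused object, so changing that object never reaches the stored bytes, while `setEdgeValue()` re-serializes the value. The last lookup shows what the documentation means by a returned edge value being invalidated by the next call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class WriteBackDemo {
    public static void main(String[] args) {
        ToyEdgeStore edges = new ToyEdgeStore();
        edges.addEdge(42L, new EdgeValue(1, -1));
        edges.addEdge(7L, new EdgeValue(2, 3));

        // Mutating the returned object only changes the scratch copy.
        edges.getEdgeValue(42L).setPartition(5);
        System.out.println(edges.getEdgeValue(42L).getPartition()); // prints -1

        // Passing the modified object back re-serializes it, so the change sticks.
        EdgeValue value = edges.getEdgeValue(42L);
        value.setPartition(5);
        edges.setEdgeValue(42L, value);
        System.out.println(edges.getEdgeValue(42L).getPartition()); // prints 5

        // A kept reference is overwritten by the next lookup.
        EdgeValue first = edges.getEdgeValue(42L);
        edges.getEdgeValue(7L);
        System.out.println(first.getWeight()); // prints 2, the weight of edge 7
    }
}

// Hypothetical edge store that keeps each edge value only as bytes.
class ToyEdgeStore {
    private final Map<Long, byte[]> serialized = new HashMap<>();
    private final EdgeValue scratch = new EdgeValue(); // reused by every lookup

    void addEdge(long target, EdgeValue value) {
        setEdgeValue(target, value);
    }

    // Deserializes into the shared scratch object and returns it; nothing done
    // to that object is written back to the stored bytes.
    EdgeValue getEdgeValue(long target) {
        byte[] bytes = serialized.get(target);
        if (bytes == null) {
            return null;
        }
        try {
            scratch.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return scratch;
    }

    // Serializes the given value, which is the only way a change is stored.
    void setEdgeValue(long target, EdgeValue value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            value.write(new DataOutputStream(out));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        serialized.put(target, out.toByteArray());
    }
}

// Hypothetical stand-in for the question's EdgeValue without implements Writable.
class EdgeValue {
    private int weight;
    private int partition;

    EdgeValue() { this(-2, -1); }
    EdgeValue(int weight, int partition) { this.weight = weight; this.partition = partition; }

    int getWeight() { return weight; }
    int getPartition() { return partition; }
    void setPartition(int partition) { this.partition = partition; }

    void readFields(DataInput in) throws IOException {
        weight = in.readInt();
        partition = in.readInt();
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(weight);
        out.writeInt(partition);
    }
}
```

Applied to the question's first loop, the same pattern means three steps for each message. Fetch the value, call `setPartition()` on it, and pass it back with `vertex.setEdgeValue(targetId, value)` before handling the next message.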