I am trying to implement the algorithm described in the image, using MPI. It is part of a University project where we are building a distributed satellite to ground station communication system. I have made multiple iterations of it, but I can never manage to break ties when multiple candidate winners are elected.
Here is the function that handles the election process. This function is called for every single Ground station process (there are 5 in their communication group, GS 0, 1, 2, 3 and 4) and is started via a START_LELECT_GS
event from a coordinator process (rank 10).
void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
MPI_Status status;
int is_leader = 0;
int received_count = 0;
int neighbor_received_from[num_of_neighbors];
printf("Process %d: Starting leader election\n", rank);
// Determine if the process is a leaf node
const int is_leaf = (num_of_neighbors == 1);
printf("Process %d: Is leaf? %d\n", rank, is_leaf);
// Send initial <ELECT> messages only if the process is a leaf node
if (is_leaf) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
}
// Receive and process messages from neighbors
while (1) {
int sender_rank;
MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
printf("Process %d: Received message from %d with tag %d\n", rank, status.MPI_SOURCE, status.MPI_TAG);
if (status.MPI_TAG == ELECT) {
received_count++;
neighbor_received_from[status.MPI_SOURCE] = 1;
printf("Process %d: Received ELECT from %d\n", rank, status.MPI_SOURCE);
// If all neighbors except one have sent ELECT messages, probe for incoming messages before sending ELECT to the remaining neighbor
if (received_count == num_of_neighbors - 1) {
int flag;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, &status);
if (flag && status.MPI_TAG == TERMINATE_LELECT_GS) {
printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);
if (status.MPI_SOURCE > rank) {
printf("Process %d: Surrendering leadership to process %d\n", rank, status.MPI_SOURCE);
MPI_Send(&rank, 1, MPI_INT, status.MPI_SOURCE, GS_LEADER, comm);
is_leader = 0;
} else {
printf("Process %d: Declaring leadership over process %d\n", rank, status.MPI_SOURCE);
is_leader = 1;
}
// send out terminate to all neighbors but the sender
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] != status.MPI_SOURCE) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
}
break;
}
if (!flag) {
// if we got elect from all but one, send elect to remaining neighbor
for (int i = 0; i < num_of_neighbors; i++) {
if (!neighbor_received_from[neighbor_gs[i]]) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], ELECT, comm);
printf("Process %d: Sent ELECT to remaining neighbor %d\n", rank, neighbor_gs[i]);
break;
}
}
}
}
// If all neighbors have sent ELECT messages, this process is a candidate for the leader
if (received_count == num_of_neighbors) {
printf("Process %d: Candidate for leader\n", rank);
is_leader = 1;
// send terminate to all neighbors
for (int i = 0; i < num_of_neighbors; i++) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
break;
}
} else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] != status.MPI_SOURCE) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
}
break;
} else if (status.MPI_TAG == GS_LEADER) {
printf("Process %d: Declared leader due to GS_LEADER message\n", rank);
is_leader = 1;
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] != status.MPI_SOURCE) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
}
break;
}
}
MPI_Barrier(comm);
printf("Process %d: Terminated\n", rank);
if (is_leader) {
printf("Process %d: Sending LELECT_GS_DONE to coordinator\n", rank);
MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
}
}
Here is an example of a problematic output:
parsing START_LELECT_GS event
Process 0: Starting leader election
Process 0: Is leaf? 0
Process 1: Starting leader election
Process 1: Is leaf? 0
Process 4: Starting leader election
Process 4: Is leaf? 1
Leaf Process 4: Sent ELECT to 0
Process 2: Starting leader election
Process 3: Starting leader election
Process 3: Is leaf? 1
Leaf Process 3: Sent ELECT to 1
Process 2: Is leaf? 1
Leaf Process 2: Sent ELECT to 1
Process 0: Received message from 4 with tag 22
Process 0: Received ELECT from 4
Process 0: Sent ELECT to remaining neighbor 1
Process 1: Received message from 3 with tag 22
Process 1: Received ELECT from 3
Process 1: Received message from 2 with tag 22
Process 1: Received ELECT from 2
Process 1: Sent ELECT to remaining neighbor 0
Process 1: Received message from 0 with tag 22
Process 1: Received ELECT from 0
Process 1: Candidate for leader
Process 1: Sent TERMINATE_LELECT_GS to 3
Process 1: Sent TERMINATE_LELECT_GS to 0
Process 1: Sent TERMINATE_LELECT_GS to 2
Process 0: Received message from 1 with tag 22
Process 0: Received ELECT from 1
Process 0: Candidate for leader
Process 0: Sent TERMINATE_LELECT_GS to 1
Process 0: Sent TERMINATE_LELECT_GS to 4
Process 4: Received message from 0 with tag 24
Process 4: Received TERMINATE_LELECT_GS from 0
Process 3: Received message from 1 with tag 24
Process 3: Received TERMINATE_LELECT_GS from 1
Process 2: Received message from 1 with tag 24
Process 2: Received TERMINATE_LELECT_GS from 1
Process 0: Terminated
Process 0: Sending LELECT_GS_DONE to coordinator
Process 3: Terminated
Process 4: Terminated
Process 1: Terminated
Process 1: Sending LELECT_GS_DONE to coordinator
process 10 waiting at barrier
Process 2: Terminated
Two candidates arise but despite me trying to probe for an intermediate termination message so that I can break the tie early, it does not work but I cannot figure out the reason why.
I did manage to land on an implementation of the said algorithm that seems to be working properly now. Here is the code if anyone is interested.
P.S. Please do let me know if you see any bugs!
void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
MPI_Status status;
int leader_rank = -1;
int received_count = 0;
int neighbor_received_from[num_of_neighbors];
for (int i = 0; i < num_of_neighbors; i++) neighbor_received_from[i] = 0;
int remaining_neighbor = -1;
printf("Process %d: Starting leader election\n", rank);
// Determine if the process is a leaf node
const int is_leaf = (num_of_neighbors == 1);
printf("Process %d: Is leaf? %d\n", rank, is_leaf);
// Send initial <ELECT> messages only if the process is a leaf node
if (is_leaf) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
}
do {
int sender_rank;
while (received_count < num_of_neighbors - 1) {
MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
if (status.MPI_TAG == ELECT) {
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] == status.MPI_SOURCE) {
neighbor_received_from[i] = 1;
}
}
received_count++;
printf("Process %d got ELECT from %d (replies = %d)\n", rank, status.MPI_SOURCE, received_count);
if (received_count == num_of_neighbors - 1) break;
}
if (status.MPI_TAG == TERMINATE_LELECT_GS) {
// leader has been found, terminate and notify all neighbors by sending leader rank
leader_rank = sender_rank;
printf("Process %d got TERMINATE from %d with leader %d\n", rank, status.MPI_SOURCE, sender_rank);
for (int i = 0; i < num_of_neighbors; i++) {
printf("Process %d propagating leader to process %d\n", rank, neighbor_gs[i]);
if(neighbor_gs[i] != status.MPI_SOURCE) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
}
break;
}
}
if (leader_rank != -1) break; // got terminate while still sending elects, terminate early
printf("iterating for remaining neighbor\n");
// we have received replies from all neighbors but one
for (int i = 0; i < num_of_neighbors; i++) {
printf("considering %d with found %d\n", neighbor_gs[i], neighbor_received_from[i]);
if (!neighbor_received_from[i]) {
printf("remaining neighbor is %d\n", neighbor_gs[i]);
remaining_neighbor = neighbor_gs[i];
break;
}
}
if (remaining_neighbor != -1 && !is_leaf) {
// if we are a leaf, don't resend
printf("Process %d has received all replies but one (from %d)...\n", rank, remaining_neighbor);
int flag;
// probe to see if the remaining neighbor is sending an ELECT before we send it one
MPI_Iprobe(remaining_neighbor, ELECT, comm, &flag, &status);
if (flag) {
MPI_Recv(&sender_rank, 1, MPI_INT, remaining_neighbor, ELECT, comm, &status);
// we got our last elect response, so we are the leader
printf("Process %d is leader because it received ELECT from remaining neighbor\n", rank);
// note: could update count and received neighbors but no need
leader_rank = rank;
// send out our rank to all neighbors
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d propagating winner (us) to %d\n", rank, neighbor_gs[j]);
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
} else {
// else we send to the last neighbor
printf("Process %d, no incoming last elect from probe, sending elect to last neighbor\n", rank);
MPI_Send(&rank, 1, MPI_INT, remaining_neighbor, ELECT, comm);
}
}
// have sent to all neighbors, wait for outcome
printf("Process %d has sent all elects, now waiting for final event\n", rank);
MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
if (status.MPI_TAG == ELECT && status.MPI_SOURCE == remaining_neighbor) {
// probe for terminate in case someone else was elected after the elect we received was sent
int flag = 0;
MPI_Iprobe(MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &flag, &status);
if (flag) {
// another process was declared leader faster, abort
MPI_Recv(&leader_rank, 1, MPI_INT, MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &status);
printf("Process %d got terminate from %d because another process won first!\n", rank,
status.MPI_SOURCE);
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d sending terminate to %d\n", rank, neighbor_gs[j]);
if(neighbor_gs[j] != status.MPI_SOURCE) MPI_Send(&sender_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
}
// means we got our last elect but after we sent out an elect to the remaining neighbor
// incoming elect right after we sent elect the same way, so contest
if (rank > sender_rank) {
printf("Process %d got elect on same edge from %d and won\n", rank, sender_rank);
// we win the fight and are the leader
leader_rank = rank;
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d propagating ourself (winner) to %d\n", rank, neighbor_gs[j]);
if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
} else {
// sender has won and is leader
printf("Process %d got elect on same edge and lost fight. winner was %d\n", rank,
status.MPI_SOURCE);
leader_rank = status.MPI_SOURCE;
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d propagating other process/winner to %d\n", rank, neighbor_gs[j]);
if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
}
} else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
// we got terminate signal, notify neighbors
leader_rank = sender_rank;
printf("Process %d got terminate from %d with leader being %d!\n", rank, status.MPI_SOURCE, leader_rank);
for (int j = 0; j < num_of_neighbors; j++) {
if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
}
} while (1);
printf("Process %d exited loop with leader being %d\n", rank, leader_rank);
MPI_Barrier(comm);
if (rank == leader_rank) {
printf("Leader sending done to coordinator\n");
MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
}
}