pythonprotocol-buffersgrpcload-balancingdistributed-computing

Least Connection Load balancing using Grpc


least_connection.proto code
Node overloaded -- starting load balancing process
Traceback (most recent call last):
File "D:\lab7p2\least connection\node2.py", line 73, in <module>
node.check_load()
File "D:\lab7p2\least connection\node2.py", line 46, in check_load
self.balance_load()
File "D:\lab7p2\least connection\node2.py", line 36, in balance_load
request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Message must be initialized with a dict: least_connection.LoadTransferMessage

node2.py code

import threading
import time
import grpc
import least_connection_pb2 as least_connection_pb2
import least_connection_pb2_grpc as least_connection_pb2_grpc
from concurrent import futures

class Node(least_connection_pb2_grpc.LeastConnectionLoadBalancerServicer):
    def __init__(self):
        self.node_id = '2'
        self.server_address = 'localhost:50052'
        self.connections = 85  # Initial number of connections
        self.threshold = 80    # Threshold for load balancing
        self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]  # List of tuples (server address, connections)

    def get_least_connections_server(self):
        least_connections_server = min(self.servers, key=lambda server: server[1], default=None)
        return least_connections_server

    def balance_load(self):
        least_connections_server = self.get_least_connections_server()
        if least_connections_server is None:
            print("No server available for load transfer")
            return

        least_connections_address, least_connections_count = least_connections_server

        transfer_load = self.connections - self.threshold
        if transfer_load <= 0:
            print("No load to transfer")
            return

        channel = grpc.insecure_channel(self.server_address)
        stub = least_connection_pb2_grpc.LeastConnectionLoadBalancerStub(channel)

        request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
        response = stub.TransferLoad(request)
        if response.success:
            print(f"Node {self.node_id} --> {transfer_load} Units --> Server {least_connections_address}")
            self.connections -= transfer_load

    def check_load(self):
        print(f"Current node load: {self.connections}")
        if self.connections > self.threshold:
            print("Node overloaded -- starting load balancing process")
            self.balance_load()

    def getServerWithLeastConnections(self, request, context):
        least_connections_server = self.get_least_connections_server()
        server_obj = least_connection_pb2.Server(name=least_connections_server[0], current_connections=least_connections_server[1])
        return least_connection_pb2.LeastConnectionServer(server=server_obj, connections=least_connections_server[1])

def serve(node):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    least_connection_pb2_grpc.add_LeastConnectionLoadBalancerServicer_to_server(node, server)
    server.add_insecure_port('localhost:50052')
    server.start()
    server.wait_for_termination()

if __name__ == "__main__":
    node = Node()
    server_thread = threading.Thread(target=serve, args=(node,))
    server_thread.start()

    # Wait for a moment to ensure the server is up and running
    time.sleep(1)

    try:
        while True:
            user_input = input("Enter 1 to check status: ")

            if user_input == "1":
                node.check_load()
            print("--------------------------------------------")
    except KeyboardInterrupt:
        pass

similarly node1,node3 code

proto code

syntax = "proto3";

package least_connection;

message Server {
  string name = 1;
  int32 current_connections = 2;
}

message RequestData {
  string request_id = 1;
}

message LeastConnectionServer {
  Server server = 1;
  int32 connections = 2;
}

message LoadTransferMessage {
  Server server = 1;
  int32 load = 2;
}

service LeastConnectionLoadBalancer {
  rpc getServerWithLeastConnections(RequestData) returns (LeastConnectionServer);
  rpc TransferLoad(LoadTransferMessage) returns (LoadTransferResponse);
}

message LoadTransferResponse {
  bool success = 1;
}


Solution

  • Your Node class has an instance attribute server defined as:

    self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]
    

    Which has a type List of tuple (str,int)

    So, in balance_load, after the line:

    least_connections_server = self.get_least_connections_server()
    

    least_connections_server contains a tuple of string,int

    Which you then unpack:

    least_connections_address, least_connections_count = least_connections_server
    

    least_connections_address (str) and least_connections_count (int).

    From your proto, LoadTransferMessage takes a server of type Server and a load of int

    request = least_connection_pb2.LoadTransferMessage(
      server=least_connections_address,
      load=int(transfer_load),
    )
    

    But least_connections_address is a str not a Server

    You want (something like):

    server = least_connection_pb2.Server(
      name=least_connections_address,
      current_connections=least_connections_count,
    )
    
    request = least_connection_pb2.LoadTransferMessage(
      server=server,
      load=int(transfer_load),
    )