least_connection.proto code
Node overloaded -- starting load balancing process
Traceback (most recent call last):
File "D:\lab7p2\least connection\node2.py", line 73, in <module>
node.check_load()
File "D:\lab7p2\least connection\node2.py", line 46, in check_load
self.balance_load()
File "D:\lab7p2\least connection\node2.py", line 36, in balance_load
request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Message must be initialized with a dict: least_connection.LoadTransferMessage
node2.py code
import threading
import time
import grpc
import least_connection_pb2 as least_connection_pb2
import least_connection_pb2_grpc as least_connection_pb2_grpc
from concurrent import futures
class Node(least_connection_pb2_grpc.LeastConnectionLoadBalancerServicer):
def __init__(self):
self.node_id = '2'
self.server_address = 'localhost:50052'
self.connections = 85 # Initial number of connections
self.threshold = 80 # Threshold for load balancing
self.servers = [('localhost:50051', 70), ('localhost:50053', 65)] # List of tuples (server address, connections)
def get_least_connections_server(self):
least_connections_server = min(self.servers, key=lambda server: server[1], default=None)
return least_connections_server
def balance_load(self):
least_connections_server = self.get_least_connections_server()
if least_connections_server is None:
print("No server available for load transfer")
return
least_connections_address, least_connections_count = least_connections_server
transfer_load = self.connections - self.threshold
if transfer_load <= 0:
print("No load to transfer")
return
channel = grpc.insecure_channel(self.server_address)
stub = least_connection_pb2_grpc.LeastConnectionLoadBalancerStub(channel)
request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
response = stub.TransferLoad(request)
if response.success:
print(f"Node {self.node_id} --> {transfer_load} Units --> Server {least_connections_address}")
self.connections -= transfer_load
def check_load(self):
print(f"Current node load: {self.connections}")
if self.connections > self.threshold:
print("Node overloaded -- starting load balancing process")
self.balance_load()
def getServerWithLeastConnections(self, request, context):
least_connections_server = self.get_least_connections_server()
server_obj = least_connection_pb2.Server(name=least_connections_server[0], current_connections=least_connections_server[1])
return least_connection_pb2.LeastConnectionServer(server=server_obj, connections=least_connections_server[1])
def serve(node):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
least_connection_pb2_grpc.add_LeastConnectionLoadBalancerServicer_to_server(node, server)
server.add_insecure_port('localhost:50052')
server.start()
server.wait_for_termination()
if __name__ == "__main__":
node = Node()
server_thread = threading.Thread(target=serve, args=(node,))
server_thread.start()
# Wait for a moment to ensure the server is up and running
time.sleep(1)
try:
while True:
user_input = input("Enter 1 to check status: ")
if user_input == "1":
node.check_load()
print("--------------------------------------------")
except KeyboardInterrupt:
pass
similarly node1,node3 code
proto code
syntax = "proto3";
package least_connection;
message Server {
string name = 1;
int32 current_connections = 2;
}
message RequestData {
string request_id = 1;
}
message LeastConnectionServer {
Server server = 1;
int32 connections = 2;
}
message LoadTransferMessage {
Server server = 1;
int32 load = 2;
}
service LeastConnectionLoadBalancer {
rpc getServerWithLeastConnections(RequestData) returns (LeastConnectionServer);
rpc TransferLoad(LoadTransferMessage) returns (LoadTransferResponse);
}
message LoadTransferResponse {
bool success = 1;
}
Your Node
class has an instance attribute server
defined as:
self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]
Which has a type List
of tuple (str
,int
)
So, in balance_load
, after the line:
least_connections_server = self.get_least_connections_server()
least_connections_server
contains a tuple of string,int
Which you then unpack:
least_connections_address, least_connections_count = least_connections_server
least_connections_address
(str
) and least_connections_count
(int
).
From your proto, LoadTransferMessage
takes a server
of type Server
and a load
of int
request = least_connection_pb2.LoadTransferMessage(
server=least_connections_address,
load=int(transfer_load),
)
But least_connections_address
is a str
not a Server
You want (something like):
server = least_connection_pb2.Server(
name=least_connections_address,
current_connections=least_connections_count,
)
request = least_connection_pb2.LoadTransferMessage(
server=server,
load=int(transfer_load),
)