I am trying to deploy a Federated Learning server using Flower (flwr) on a Kubernetes cluster with a custom Docker image (fl-server:latest). The server needs to run on a specific port and host. My goal is to ensure that the server finds an available port before starting to avoid conflicts with other services.
I have implemented a Python class to create both the deployment and the corresponding service using the Kubernetes API. Below is the code for creating the deployment and service:
from kubernetes import client, config
from uuid import uuid4


class DeploymentFL:
    def __init__(self):
        pass

    def create_deployment_object(self, model_name: str, port_k8s: str, port_flower: str, model_uuid: str, host: str = "192.168.49.2") -> client.V1Deployment:
        """
        Create a Kubernetes deployment object for a Federated Learning server.

        Args:
            model_name (str): The name of the model to be used.
            port_k8s (str): The port number for the Kubernetes service.
            port_flower (str): The port number for the Flower server.
            model_uuid (str): The uuid of the model to identify it.
            host (str): The host address of the server.

        Returns:
            client.V1Deployment: The deployment object for the Kubernetes API.
        """
        try:
            sanitized_model_name = model_name.lower().replace("_", "-")
            name_container = f"fl-server-{sanitized_model_name}-{model_uuid}"
            # Define the container with environment variables and health checks
            container = client.V1Container(
                name=name_container,
                image="fl-server:latest",
                image_pull_policy="Never",
                ports=[
                    client.V1ContainerPort(container_port=int(port_k8s)),
                ],
                env=[
                    client.V1EnvVar(name="FL_HOST", value=host),
                    client.V1EnvVar(name="FL_PORT", value=str(port_k8s)),
                ],
                volume_mounts=[client.V1VolumeMount(
                    name="model-storage",
                    mount_path="/mnt/data"
                )],
                liveness_probe=client.V1Probe(
                    _exec=client.V1ExecAction(command=["/bin/sh", "-c", f"nc -zv {host} {port_k8s}"]),
                    initial_delay_seconds=60,
                    period_seconds=10,
                    timeout_seconds=5,
                    failure_threshold=3,
                ),
                readiness_probe=client.V1Probe(
                    _exec=client.V1ExecAction(command=["/bin/sh", "-c", f"nc -zv {host} {port_k8s}"]),
                    initial_delay_seconds=30,
                    period_seconds=10,
                    timeout_seconds=5,
                    failure_threshold=3,
                )
            )
            # Define the Pod template
            template = client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": name_container}),
                spec=client.V1PodSpec(containers=[container], volumes=[client.V1Volume(
                    name="model-storage",
                    persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name="model-pvc")
                )])
            )
            # Define the Deployment specification
            spec = client.V1DeploymentSpec(
                replicas=1,
                template=template,
                selector={"matchLabels": {"app": name_container}}
            )
            name_deployment = f"{name_container}-deployment"
            deployment = client.V1Deployment(
                metadata=client.V1ObjectMeta(name=name_deployment),
                spec=spec
            )
            return deployment
        except Exception as e:
            print(f"Error at create_deployment_object: {e}")

    def create_service_object(self, model_name: str, port: int, model_uuid: str) -> tuple[client.V1Service, str]:
        """
        Create a Kubernetes service object for a Federated Learning server.

        Args:
            model_name (str): The name of the model to be used.
            port (int): The port number for the server.
            model_uuid (str): The uuid of the model to identify it.

        Returns:
            tuple[client.V1Service, str]: The service object for the Kubernetes API and its name.
        """
        try:
            sanitized_model_name = model_name.lower().replace("_", "-")
            service_name = f"fl-server-{sanitized_model_name}-{model_uuid}-service"
            service = client.V1Service(
                metadata=client.V1ObjectMeta(name=service_name),
                spec=client.V1ServiceSpec(
                    selector={"app": f"fl-server-{sanitized_model_name}-{model_uuid}"},
                    ports=[client.V1ServicePort(port=port, target_port=port)],
                    type="NodePort"
                )
            )
            return service, service_name
        except Exception as e:
            print(f"Error at create_service_object: {e}")
Below is the Python script that runs inside the fl-server image:
import flwr as fl
import tensorflow as tf
import os
import socket


def is_port_in_use(host, port):
    """Check if a port is in use."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex((host, port)) == 0


def run_server(host: str, port: int):
    """
    Start a Federated Learning server with the given model.

    Args:
        host (str): The host address for the server.
        port (int): The port number for the server.

    Returns:
        None
    """
    try:
        # Define the Federated Learning strategy
        strategy = fl.server.strategy.FedAvg(
            fraction_fit=0.1,
            fraction_evaluate=0.1,
            min_fit_clients=2,
            min_evaluate_clients=2,
            min_available_clients=2,
        )
        # Start the Flower server
        fl.server.start_server(
            server_address=f"{host}:{port}",
            config=fl.server.ServerConfig(num_rounds=3),
            strategy=strategy,
        )
    except Exception as e:
        print(f"Error starting fl server: {e}")


if __name__ == "__main__":
    # Read environment variables
    host = os.getenv("FL_HOST", "192.168.49.2")
    port = int(os.getenv("FL_PORT", "30878"))
    # Find an available port
    original_port = port
    while is_port_in_use(host, port):
        print(f"Port {port} is already in use. Trying another port...")
        port += 1
    print(f"Starting Flower server on {host}:{port} (initially tried {original_port})")
    run_server(host, port)
I expected the server to start on the first available port. If the specified port is in use, the script should find the next available port and use it to start the server.
Even though the script is designed to check for port availability, I keep getting the following error in the logs when deploying in Kubernetes:
INFO : Starting Flower server, config: num_rounds=3, no round_timeout
Starting Flower server on 192.168.49.2:30099 (initially tried 30099)
Port in server address 192.168.49.2:30099 is already in use.
To try to solve the problem, I:
- verified that the port-checking logic works correctly by running the script locally;
- ensured that the Kubernetes deployment uses the latest version of the Docker image (fl-server:latest);
- checked that the service selector matches the labels of the pod;
- verified with ss and netstat inside the container that no other services are using the port.
Here is my Dockerfile used to build the image:
FROM python:3.10
# Install dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt
# Copy the application code
COPY fl_server.py /app/fl_server.py
WORKDIR /app
# Run the server
CMD ["python", "fl_server.py"]
In a Kubernetes context, you don't need to try to figure out an available port. Inside the cluster, each Pod and each Service has its own internal IP address, which means it's safe to have Deployments and Services using fixed ports.
It looks like flwr.server.start_server() defaults to listening on port 8080 on all interfaces. This is internally a gRPC service, so there isn't necessarily a "best" port number for it; I'd stick with the default both in your own code and in the container. In a container it's important that the "listen" address is 0.0.0.0; if you try to pick your own address, it must be the address of the container's own external interface, and that varies between container runtimes and individual containers.
if __name__ == "__main__":
    host = os.getenv("FL_HOST", "0.0.0.0")    # IPv4 "all interfaces"
    port = int(os.getenv("FL_PORT", "8080"))  # default flwr.server port
    # do NOT try to probe for random other ports
    print(f"Starting Flower server on {host}:{port}")
    run_server(host, port)
Now in your Deployment, you know what port is going to be used; you don't have to guess. Similarly, in your Service, you know what the target port is, and you can pick the "best" port for the Service itself. (In other contexts I might choose the framework's standard port number for the Deployment – 3000 for an Express application, 5000 for Flask, 4000 for Phoenix, 8080 for Java – and the standard HTTP port 80 for the Service.) You do not need to attempt to dynamically override the host and port.
def create_deployment_object(self, model_name: str, model_uuid: str) -> client.V1Deployment:
    ...
    ports=[
        client.V1ContainerPort(container_port=8080),
    ],
    env=[],
    ...

def create_service_object(self, model_name: str, model_uuid: str) -> client.V1Service:
    ...
    ports=[client.V1ServicePort(port=8080, target_port=8080)],
    ...
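If it helps to see this outside the Python client, the Service built above corresponds to roughly the following manifest (the names here are placeholders, not the ones your code generates):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: fl-server-example-service    # placeholder; yours is generated
spec:
  type: NodePort
  selector:
    app: fl-server-example           # must match the Pod template's labels
  ports:
    - port: 8080                     # the Service's own port
      targetPort: 8080               # the container's fixed listen port
      # nodePort: left unset; Kubernetes assigns one in 30000-32767
```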
The one other part of this is that you've declared this as a NodePort Service. From outside the cluster, the Service will be reachable on any of the Kubernetes Nodes' IP addresses or DNS names, on a specific port number. In normal use Kubernetes itself assigns this port number, in the range 30000-32767. It will be different from any other NodePort number in use, which satisfies your initial requirement of a distinct, non-conflicting port. kubectl get service can show you what the cluster-assigned port number actually is.
The key issue in your code as you've shown it is that you're trying to listen on exactly 192.168.49.2:30878 (or whatever port the probe settles on), but always on that exact IP address. For a listener, the IP address must be one the current container actually has (or the current machine, in a non-container context). You can't tell your code to listen on a Node address or a load-balancer address; it has to be an address the container itself owns. I suspect you're getting a misleading error message from the library when it can't set up the listener: the real problem is the fixed IP address, not the port number.
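You can see the two failure modes diverge with plain sockets. This is only a sketch: the question's is_port_in_use() uses connect_ex(), which reports whether something is accepting connections at an address; it says nothing about whether this machine can bind() a listener there. 203.0.113.1 is a TEST-NET documentation address that the machine running this will not own, standing in for the Node IP.

```python
import socket

host = "203.0.113.1"  # an address this machine does not own (like a Node IP from inside a Pod)

# The client-side probe: connect_ex() returns non-zero here, so the
# question's loop concludes "port looks free"...
probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
probe.settimeout(1)
print("connect_ex:", probe.connect_ex((host, 30099)))  # non-zero errno
probe.close()

# ...yet binding a listener to the same address still fails, because it
# is not a local interface address. This is the failure flwr then reports.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    listener.bind((host, 30099))
except OSError as e:
    print("bind failed:", e)
finally:
    listener.close()

# Binding to the "all interfaces" address works everywhere.
ok = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ok.bind(("0.0.0.0", 0))  # port 0: let the OS choose
print("bound to:", ok.getsockname())
ok.close()
```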
In a container context, usually the right answer is to listen on the special "all interfaces" address 0.0.0.0 (or its IPv6 equivalent [::]). It's possible to introspect the network environment to find out what interfaces exist, but that is unusual and complicated. The container process still won't be reachable except via the container system (a Kubernetes Service, or a docker run -p published port), so it's not especially a security risk to be listening "everywhere".