kubernetesdaskdask-kubernetes

dask-kubernetes KubeCluster stuck


I'm trying to get up and running with dask on kubernetes. Below is effectively a hello world for dask-kubernetes, but I'm stuck on the error below.

main.py:

import os
from dask_kubernetes import KubeCluster
from dask.distributed import Client
import dask.array as da


if __name__ == '__main__':
    path_to_src = os.path.dirname(os.path.abspath(__file__))
    cluster = KubeCluster(os.path.join(path_to_src, 'pod-spec.yaml'), namespace='124381-dev')
    print('Cluster constructed')

    cluster.scale(10)
    # print('Cluster scaled')

    # Connect Dask to the cluster
    client = Client(cluster)
    print('Client constructed')

    # Create a large array and calculate the mean
    array = da.ones((100, 100, 100))
    print('Created big array')
    print(array.mean().compute())  # Should print 1.0
    print('Computed mean')

The output:

$ python src/main.py 
Creating scheduler pod on cluster. This may take some time.
Forwarding from 127.0.0.1:60611 -> 8786
Handling connection for 60611
Handling connection for 60611
Handling connection for 60611
Cluster constructed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f1f874b8130>>, <Task finished name='Task-54' coro=<SpecCluster._correct_state_internal() done, defined at /home/cliff/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/deploy/spec.py:327> exception=TypeError("unsupported operand type(s) for +=: 'NoneType' and 'list'")>)
Traceback (most recent call last):
  File "/home/cliff/anaconda3/envs/dask/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/cliff/anaconda3/envs/dask/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/cliff/anaconda3/envs/dask/lib/python3.8/site-packages/distributed/deploy/spec.py", line 358, in _correct_state_internal
    worker = cls(self.scheduler.address, **opts)
  File "/home/cliff/anaconda3/envs/dask/lib/python3.8/site-packages/dask_kubernetes/core.py", line 151, in __init__
    self.pod_template.spec.containers[0].args += worker_name_args
TypeError: unsupported operand type(s) for +=: 'NoneType' and 'list'
Handling connection for 60611
Handling connection for 60611
Client constructed
Created big array

Note that there is no terminal prompt at the end of the output - it's still running but never progresses. In another terminal kubectl get pods shows "cliff-testing" as running as well.

pod-spec.yaml:

apiVersion: v1
kind: Pod
metadata:
  name: cliff-testing
  labels:
    app: cliff-docker-test
spec:
  imagePullSecrets:
  - name: <redacted>
  securityContext:
    runAsUser: 1000
  restartPolicy: OnFailure
  containers:
  - name: cliff-test-container
    image: <redacted: works with docker pull>
    imagePullPolicy: Always
    resources:
      limits:
        cpu: 2
        memory: 4G
      requests:
        cpu: 1
        memory: 2G


Solution

  • In the pod template (pod-spec.yaml), the field metadata.name is set. Removing this allowed the code to run. It appears that dask-kubernetes creates a scheduler pod named "dask-<user>-<random>" and follows the same naming approach for workers. By fixing the name in the pod template, dask-kubernetes was attempting to create worker pods with the same name as the scheduler pod (and each other) which is illegal.

    If you want to name the pods differently, you can use the keyword argument name when constructing KubeCluster to name the pods (dask will automatically append a random string to this name for each pod).

    For example, the sample below will result in each pod (scheduler and workers) being named "my-dask-pods-<random>"

    from dask_kubernetes import KubeCluster
    cluster = KubeCluster('pod-spec.yaml', name='my-dask-pods-')