Pass blobs between tasks in Flyte


I'm trying to create a Flyte workflow that needs to pass data between several tasks. I looked at one of the examples in the documentation, but even after reducing the blob passing to a minimal reproduction, I still can't get it to work.

Here's my workflow definition, in full (my real use case produces a lot more data, of course):

from flytekit.sdk.tasks import python_task, outputs, inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output, Input


@inputs(the_text=Types.String)
@outputs(the_blob=Types.Blob)
@python_task
def create_blob(wf_params, the_text, the_blob):
    fname = "a-file.txt"
    with open(fname, "w") as f:
        f.write(the_text)

    the_blob.set(fname)


@inputs(the_blob=Types.Blob)
@outputs(the_text=Types.String)
@python_task
def read_blob(wf_params, the_blob, the_text):
    the_blob.download()
    with open(the_blob.local_path) as f:
        the_text.set(f.read())


@workflow_class
class PassBlob:
    input_text = Input(Types.String, required=True, help="The text to write to the file")

    create = create_blob(the_text=input_text)
    read = read_blob(the_blob=create.outputs.the_blob)

    output_text = Output(read.outputs.the_text, sdk_type=Types.String, help="The text read from the file")

This workflow deploys successfully, and when I run it the following happens:

What is the correct way to pass Types.Blob values between tasks? How do I make this work?


Solution

  • I am assuming you are running this in a local sandbox environment (you are using Minio, which is the test blob store that we deploy in the sandbox environment). Could you please share the flytekit.config file that you used to register the workflow?

    Flyte automatically stores intermediate data in a bucket (S3 / GCS), depending on how you configure it.

    The prefix setting determines the bucket and prefix to which the data is automatically uploaded: https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L7

    In versions prior to v0.7.0, the shard formatter setting in the config is used instead: https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L14-L17

    Please also tell us which version of Flyte you are running. If you join the Slack channel, I can help you get started. Sorry for all the trouble.
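
Since the answer hinges on the prefix or shard formatter setting in the config file, a quick way to see what your config sets is to list every key whose name mentions "prefix" or "shard". The following is only a rough sketch using Python's standard configparser; the helper name find_storage_settings and the sample section and key names are made up for illustration and are not Flyte's actual setting names (see the linked sandbox.config for those).

```python
import configparser


def find_storage_settings(config_text):
    """Return (section, key, value) tuples for keys that look storage-related.

    Hypothetical helper: it only matches on the key name, it does not know
    which settings Flyte actually reads.
    """
    # interpolation=None so that '%' characters in values are left untouched
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    hits = []
    for section in parser.sections():
        for key, value in parser.items(section):
            if "prefix" in key or "shard" in key:
                hits.append((section, key, value))
    return hits


# Made-up config contents for illustration only; these are NOT real Flyte keys.
SAMPLE = """
[storage]
output_prefix = s3://my-bucket/raw-data
[sdk]
workflow_packages = my_workflows
"""

for section, key, value in find_storage_settings(SAMPLE):
    print(f"[{section}] {key} = {value}")
```

To check your own file, pass its contents instead of SAMPLE, for example find_storage_settings(open("flytekit.config").read()). If nothing is printed, the config likely does not set either of the two settings mentioned above.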