I'm trying to create a Flyte workflow that passes data between several tasks. I looked at one of the examples in the documentation, but even after reducing the blob-passing to a minimal example, I still can't get it to work.
Here's my workflow definition, in full (my real use case produces a lot more data, of course):
from flytekit.sdk.tasks import python_task, outputs, inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output, Input


@inputs(the_text=Types.String)
@outputs(the_blob=Types.Blob)
@python_task
def create_blob(wf_params, the_text, the_blob):
    fname = "a-file.txt"
    with open(fname, "w") as f:
        f.write(the_text)
    the_blob.set(fname)


@inputs(the_blob=Types.Blob)
@outputs(the_text=Types.String)
@python_task
def read_blob(wf_params, the_blob, the_text):
    the_blob.download()
    with open(the_blob.local_path) as f:
        the_text.set(f.read())


@workflow_class
class PassBlob:
    input_text = Input(Types.String, required=True, help="The text to write to the file")
    create = create_blob(the_text=input_text)
    read = read_blob(the_blob=create.outputs.the_blob)
    output_text = Output(read.outputs.the_text, sdk_type=Types.String, help="The text read from the file")
This workflow deploys successfully, and when I run it the following happens:
The create task runs successfully and says this about its output:
the_blob:
  type: single
  uri: /4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6
The read task fails on the line the_blob.download(), logging the following:
Welcome to Flyte! Version: 0.13.3
INFO:root:Entering timed context: Copying (/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6 -> /tmp/task_dir__m4xve78/512f322cd94d40898b9478db9296b12c)
INFO:root:Exiting timed context: Copying (/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6 -> /tmp/task_dir__m4xve78/512f322cd94d40898b9478db9296b12c) [Wall Time: 6.86999992467463e-05s, Process Time: 6.756699999999061e-05s]
ERROR:root:!!! Begin Error Captured by Flyte !!!
ERROR:root:Traceback (most recent call last):
  File "/code/venv/lib/python3.6/site-packages/flytekit/common/exceptions/scopes.py", line 155, in system_entry_point
    return wrapped(*args, **kwargs)
  File "/code/venv/lib/python3.6/site-packages/flytekit/common/types/impl/blobs.py", line 202, in download
    _data_proxy.Data.get_data(self.remote_location, self.local_path, is_multipart=False)
  File "/code/venv/lib/python3.6/site-packages/flytekit/interfaces/data/data_proxy.py", line 136, in get_data
    error_string=_six.text_type(ex),
Message:
Failed to get data from /4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6 to /tmp/task_dir__m4xve78/512f322cd94d40898b9478db9296b12c (recursive=False).
Original exception: [Errno 2] No such file or directory: '/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6'
User error.
ERROR:root:!!! End Error Captured by Flyte !!!
INFO:root:Entering timed context: Writing (/tmp/engine_dir_5x9bt65w -> s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0)
INFO:root:Output of command '['aws', '--endpoint-url', 'http://minio.flyte:9000', 's3', 'cp', '--recursive', '--acl', 'bucket-owner-full-control', '/tmp/engine_dir_5x9bt65w', 's3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0']':
b'Completed 907 Bytes/907 Bytes (84.2 KiB/s) with 1 file(s) remaining\rupload: ../tmp/engine_dir_5x9bt65w/error.pb to s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0/error.pb\n'
INFO:root:Exiting timed context: Writing (/tmp/engine_dir_5x9bt65w -> s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0) [Wall Time: 1.5020931999897584s, Process Time: 0.002140534999999999s]
What is the correct way to pass Types.Blob values between tasks? How do I make this work?
I assume you are running this in a local sandbox environment, since you are using minio, which is the test blob store that we deploy in the sandbox. Could you please share the flytekit.config file that you used to register the workflow?
Flyte automatically stores intermediate data in a bucket (S3 or GCS), depending on how you configure it.
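A quick way to see whether a blob actually landed in a bucket is to look at the scheme of its URI. The URI in your create task's output starts with a bare path rather than s3:// or gs://, and the traceback shows it being opened as a local file. The sketch below uses only the standard library; `is_remote_uri` and `REMOTE_SCHEMES` are hypothetical names for illustration, not part of flytekit.

```python
from urllib.parse import urlparse

# Schemes treated as "remote" for this check (an assumption for the sketch;
# extend the set if your deployment uses another blob store).
REMOTE_SCHEMES = {"s3", "gs"}


def is_remote_uri(uri):
    """Return True if the URI carries a bucket scheme such as s3:// or gs://."""
    return urlparse(uri).scheme in REMOTE_SCHEMES


# The URI reported by the create task in the question.
question_uri = "/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6"

# What the same path would look like under the bucket named in the logs.
bucket_uri = "s3://my-s3-bucket" + question_uri

print(is_remote_uri(question_uri))
print(is_remote_uri(bucket_uri))
```

If the first check comes back false for a task output, the next task has no bucket to download from and can only look for the path on its own filesystem, which matches the "No such file or directory" error.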
The prefix setting controls where that data is uploaded: Flyte writes it to the configured bucket and prefix. See https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L7
In versions prior to v0.7.0, the shard formatter setting in the config is used instead: https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L14-L17
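While you dig up the config, here is a rough sketch of a sanity check you could run against it. It is an illustration under assumptions, not flytekit code: the section and key names (`raw_output_data_prefix` under `[auth]`, `s3_shard_formatter` under `[aws]`) are my reading of the linked sandbox.config and should be checked against your own file, and the sample values are made up around the bucket name from your logs. `find_non_remote_settings` is a hypothetical helper.

```python
import configparser
from urllib.parse import urlparse

REMOTE_SCHEMES = {"s3", "gs"}

# Assumed (section, option) pairs; verify the names against your own config.
KEYS_TO_CHECK = [
    ("auth", "raw_output_data_prefix"),
    ("aws", "s3_shard_formatter"),
]

# Hypothetical config text with illustrative values only.
SAMPLE_CONFIG = """
[auth]
raw_output_data_prefix = s3://my-s3-bucket/raw-data

[aws]
s3_shard_formatter = s3://my-s3-bucket/{}/
"""


def find_non_remote_settings(config_text, keys):
    """Return (section, option, value) for every setting that is missing
    or does not start with a bucket scheme such as s3:// or gs://."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    problems = []
    for section, option in keys:
        value = parser.get(section, option, fallback="")
        if urlparse(value).scheme not in REMOTE_SCHEMES:
            problems.append((section, option, value))
    return problems


# A variant where the formatter lost its bucket, to show what gets flagged.
broken_config = SAMPLE_CONFIG.replace("s3://my-s3-bucket/{}/", "/{}/")

print(find_non_remote_settings(SAMPLE_CONFIG, KEYS_TO_CHECK))
print(find_non_remote_settings(broken_config, KEYS_TO_CHECK))
```

To run it on your real file, read the file's text and pass it in place of SAMPLE_CONFIG. Any setting it flags is a candidate for why the blob URI came out as a bare path.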
Please also tell us which version of Flyte you are running. If you join the Slack channel, I can help you get started. Sorry for all the trouble.