When I run Rclone directly in the shell to list the differences between a local folder and an S3 folder, it succeeds:
➜ rclone copy --dry-run --include="*.tdms" --use-json-log /Users/hongbo-miao/Documents/motor hm-s3:my-bucket/motor
{"level":"warning","msg":"Skipped copy as --dry-run is set (size 30.518Mi)","object":"motor-1.tdms","objectType":"*local.Object","size":32000448,"skipped":"copy","source":"operations/operations.go:2360","time":"2023-06-05T17:34:51.1769-07:00"}
{"level":"warning","msg":"\nTransferred: \t 30.518 MiB / 30.518 MiB, 100%, 0 B/s, ETA -\nTransferred: 1 / 1, 100%\nElapsed time: 0.7s\n\n","source":"accounting/stats.go:498","stats":{"bytes":32000448,"checks":0,"deletedDirs":0,"deletes":0,"elapsedTime":0.795462417,"errors":0,"eta":null,"fatalError":false,"renames":0,"retryError":false,"speed":0,"totalBytes":32000448,"totalChecks":0,"totalTransfers":1,"transferTime":0,"transfers":1},"time":"2023-06-05T17:34:51.178495-07:00"}
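As a side note, each line of that JSON log can be parsed to pull out the files that would have been copied. The following is only a minimal sketch based on the two records shown above: `skipped_copies` is a hypothetical helper name, and it assumes the `skipped` and `object` fields appear as they do in this output.

```python
import json


def skipped_copies(log_lines):
    """Return object names that rclone reported as skipped copies under --dry-run."""
    names = []
    for line in log_lines:
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore anything that is not a JSON log record
        # Only records like the first one above carry "skipped": "copy".
        if isinstance(entry, dict) and entry.get("skipped") == "copy" and "object" in entry:
            names.append(entry["object"])
    return names


# Shortened copies of the two log records shown above.
sample = [
    '{"level":"warning","msg":"Skipped copy as --dry-run is set (size 30.518Mi)",'
    '"object":"motor-1.tdms","objectType":"*local.Object","size":32000448,"skipped":"copy"}',
    '{"level":"warning","msg":"Transferred: 1 / 1, 100%","stats":{"transfers":1}}',
]
print(skipped_copies(sample))
```

The stats record has no `skipped` field, so only the per-file record contributes a name.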
However, if I use Prefect ShellOperation from prefect-shell v0.1.5 to run the same command:
from prefect import flow, task
from prefect_shell import ShellOperation


@task
async def get_missing_files() -> list[str]:
    log = await ShellOperation(
        commands=['rclone copy --dry-run --include="*.tdms" --use-json-log /Users/hongbo-miao/Documents/motor hm-s3:my-bucket/motor'],
        stream_output=False,
    ).run()
    # Return the captured output lines so the flow can use them.
    return log


@flow
async def ingest_data() -> None:
    # The task takes no parameters, so it is called without arguments.
    missing_files_list = await get_missing_files()
I got this error:
➜ python src/main.py
17:38:33.356 | INFO | prefect.engine - Created flow run 'chirpy-bull' for flow 'ingest-data'
17:38:33.869 | INFO | Flow run 'chirpy-bull' - Created task run 'get_missing_files-0' for task 'get_missing_files'
17:38:33.870 | INFO | Flow run 'chirpy-bull' - Executing 'get_missing_files-0' immediately...
17:38:34.102 | INFO | Task run 'get_missing_files-0' - PID 40879 triggered with 1 commands running inside the '.' directory.
17:38:34.532 | ERROR | Task run 'get_missing_files-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1550, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "ingest-data/src/tasks/get_missing_files.py", line 27, in get_missing_files
log = await ShellOperation(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/utilities/processutils.py", line 221, in open_process
yield process
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 396, in run
await shell_process.wait_for_completion()
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 177, in wait_for_completion
raise RuntimeError(
RuntimeError: PID 40879 failed with return code 6.
17:38:34.621 | ERROR | Task run 'get_missing_files-0' - Finished in state Failed('Task run encountered an exception: RuntimeError: PID 40879 failed with return code 6.\n')
17:38:34.621 | ERROR | Flow run 'chirpy-bull' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "ingest-data/src/main.py", line 18, in ingest_data
missing_files_list = await get_missing_files(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1132, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1550, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "ingest-data/src/tasks/get_missing_files.py", line 27, in get_missing_files
log = await ShellOperation(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/utilities/processutils.py", line 221, in open_process
yield process
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 396, in run
await shell_process.wait_for_completion()
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 177, in wait_for_completion
raise RuntimeError(
RuntimeError: PID 40879 failed with return code 6.
17:38:34.732 | ERROR | Flow run 'chirpy-bull' - Finished in state Failed('Flow run encountered an exception. RuntimeError: PID 40879 failed with return code 6.\n')
Traceback (most recent call last):
File "ingest-data/src/main.py", line 41, in <module>
asyncio.run(ingest_data())
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 259, in create_then_begin_flow_run
return await state.result(fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 674, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "ingest-data/src/main.py", line 18, in ingest_data
missing_files_list = await get_missing_files(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 109, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1132, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/futures.py", line 240, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/engine.py", line 1550, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
^^^^^^^^^^
File "ingest-data/src/tasks/get_missing_files.py", line 27, in get_missing_files
log = await ShellOperation(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/hongbo-miao/.pyenv/versions/3.11.1/lib/python3.11/contextlib.py", line 222, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect/utilities/processutils.py", line 221, in open_process
yield process
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 396, in run
await shell_process.wait_for_completion()
File "/Users/hongbo-miao/Library/Caches/pypoetry/virtualenvs/ingest-data-RUzdR1F0-py3.11/lib/python3.11/site-packages/prefect_shell/commands.py", line 177, in wait_for_completion
raise RuntimeError(
RuntimeError: PID 40879 failed with return code 6.
make: *** [poetry-run-dev] Error 1
What does this error mean? Thanks!
In my case, the Prefect agent tries to access the folder /Users/hongbo-miao/Documents/motor through Rclone. Originally, I thought that if Rclone could access the folder from my shell, the same Rclone command would also run successfully inside a Prefect task.
However, it seems that the Prefect agent process itself also needs access to the local folder /Users/hongbo-miao/Documents/motor, and in my case it does not appear to have it. So I need to make sure the Prefect agent also has access to the folder.
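One way to confirm this from inside the task is to try listing the folder in Python before calling Rclone. This is only a minimal sketch: `can_list_dir` is a hypothetical helper, not part of Prefect or Rclone. It attempts the listing itself instead of inspecting permission bits, on the assumption that an actual read attempt reflects what the process is allowed to do more reliably.

```python
import os


def can_list_dir(path):
    """Return True if the current process can open and list the directory at path."""
    try:
        with os.scandir(path) as entries:
            next(entries, None)  # reading one entry is enough to prove access
        return True
    except OSError:
        # Covers missing paths, non-directories, and permission errors.
        return False


print(can_list_dir("/Users/hongbo-miao/Documents/motor"))
```

If this prints False when run inside the Prefect task but True in a normal shell session, the process running the task is the one lacking access.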
As far as I know, the Prefect agent has access to /tmp. If I simply move the folder to /tmp/motor and change the corresponding path in the code to:
log = await ShellOperation(
    commands=['rclone copy --dry-run --include="*.tdms" --use-json-log /tmp/motor hm-s3:my-bucket/motor'],
    stream_output=False,
).run()
Then it works well.
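As for what "return code 6" means: the number is Rclone's own exit code, which prefect-shell simply reports in the RuntimeError message. The sketch below extracts the code from a message like the one in the traceback and looks it up. `describe_failure` is a hypothetical helper, and the descriptions are paraphrased from the exit code list in the Rclone documentation as I understand it, so please check them against the docs for your Rclone version.

```python
import re

# Paraphrased from the exit code list in the rclone documentation (assumption:
# verify against the docs for the rclone version you are running).
RCLONE_EXIT_CODES = {
    0: "Success",
    1: "Syntax or usage error",
    2: "Error not otherwise categorised",
    3: "Directory not found",
    4: "File not found",
    5: "Temporary error (retries might fix it)",
    6: "Less serious errors (NoRetry errors)",
    7: "Fatal error (retries will not fix it)",
    8: "Transfer exceeded (--max-transfer limit reached)",
    9: "Operation successful, but no files transferred",
    10: "Duration exceeded (--max-duration limit reached)",
}


def describe_failure(message):
    """Extract the return code from a prefect-shell error message and describe it."""
    match = re.search(r"return code (\d+)", message)
    if match is None:
        return None
    code = int(match.group(1))
    return code, RCLONE_EXIT_CODES.get(code, "Unknown exit code")


print(describe_failure("PID 40879 failed with return code 6."))
```

Code 6 is a generic "non-retryable error" category, so it does not name the cause by itself; in my case the cause turned out to be the folder access described above.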