I have a luigi workflow that downloads a bunch of large files via FTP and deposits them on S3.
I have one task that reads a list of files to download and then creates a bunch of tasks that actually do the downloads.
The idea is that the result of this workflow is a single file containing a list of downloads that have succeeded, with any failed downloads being reattempted on the next run the following day.
The problem is that if any of the download tasks fails then the successful download list is never created.
This is because the dynamically created tasks become requirements of the main task that creates them and compiles a list from their outputs.
Is there a way to make failures of these download tasks insignificant, so that the list is compiled minus the output of the failed tasks?
Example code is below; GetFiles is the task that we call from the command line.
import json

import luigi
from luigi.contrib.s3 import S3Client, S3Target
from luigi.util import requires


class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            # WriteFileFromFtp is a helper defined elsewhere
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):
    def run(self):
        with self.input().open('r') as fileList:
            files = json.load(fileList)

        tasks = []
        taskOutputs = []
        for file in files:
            task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
            tasks.append(task)
            taskOutputs.append(task.output())

        # Yielding tasks from run() makes them dynamic dependencies of GetFiles
        yield tasks

        successfulDownloads = MakeSuccessfulOutputList(taskOutputs)
        with self.output().open('w') as out:
            json.dump(successfulDownloads, out)

    def output(self):
        client = S3Client()
        return S3Target(path='successfulDownloads.json', client=client)
I have read the documentation a few times and found no mention of non-critical failures. That said, this behavior can be achieved by overriding the Task.complete method in DownloadFileFromFtp, while still being able to use DownloadFileFromFtp.output in GetFiles.run.
If complete is overridden to return True, luigi always treats the DownloadFileFromFtp task as complete, regardless of whether the download succeeded:
class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

    def complete(self):
        # Report the task as complete whether or not its output exists
        return True
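Once failed downloads no longer block GetFiles, the list has to be built only from the outputs that actually exist. The question does not show MakeSuccessfulOutputList, so the following is only a minimal sketch of what it might look like, assuming each element of taskOutputs exposes exists() and path the way luigi targets do. FakeTarget and the example-bucket paths are hypothetical stand-ins so the snippet runs without S3.

```python
class FakeTarget:
    """Hypothetical stand-in for S3Target: only `path` and `exists()` are used."""

    def __init__(self, path, present):
        self.path = path
        self._present = present

    def exists(self):
        return self._present


def MakeSuccessfulOutputList(taskOutputs):
    """Return the paths of the outputs that exist, i.e. the downloads that succeeded."""
    return [target.path for target in taskOutputs if target.exists()]


outputs = [
    FakeTarget("s3://example-bucket/a.bin", present=True),
    FakeTarget("s3://example-bucket/b.bin", present=False),  # simulated failed download
    FakeTarget("s3://example-bucket/c.bin", present=True),
]
successful = MakeSuccessfulOutputList(outputs)
print(successful)
```

Filtering on exists() keeps the compiled list honest even when some download tasks are reported as complete without having produced a file.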
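Note, however, that luigi also consults complete before it runs a task, so an unconditional True means the download itself is skipped. In practice you would use more complex logic in that complete method, such as treating the task as incomplete only if it has not yet been attempted, or only if it hit a specific network failure at runtime.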
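One way to picture such logic, as a sketch rather than a definitive implementation: the task records a tolerated failure in a dated marker file, and complete reports True if either the download or today's marker exists. Because the marker carries the run date, the next day's run sees the task as incomplete and retries it. The class below is plain Python, deliberately not a luigi.Task, and every name in it (TolerantDownload, flaky_fetch, the marker naming, the example URLs) is hypothetical. It also assumes network failures surface as OSError.

```python
import os
import tempfile
from datetime import date


class TolerantDownload:
    """Plain-Python stand-in for DownloadFileFromFtp (hypothetical, not a luigi.Task)."""

    def __init__(self, source_url, out_dir, fetch, run_date=None):
        self.source_url = source_url
        self.out_dir = out_dir
        self.fetch = fetch  # hypothetical callable: url -> bytes, raises OSError on failure
        self.run_date = run_date or date.today()

    def _name(self):
        return os.path.basename(self.source_url)

    def output_path(self):
        return os.path.join(self.out_dir, self._name())

    def failure_marker_path(self):
        # The date in the name makes the marker expire, so the next day's run retries.
        marker = f"{self._name()}.failed-{self.run_date.isoformat()}"
        return os.path.join(self.out_dir, marker)

    def run(self):
        try:
            data = self.fetch(self.source_url)
        except OSError as err:
            # Tolerated failure: record it instead of raising.
            with open(self.failure_marker_path(), "w") as marker:
                marker.write(str(err))
            return
        with open(self.output_path(), "wb") as out:
            out.write(data)

    def complete(self):
        # Done if the file was downloaded, or if today's attempt already failed.
        return (os.path.exists(self.output_path())
                or os.path.exists(self.failure_marker_path()))


def flaky_fetch(url):
    """Hypothetical fetcher that fails for URLs containing 'bad'."""
    if "bad" in url:
        raise ConnectionError("simulated FTP failure")
    return b"payload"


out_dir = tempfile.mkdtemp()
day_one, day_two = date(2024, 1, 1), date(2024, 1, 2)
good = TolerantDownload("ftp://example.org/good.bin", out_dir, flaky_fetch, day_one)
bad = TolerantDownload("ftp://example.org/bad.bin", out_dir, flaky_fetch, day_one)
retry = TolerantDownload("ftp://example.org/bad.bin", out_dir, flaky_fetch, day_two)

before = (good.complete(), bad.complete())
good.run()
bad.run()
after = (good.complete(), bad.complete(), retry.complete())
print(before, after)
```

In a real luigi task the same idea would apply to S3 targets instead of local paths, and the list-building step would still need to check which outputs exist, since a task that is complete because of its failure marker has no downloaded file.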