python-3.x, google-cloud-platform, google-cloud-storage, apache-beam, dataflow

WARNING:apache_beam.options.pipeline_options:Discarding unparseable args


I currently have the code:

import sys
import uuid

import apache_beam as beam
from apache_beam.io.gcp import gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions

gs_folder = sys.argv[1]
options = PipelineOptions(
    runner='DataflowRunner',
    project='xxx',
    job_name=f'xxx{uuid.uuid4()}',
    region='us-central1',
    temp_location='xxx')

gfs = gcs.GCSFileSystem(options)
p = beam.Pipeline(options=options)

# DeleteEmpty is the DoFn from the linked question.
discover_empty = (p
    | 'Filenames' >> beam.Create(gfs.match([gs_folder])[0].metadata_list)
    | 'Reshuffle' >> beam.Reshuffle()
    | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs)))
p.run()

This code is based on the question here. What eventually happens is that I get the error below:

WARNING:apache_beam.options.pipeline_options:Discarding unparseable args: ['gs://xx/xx']

Which does not make much sense, since that is the folder I want the deletion to run on. Furthermore, it looks like the Dataflow job runs through successfully, but the files that are supposed to be deleted are not actually deleted. How am I supposed to pass the pipeline options argument here?

I also have a couple of follow-up questions about this process. It looks like beam.Create() runs locally and then the pipeline switches over to Dataflow. How can I make that part of the pipeline run on Dataflow?


Solution

  • Make sure that you are passing the flag as --input=gs://... That error means your command-line invocation isn't valid: the gs:// path is being passed as one bare argument with no flag name, so PipelineOptions discards it as unparseable (see the argument-parsing sketch below).

    beam.Create runs as part of the pipeline, but the argument passed to it is evaluated locally, at pipeline-construction time. To compute the file list on Dataflow instead, start from a single-element beam.Create (e.g. beam.Create([gs_folder])) and have a DoFn run the matching logic, as in the second sketch below.
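
Here is a minimal sketch of the argument handling, assuming an --input flag (the flag name is my choice, not from your code). parse_known_args splits your own flags from Beam's, so the path is never handed to PipelineOptions as an unparseable positional argument:

import argparse
import uuid

from apache_beam.options.pipeline_options import PipelineOptions

# --input is a hypothetical flag name for illustration.
parser = argparse.ArgumentParser()
parser.add_argument('--input', required=True, help='gs:// path to clean up')
# parse_known_args returns (known_args, remaining_argv);
# the remaining argv is passed through to Beam.
known_args, beam_args = parser.parse_known_args()

options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='xxx',
    job_name=f'xxx{uuid.uuid4()}',
    region='us-central1',
    temp_location='xxx')

gs_folder = known_args.input  # instead of sys.argv[1]

You would then invoke the script as: python script.py --input=gs://bucket/folder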
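
And a sketch of moving the matching into the pipeline, so the file listing runs on Dataflow workers rather than on your machine. MatchEmpty is a name I've made up for illustration; DeleteEmpty is assumed to be the DoFn from the linked question:

import apache_beam as beam
from apache_beam.io.gcp import gcsfilesystem as gcs

class MatchEmpty(beam.DoFn):
    """Expands a gs:// pattern into file metadata on the worker."""

    def __init__(self, options):
        self._options = options
        self._fs = None

    def setup(self):
        # Construct the filesystem on the worker, not at pipeline build time.
        self._fs = gcs.GCSFileSystem(self._options)

    def process(self, pattern):
        # match() returns one MatchResult per pattern; emit each file's metadata.
        for metadata in self._fs.match([pattern])[0].metadata_list:
            yield metadata

p = beam.Pipeline(options=options)
(p
 | 'Pattern' >> beam.Create([gs_folder])       # single element, created locally
 | 'Match' >> beam.ParDo(MatchEmpty(options))  # listing now happens on Dataflow
 | 'Reshuffle' >> beam.Reshuffle()
 | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs)))
p.run()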