google-cloud-platform google-cloud-storage google-cloud-dataflow apache-beam apache-beam-io

Accessing a Cloud Storage bucket in a Java SDK Apache Beam pipeline yields 401 Invalid Credentials


I'm trying to read a CSV file from a Cloud Storage bucket and store it in a PCollection. To authenticate with the bucket, I'm using a service account with roles/storage.admin and a JSON key. This is my PipelineOptions object:

DataflowPipelineOptions dfOptions = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
dfOptions.setProject("project_name");
dfOptions.setStagingLocation("bucket_name");
dfOptions.setGcpCredential(GoogleCredentials.fromStream(
        new FileInputStream(PATH_TO_JSON_KEY)));
dfOptions.setTempLocation("gs://bucket_name/folder_name");
dfOptions.setServiceAccount("service_account_name");
Pipeline myPipe = Pipeline.create(dfOptions);
PCollection<ReadableFile> readFile = myPipe
        .apply(FileIO.match().filepattern("gs://bucket_name/file_name.csv"))
        .apply(FileIO.readMatches());

However, running this pipeline results in the following error:

Caused by: java.io.IOException: Error trying to get gs://bucket_name/object_name.csv: {"code":401,"errors":[{"domain":"global","location":"Authorization","locationType":"header","message":"Invalid Credentials","reason":"authError"}],"message":"Invalid Credentials"}

If I use the DataflowRunner instead, by adding the following to my PipelineOptions:

dfOptions.setRunner(DataflowRunner.class);

I get exactly the same error for my staging bucket:

401 Unauthorized
GET https://storage.googleapis.com/storage/v1/b/bucket_name
{
  "code" : 401,
...same as above...
}

I'm using the same credentials to access the same bucket with the GCS Java client library, and it works absolutely fine:

StorageOptions options = StorageOptions.newBuilder()
        .setProjectId(PROJECT_ID)
        .setCredentials(GoogleCredentials.fromStream(
                new FileInputStream(PATH_TO_JSON_KEY)))
        .build();

Storage storage = options.getService();
Blob blob = storage.get(BUCKET_NAME, OBJECT_NAME);
ReadChannel r = blob.reader();

I also downloaded the same file from the same bucket with the same service account and key using gsutil, with no problems. The problem only occurs when using Apache Beam.

Versions of various dependencies I'm using-


Solution

  • It is worth noting that Dataflow's support for Apache Beam 2.24.0 was deprecated on September 18, 2021. As a first step, I would update to a recent version of the SDK. In particular, Beam has adopted the GCP Libraries BOM, which coordinates the versions of the GCP client libraries and auth libraries.
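
To make the "update first" advice a bit more concrete, here is a minimal sketch of a version check you could run against the Beam version declared in your build. It uses only the JDK. The class name BeamVersionCheck, the helper isOlderThan, and the threshold 2.25.0 are hypothetical names and values chosen for illustration; the threshold is not an official Dataflow cutoff, so check the Dataflow SDK support page for the real one.

```java
import java.util.Arrays;

public class BeamVersionCheck {

    // Parses "2.24.0" into {2, 24, 0}. A suffix such as "-SNAPSHOT" is ignored.
    public static int[] parse(String version) {
        String core = version.split("-", 2)[0];
        return Arrays.stream(core.split("\\."))
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    // Returns true when 'current' is strictly older than 'minimum'.
    // Missing components are treated as 0, so "2.25" equals "2.25.0".
    public static boolean isOlderThan(String current, String minimum) {
        int[] a = parse(current);
        int[] b = parse(minimum);
        int n = Math.max(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = i < a.length ? a[i] : 0;
            int y = i < b.length ? b[i] : 0;
            if (x != y) {
                return x < y;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String current = "2.24.0"; // the Beam version mentioned in this answer
        String minimum = "2.25.0"; // hypothetical threshold, not an official cutoff
        if (isOlderThan(current, minimum)) {
            System.out.println("Beam " + current + " is older than " + minimum
                    + "; consider upgrading.");
        } else {
            System.out.println("Beam " + current + " meets the minimum " + minimum + ".");
        }
    }
}
```

The comparison is numeric per component rather than lexicographic, so 2.9.0 is correctly treated as older than 2.10.0. This only tells you whether an upgrade is due; it does not by itself prove that the 401 comes from the old SDK.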