javagoogle-cloud-platformgoogle-cloud-pubsubdataflow

Unable to get application default credentials. run on locally


I'm trying this example for retrieve data from GCP Pub/Sub at DataFlow.

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import avro.shaded.com.google.common.collect.Lists;
import com.google.auth.oauth2.GoogleCredentials;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

public class StreamDemoConsumer {

    public static interface MyOptions extends DataflowPipelineOptions {
        @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
        @Default.String("coexon-seoul-dev:ledger_data_set.ledger_data2")
        String getOutput();

        void setOutput(String s);

        @Description("Input topic")
        @Default.String("projects/coexon-seoul-dev/topics/trading")
        String getInput();
        
        void setInput(String s);
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws IOException {

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

        options.setStreaming(true);
        Pipeline p = Pipeline.create(options);


        String topic = options.getInput();
        String output = options.getOutput();

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
        fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);

        p //
                .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
                .apply("window",
                        Window.into(SlidingWindows//
                                .of(Duration.standardMinutes(2))//
                                .every(Duration.standardSeconds(30)))) //
                .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        String line = c.element();
                        c.output(line.split(" ").length);
                    }
                }))//
                .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) //
                .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        TableRow row = new TableRow();
                        row.set("timestamp", Instant.now().toString());
                        row.set("num_words", c.element());
                        c.output(row);
                    }
                })) //
                .apply(BigQueryIO.writeTableRows().to(output)//
                        .withSchema(schema)//
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

        p.run();
    }
}

I run this code using below command.

sh run_oncloud4.sh coexon-seoul-dev ledgerbucket

then the code running well

run_oncloud4.sh like below

#!/bin/bash

if [ "$#" -ne 2 ]; then
   echo "Usage:   ./run_oncloud.sh project-name  bucket-name"
   echo "Example: ./run_oncloud.sh cloud-training-demos  cloud-training-demos"
   exit
fi

PROJECT=$1
BUCKET=$2
MAIN=com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer

echo "project=$PROJECT  bucket=$BUCKET  main=$MAIN"

export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH
mvn compile -e exec:java \
 -Dexec.mainClass=$MAIN \
      -Dexec.args="--project=$PROJECT \
      --stagingLocation=gs://$BUCKET/staging/ \
      --tempLocation=gs://$BUCKET/staging/ \
      --output=$PROJECT:demos.streamdemo \
      --input=projects/$PROJECT/topics/streamdemo \
      --runner=DataflowRunner"

but I run the uppercode like below

sh run_locally.sh com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer

then Unable to get application default credentials error message occured.

>SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>SLF4J: Defaulting to no-operation (NOP) logger implementation
>SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>Exception in thread "main" java.lang.RuntimeException: Unable to get application default credentials. Please see https://developers.google.com/accounts/docs/application-default-credentials for details on how to specify credentials. This version of the SDK is dependent on the gcloud core component version 2015.02.05 or newer to be able to get credentials from the currently authorized user via gcloud auth.
>   at org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer.throwNullCredentialException(NullCredentialInitializer.java:60)
>   at org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer$NullCredentialHttpUnsuccessfulResponseHandler.handleResponse(NullCredentialInitializer.java:53)
>   at com.google.cloud.hadoop.util.ChainingHttpRequestInitializer$3.handleResponse(ChainingHttpRequestInitializer.java:111)
>   at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1015)
>   at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
>   at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
>   at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
>   at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.executeWithRetries(BigQueryServicesImpl.java:854)
>   at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.getDataset(BigQueryServicesImpl.java:554)
>   at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.verifyDatasetPresence(BigQueryHelpers.java:196)
>   at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.validate(BigQueryIO.java:1486)
>   at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:640)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:656)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
>   at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
>   at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:575)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at com.google.cloud.training.dataanalyst.javahelp.StreamDemoConsumer.main(StreamDemoConsumer.java:115)
>
>Process finished with exit code 1

run_locally.sh

#!/bin/bash

if [ "$#" -ne 1 ]; then
   echo "Usage:   ./run_locally.sh mainclass-basename"
   echo "Example: ./run_oncloud.sh Grep"
   exit
fi

MAIN=com.google.cloud.training.dataanalyst.javahelp.$1

export PATH=/usr/lib/jvm/java-8-openjdk-amd64/bin/:$PATH
mvn compile -e exec:java -Dexec.mainClass=$MAIN

I've set the credential

echo ${GOOGLE_APPLICATION_CREDENTIALS}

/Users/mattheu/coexon-seoul-dev-898d91a66539.json

but the authorization error occur.

how can I solve this problem?


Solution

  • I experienced something similar, and the following steps worked for me:

    1. Install the Google Cloud SDK on your laptop. Instructions here: https://cloud.google.com/sdk/install

    2. Close your command line and reopen it.

    3. Run gcloud init and follow the instructions, which includes tying the SDK to your GCP account and project.

    4. Follow the instructions for manually setting up a service account (https://cloud.google.com/docs/authentication/production#obtaining_and_providing_service_account_credentials_manually). You only need to follow the instructions under "Obtaining and providing service account credentials manually." Basically you will save a file to your computer with your service account information, which will be used for the work you are trying to do.

    5. In your shell configuration file (on macOS starting with Catalina this is ~/.zshenv), add the line export GOOGLE_APPLICATION_CREDENTIALS="/path/to/the/file/you/saved/in/step/4"

    6. Close and reopen your shell and you should be good to go.

    I'm a little unsure if setting up the SDK (steps 1-3) was necessary, but it's good to have that setup anyway.