I'm a newbie to gcloud and BigQuery and want to read data from BigQuery using Spark. I used the Google APIs Client Library for Java and am able to connect to BigQuery: I get the com.google.api.services.bigquery.Bigquery object and can read and print datasets, tableIds, and tableData.
My questions are:

How can I connect this BigQuery authentication object (the credential object) to Spark? Or is there any way to use this object with the Hadoop API?

If that is not possible, how can I pass the credential object to the new Hadoop API (sc.newAPIHadoopRDD)?

This is how I create the credential:
GoogleAuthorizationCodeFlow flow = getFlow();
GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
        .setRedirectUri(REDIRECT_URI).execute();
Credential credential = flow.createAndStoreCredential(response, null);
return credential;
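Note that getFlow() is my own helper. A minimal sketch of what it can look like (in Scala, since the rest of my job is Scala; clientId and clientSecret come from your OAuth client, and the BigQuery scope plus "offline" access type are assumptions on my side, "offline" being what makes Google return a refresh token):

import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow
import com.google.api.client.http.javanet.NetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.bigquery.BigqueryScopes
import scala.collection.JavaConverters._

// Builds an installed-app OAuth flow scoped to BigQuery.
def getFlow(clientId: String, clientSecret: String): GoogleAuthorizationCodeFlow =
  new GoogleAuthorizationCodeFlow.Builder(
      new NetHttpTransport(),
      JacksonFactory.getDefaultInstance(),
      clientId,
      clientSecret,
      Seq(BigqueryScopes.BIGQUERY).asJava)
    .setAccessType("offline") // needed to receive a refresh_token
    .build()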
This is the Hadoop API code where I want to use my credential object:
val tableData = sc.newAPIHadoopRDD(
  conf,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject])
Thanks @Michael, with the help of your link I found the solution.

Just disable the service account in the Hadoop configuration:

hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")

and the following code will work:
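First, the imports the snippet needs (my own addition for completeness; the classes come from the bigquery-connector, Gson, and Hadoop libraries):

import com.google.cloud.hadoop.io.bigquery.{BigQueryConfiguration, GsonBigQueryInputFormat}
import com.google.gson.JsonObject
import org.apache.hadoop.io.LongWritable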
val hadoopConfiguration = sc.hadoopConfiguration
hadoopConfiguration.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
hadoopConfiguration.set("fs.gs.project.id", projectId)
hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
hadoopConfiguration.set("fs.gs.auth.client.id", clientId)
hadoopConfiguration.set("fs.gs.auth.client.secret", clientSecret)
hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoopConfiguration.set("fs.gs.auth.client.file", tokenPath)
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
// Configure the BigQuery input (dataset.table) for the job.
BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, dataSetId + "." + tableId)
val tableData = sc.newAPIHadoopRDD(
  hadoopConfiguration,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject])
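The result is an RDD[(LongWritable, JsonObject)], so as a quick sanity check you can render a few rows (only toString is used here, so nothing about the table schema is assumed):

// Key is a row offset, value is the row as a Gson JsonObject.
val rows = tableData.map { case (_, json) => json.toString }
rows.take(5).foreach(println) // peek at a few rows
// json.get("someColumn").getAsString would extract a single field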
where tokenPath points to a file that contains the refresh token:
{
  "credentials": {
    "user": {
      "access_token": "ya29..wgL6fH2Gx5asdaadsBl2Trasd0sBqV_ZAS7xKDtNS0z4Qyv5ypassdh0soplQ",
      "expiration_time_millis": 1460473581255,
      "refresh_token": "XXXXXXXXXxxxxxxxxx"
    }
  }
}
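This layout matches what the (now deprecated) FileCredentialStore from google-api-client writes, so one way to produce the file is to attach such a store to the flow and save the credential under the "user" key. A sketch under that assumption (tokenResponse is the GoogleTokenResponse from the exchange shown earlier):

import java.io.File
import com.google.api.client.extensions.java6.auth.oauth2.FileCredentialStore
import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow
import com.google.api.client.http.javanet.NetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.bigquery.BigqueryScopes
import scala.collection.JavaConverters._

val jsonFactory = JacksonFactory.getDefaultInstance()

// The file-backed store makes createAndStoreCredential persist the tokens
// to tokenPath in the JSON layout shown above.
val flow = new GoogleAuthorizationCodeFlow.Builder(
    new NetHttpTransport(),
    jsonFactory,
    clientId,
    clientSecret,
    Seq(BigqueryScopes.BIGQUERY).asJava)
  .setAccessType("offline")
  .setCredentialStore(new FileCredentialStore(new File(tokenPath), jsonFactory))
  .build()

// "user" becomes the key under "credentials" in the stored JSON.
flow.createAndStoreCredential(tokenResponse, "user")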