I am running a Spark Scala job on a GCP Dataproc cluster. After processing the data, I need to publish messages to a Pub/Sub topic, but I get the error below.
No functional channel service provider found. Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact
Everything works fine through the Spark processing. The error appears as soon as I publish a message to Pub/Sub. Here is the code:
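import java.util.ArrayList
import scala.collection.mutable.MutableList
import scala.util.Try
import com.google.api.gax.batching.BatchingSettings
import com.google.api.gax.core.FixedCredentialsProvider
import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, TopicName}

Try {
  val topicName = TopicName.of(projectName, pubSubTopicName)
  // Restrict the credentials to the Pub/Sub scope
  val scope = new ArrayList[String]()
  scope.add("https://www.googleapis.com/auth/pubsub")
  val googleCredentials = GoogleCredentials
    .fromStream(getClass.getResourceAsStream("file path"))
    .createScoped(scope)
  val batchingSettings = BatchingSettings
    .newBuilder()
    .setElementCountThreshold(elementCountThreshold)
    .setRequestByteThreshold(requestByteThreshold)
    .setDelayThreshold(delayDuration)
    .build()
  val publisher = getPublisher(
    topicName,
    batchingSettings,
    googleCredentials
  )
  val publishedData: MutableList[String] = MutableList()
  for (pubMessage <- dataToBePublished) {
    val pubSubMessage =
      getPubSubMessage(
        ByteString.copyFromUtf8(pubMessage)
      )
    val messageIdFuture = publisher.publish(pubSubMessage)
    // Blocks until the message ID for this message is returned
    publishedData.+=(messageIdFuture.get)
  }
}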
def getPublisher(
  topicName: TopicName,
  batchingSettings: BatchingSettings,
  googleCredentials: GoogleCredentials
): Publisher = {
  Publisher
    .newBuilder(topicName)
    .setCredentialsProvider(
      FixedCredentialsProvider.create(googleCredentials)
    )
    .setBatchingSettings(batchingSettings)
    .build()
}

def getPubSubMessage(data: ByteString): PubsubMessage = {
  PubsubMessage
    .newBuilder()
    .setData(data)
    .build()
}
Since the error mentions the channel, I tried setting a channel provider on the Publisher, but I get the same error:
Publisher
  .newBuilder(topicName)
  .setCredentialsProvider(
    FixedCredentialsProvider.create(googleCredentials)
  )
  .setChannelProvider(
    TopicAdminSettings
      .defaultGrpcTransportProviderBuilder()
      .build()
  )
  .build()
I also tried adding the dependencies in sbt, but the error is unchanged:
"com.google.cloud" % "google-cloud-pubsub" % "1.120.19",
"io.grpc" % "grpc-okhttp" % "1.49.2",
"io.grpc" % "grpc-netty" % "1.49.2"
All three suggested dependencies are present in the libraries, but the error persists.
Please help with this issue. Thanks in advance.
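The issue is in how the fat jar is assembled, and it is triggered by the Pub/Sub library. gRPC locates its channel provider through META-INF/services entries, so if the merge step drops or overwrites those files, the provider cannot be found even though its classes are in the jar.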
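One way to check whether the fat jar is the culprit is to look for the gRPC provider service file on the runtime classpath. The following is a minimal sketch that uses only the JDK and the Scala standard library; the object name `GrpcProviderCheck` and its helper methods are hypothetical, and it assumes Scala 2.12 or 2.13 (where `JavaConverters` is available).

```scala
import java.net.URL
import scala.collection.JavaConverters._
import scala.io.Source

// Hypothetical helper, not part of any library.
object GrpcProviderCheck {
  // Service file shipped by the gRPC transport artifacts
  val ServiceFile = "META-INF/services/io.grpc.ManagedChannelProvider"

  // Every copy of the service file visible to the given class loader
  def findServiceFiles(
    loader: ClassLoader = Thread.currentThread().getContextClassLoader
  ): List[URL] =
    loader.getResources(ServiceFile).asScala.toList

  // Provider class names in a service file, skipping blanks and comments
  def parseProviders(lines: Iterator[String]): List[String] =
    lines.map(_.trim).filter(l => l.nonEmpty && !l.startsWith("#")).toList

  // Read one service file and return the providers it registers
  def providerNames(url: URL): List[String] = {
    val src = Source.fromInputStream(url.openStream(), "UTF-8")
    try parseProviders(src.getLines())
    finally src.close()
  }
}
```

Logging `GrpcProviderCheck.findServiceFiles().flatMap(GrpcProviderCheck.providerNames)` at the start of the job would show which providers, if any, are registered. An empty result would be consistent with the service file having been lost during assembly.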
Here are the changes required in build.sbt:

libraryDependencies += "io.grpc" % "grpc-netty" % "1.49.2"

// Relocate Guava and protobuf so they do not clash with the versions already on the cluster
assemblyShadeRules in assembly := Seq(
  ShadeRule
    .rename("com.google.common.**" -> "repackaged.com.google.common.@1")
    .inAll,
  ShadeRule
    .rename("com.google.protobuf.**" -> "repackaged.com.google.protobuf.@1")
    .inAll
)
assemblyMergeStrategy in assembly := {
  case x if Assembly.isConfigFile(x) =>
    MergeStrategy.concat
  case PathList(ps @ _*) if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
    MergeStrategy.rename
  case PathList("META-INF", xs @ _*) =>
    (xs map { _.toLowerCase }) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
        MergeStrategy.discard
      case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
        MergeStrategy.discard
      case "plexus" :: xs =>
        MergeStrategy.discard
      case "services" :: xs =>
        MergeStrategy.filterDistinctLines
      case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) =>
        MergeStrategy.filterDistinctLines
      case _ => MergeStrategy.first
    }
  case _ => MergeStrategy.first
}
With these changes the job runs without the runtime error.
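For this particular error, the part of the merge strategy that most likely matters is the META-INF/services rule, since it keeps the provider entries from all dependencies. A smaller sketch that adds only that rule and defers everything else to the previously configured strategy could look like the fragment below. It assumes the sbt-assembly plugin is enabled and has not been tested against this project, so the full configuration above remains the one reported to work.

```scala
// build.sbt fragment: a sketch, assuming the sbt-assembly plugin is enabled
assemblyMergeStrategy in assembly := {
  // Concatenate service files and drop duplicate lines so that every
  // provider listed by any dependency stays registered.
  case PathList("META-INF", "services", xs @ _*) =>
    MergeStrategy.filterDistinctLines
  // Defer all other paths to the strategy that was already configured.
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```

The shade rules are independent of this rule and would still be needed if Guava or protobuf versions conflict with those provided by the cluster.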