scala, ibm-cloud, spark-streaming, message-hub

Kafka java.io.EOFException - NetworkReceive.readFromReadableChannel


I'm trying to connect to IBM Message Hub from Apache Spark 2.2.1 Structured Streaming.

The connection code is quite basic:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

import spark.implicits._

val df = spark.readStream.
                format("kafka").
                option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                option("subscribe", "transactions_load").
                option("security.protocol", "SASL_SSL").
                option("sasl.mechanism", "PLAIN").
                option("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"*****\" password=\"*****\";").
                option("ssl.protocol", "TLSv1.2").
                option("ssl.enabled.protocols", "TLSv1.2").
                option("ssl.endpoint.identification.algorithm", "HTTPS").
                option("auto.offset.reset","earliest").
                option("group.id", System.currentTimeMillis).
                load()

val query = df.writeStream.format("console").start()

I'm starting the spark shell with:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

However, I'm getting repeated disconnect warnings:

scala> 18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
<<repeats forever>>

I've increased debug output with sc.setLogLevel("DEBUG") and I get:

<<log output omitted for brevity>>
18/01/09 08:38:28 DEBUG SessionState: SessionState user: null
18/01/09 08:38:28 DEBUG SessionState: HDFS root scratch dir: /tmp/hive with schema null, permission: rwx-wx-wx
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9_resources
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9/_tmp_space.db
18/01/09 08:38:28 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/home/snowch/spark-2.2.1-bin-hadoop2.7/spark-warehouse
18/01/09 08:38:28 DEBUG SessionState: Session is using authorization class class org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider
18/01/09 08:38:28 DEBUG StateStoreCoordinatorRef: Retrieved existing StateStoreCoordinator endpoint
18/01/09 08:38:28 DEBUG StreamExecution: Starting Trigger Calculation
18/01/09 08:38:28 INFO StreamExecution: Starting new streaming query.
18/01/09 08:38:28 DEBUG UserGroupInformation: PrivilegedAction as:snowch (auth:SIMPLE) from:org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
18/01/09 08:38:28 DEBUG KafkaSource$$anon$1: Unable to find batch /tmp/temporary-fb3e6e1a-fbbe-4098-991c-0b29f63ecade/sources/0/0
18/01/09 08:38:28 DEBUG AbstractCoordinator: Sending coordinator request for group spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0 to broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 (id: -5 rack: null)
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -5 at kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG NetworkClient: Initialize connection to node -3 for sending metadata request
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -3 at kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -5
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -3
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -5
18/01/09 08:38:28 DEBUG Selector: Connection with kafka05-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.153 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:174)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:263)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:261)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:230)
    at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:171)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:132)
    at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:129)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:129)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:97)
    at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:163)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:520)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:518)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:518)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:492)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
18/01/09 08:38:28 DEBUG NetworkClient: Node -5 disconnected.
18/01/09 08:38:28 WARN NetworkClient: Bootstrap broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:38:28 DEBUG ConsumerNetworkClient: Cancelled GROUP_COORDINATOR request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4eacac4e, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0}), createdTimeMs=1515487108479, sendTimeMs=1515487108598) with correlation id 0 due to node -5 being disconnected
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -3
18/01/09 08:38:28 DEBUG Selector: Connection with kafka01-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.149 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    <<repeated>>

I have seen similar questions about this error. I understand that some of the output is just 'noise'; however, my Spark streaming application does not appear to be receiving any data. I have connected with a console client and I am able to see data.
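
For reference, the console check was along these lines. The file names and paths here are illustrative, and the inline sasl.jaas.config property assumes console tools from a Kafka 0.10.2+ distribution:

# consumer.properties (illustrative)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="*****" password="*****";

bin/kafka-console-consumer.sh \
    --bootstrap-server kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093 \
    --topic transactions_load \
    --consumer.config consumer.properties \
    --from-beginning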


Update 1 - I've tried configuring JAAS programmatically, but I'm still getting the same error. The issue may be that the JAAS configuration needs to be applied on each worker node but isn't being applied there. A quick sanity check of the driver-side configuration follows the listing below.

sc.setLogLevel("DEBUG")

def jaasClientConfig(username: String, password: String): Unit = {

    import javax.security.auth.login.AppConfigurationEntry
    import javax.security.auth.login.Configuration

    import scala.collection.JavaConversions._

    // Clear any JAAS config file set on the JVM so the programmatic
    // Configuration below takes effect.
    System.setProperty("java.security.auth.login.config", "")

    Configuration.setConfiguration(new Configuration() {
        // Return the same PlainLoginModule entry for any requested name,
        // including "KafkaClient", which is what the Kafka client asks for.
        def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
            val idMap = Map(
                "serviceName" -> "kafka",
                "username" -> username,
                "password" -> password
            )
            Array(new AppConfigurationEntry(
                "org.apache.kafka.common.security.plain.PlainLoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                idMap
            ))
        }
    })
}

def startSparkStreaming(): Unit = {

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()

    import spark.implicits._

    val df = spark.readStream.
                    format("kafka").
                    option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                    option("subscribe", "transactions_load").
                    option("security.protocol", "SASL_SSL").
                    option("sasl.mechanism", "PLAIN").
                    option("ssl.protocol", "TLSv1.2").
                    option("ssl.enabled.protocols", "TLSv1.2").
                    option("ssl.endpoint.identification.algorithm", "HTTPS").
                    option("auto.offset.reset","earliest").
                    option("group.id", System.currentTimeMillis).
                    load()

    val query = df.writeStream.format("console").start()
}

jaasClientConfig("****","****")

startSparkStreaming()
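
As a quick sanity check that the programmatic configuration was actually installed on the driver, the following can be run in the same shell. The Kafka client requests the JAAS entry named "KafkaClient", and the anonymous Configuration above returns the same entry for any name, so this should print the plain login module class:

import javax.security.auth.login.Configuration

// "KafkaClient" is the entry name the Kafka client looks up.
val entries = Configuration.getConfiguration.getAppConfigurationEntry("KafkaClient")
println(entries.head.getLoginModuleName)
// expected: org.apache.kafka.common.security.plain.PlainLoginModule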

Update 2

I've also tried with a jaas.conf:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.conf=jaas.conf" --files "jaas.conf"

with the following jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="*****"
    password="*****";
};

Still the same problem ...


Solution

  • First I needed to run spark-shell with --conf options that point both the driver and the executor at my jaas.conf. The driver matters here because the stack trace above shows the failing connection attempts coming from KafkaOffsetReader on the driver, so setting the property only on the executors (as in Update 2) is not enough:

       ./bin/spark-shell --master local[1] \
            --jars external/kafka-0-10-sql/target/spark-sql-kafka-0-10_2.11-2.2.2-SNAPSHOT.jar,external/kafka-0-10-assembly/target/spark-streaming-kafka-0-10-assembly_2.11-2.2.2-SNAPSHOT.jar \
            --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
            --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" \
            --num-executors 1 --executor-cores 1

    Next I had to add some kafka options:

    val df = spark.readStream.
                    format("kafka").
                    option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
                    option("subscribe", "transactions_load").
                    option("kafka.security.protocol", "SASL_SSL").
                    option("kafka.sasl.mechanism", "PLAIN").
                    option("kafka.ssl.protocol", "TLSv1.2").
                    option("kafka.ssl.enabled.protocols", "TLSv1.2").
                    load()
    

    Note that the Kafka client options need to be prefixed with kafka., for example kafka.security.protocol rather than security.protocol. Unprefixed options are not passed through to the underlying consumer, which explains the original symptom: the consumer fell back to PLAINTEXT when talking to the SASL_SSL port, the broker dropped the connection, and the client reported java.io.EOFException.

    These changes solved the connectivity issue for me.
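
    For completeness, here is the working configuration pulled together as a minimal standalone sketch rather than a spark-shell session. It assumes the same brokers and topic as above (the broker list is shortened for readability) and that the JVMs are started with -Djava.security.auth.login.config=/path/to/jaas.conf on both the driver and the executors, as in the spark-shell command:

    import org.apache.spark.sql.SparkSession

    object StreamingRetailTransactions {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("StreamingRetailTransactions")
          .getOrCreate()

        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers",
            "kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093," +
            "kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093")
          .option("subscribe", "transactions_load")
          // Consumer settings must carry the "kafka." prefix to reach the client.
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.sasl.mechanism", "PLAIN")
          .option("kafka.ssl.protocol", "TLSv1.2")
          .option("kafka.ssl.enabled.protocols", "TLSv1.2")
          .load()

        // Write each micro-batch to the console and block until the query stops.
        val query = df.writeStream.format("console").start()
        query.awaitTermination()
      }
    }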