scalaapache-sparkspark-streaming

Scala Spark Streaming Via Apache Toree


I am using the all-spark notebook docker image on my mac to use Apache toree and scala (https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook ).

I am trying to test the basic streaming example of spark's documentation, which involve to:

  1. start a sparkstreaming object, listening on port 9999
  2. start the netcast program : nc -lk 9999

So I launch the container binding the 9999 port:

$ sudo docker run -it --rm -p 9999:9999 -p 8888:8888 -e GRANT_SUDO=yes --user root --pid=host -e TINI_SUBREAPER=true  -v $HOME/Informatique/notebooks:/home/jovyan/work:rw jupyter/all-spark-notebook

But then, trying to connect to it I got a "port already used" error:

$ nc -lk 9999
nc: Address already in use

I also tried to put myself in the container:

romain@MacBook-Pro-de-oursin:~$ docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                                            NAMES
0bd6b70bacfa        jupyter/all-spark-notebook   "tini -- start-not..."   23 seconds ago      Up 22 seconds       0.0.0.0:8888->8888/tcp, 0.0.0.0:9999->9999/tcp   wonderful_brattain
romain@MacBook-Pro-de-oursin:~$ docker exec -ti wonderful_brattain  /bin/bash
root@0bd6b70bacfa:~/work# nc -lk 9999
bash: nc: command not found
root@0bd6b70bacfa:~/work# sudo apt-get update
root@0bd6b70bacfa:~/work# sudo apt-get install netcat-traditional
root@0bd6b70bacfa:~/work# nc -lk 9999
aaaa aaa aaa
bb bbb bbb
cc cc cc

But there is nothing displayed on the scala notebook:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

with :

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

gives :

-------------------------------------------
Time: 1485880101000 ms
-------------------------------------------

-------------------------------------------
Time: 1485880102000 ms
-------------------------------------------

-------------------------------------------
Time: 1485880103000 ms
-------------------------------------------

-------------------------------------------
Time: 1485880104000 ms
-------------------------------------------

How to deal with these network issues?


Solution

  • The Spark streaming context tries to connect to the nc server to stream data from it. It's the nc server that is listening on port 9999, not the Spark context.

    You're getting the port already in use error because you first start your notebook container with -p 9999:9999, so Docker reserves port 9999 on your host. When you then try to run nc -lk 9999 on your host, you get the conflict.

    You need to setup the nc server so that the kernel running within your notebook container can access it. One way to do this is to run the nc server in a separate Docker container, and have both containers connect to the same Docker network.

    First, create a Docker network on your host to allow the two containers to communicate:

    docker network create testnet
    

    Now run nc in its own container.

    docker run -it --rm --name nc --network testnet appropriate/nc -lk 9999
    

    The --network testnet option attaches the container to the testnet network. The --name nc option makes the container accessible to other containers on the same network using the hostname nc.

    Now run the notebook container separately. It should also use --network testnet.

    docker run -it --rm --network testnet -p 8888:8888 \
    -v $HOME/Informatique/notebooks:/home/jovyan/work:rw \
    jupyter/all-spark-notebook
    

    Finally, in your notebook code, make sure the Spark context connects to hostname nc.

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._ 
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sc, Seconds(5))
    val lines = ssc.socketTextStream("nc", 9999)
    val words = lines.flatMap(_.split(" "))
    import org.apache.spark.streaming.StreamingContext._ 
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    
    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    

    If you type in the terminal for the nc container:

    hello, world
    

    You should see this in your notebook:

    -------------------------------------------
    Time: 1485987495000 ms
    -------------------------------------------
    (hello,,1)
    (world,1)