I am using the all-spark-notebook Docker image on my Mac to use Apache Toree and Scala (https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook).
I am trying to test the basic streaming example from Spark's documentation, which involves running:
nc -lk 9999
So I launch the container, binding port 9999:
$ sudo docker run -it --rm -p 9999:9999 -p 8888:8888 -e GRANT_SUDO=yes --user root --pid=host -e TINI_SUBREAPER=true -v $HOME/Informatique/notebooks:/home/jovyan/work:rw jupyter/all-spark-notebook
But then, when I try to start nc on my host, I get an "Address already in use" error:
$ nc -lk 9999
nc: Address already in use
I also tried running nc from inside the container:
romain@MacBook-Pro-de-oursin:~$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0bd6b70bacfa jupyter/all-spark-notebook "tini -- start-not..." 23 seconds ago Up 22 seconds 0.0.0.0:8888->8888/tcp, 0.0.0.0:9999->9999/tcp wonderful_brattain
romain@MacBook-Pro-de-oursin:~$ docker exec -ti wonderful_brattain /bin/bash
root@0bd6b70bacfa:~/work# nc -lk 9999
bash: nc: command not found
root@0bd6b70bacfa:~/work# sudo apt-get update
root@0bd6b70bacfa:~/work# sudo apt-get install netcat-traditional
root@0bd6b70bacfa:~/work# nc -lk 9999
aaaa aaa aaa
bb bbb bbb
cc cc cc
But nothing is displayed in the Scala notebook:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
with:
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
gives:
-------------------------------------------
Time: 1485880101000 ms
-------------------------------------------
-------------------------------------------
Time: 1485880102000 ms
-------------------------------------------
-------------------------------------------
Time: 1485880103000 ms
-------------------------------------------
-------------------------------------------
Time: 1485880104000 ms
-------------------------------------------
How do I deal with these network issues?
The Spark streaming context tries to connect to the nc server to stream data from it. It's the nc server that is listening on port 9999, not the Spark context.
You're getting the "Address already in use" error because you first start your notebook container with -p 9999:9999, so Docker reserves port 9999 on your host. When you then try to run nc -lk 9999 on your host, you get the conflict.
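You can confirm from the host that Docker is what holds the port. A quick check (a sketch; docker port takes the container name shown by docker ps above, and lsof ships with macOS):

$ docker port wonderful_brattain
$ lsof -nP -iTCP:9999 -sTCP:LISTEN

The first command lists the container's published port mappings; the second shows whatever is listening on TCP 9999, which will be Docker's port-forwarding proxy rather than a process of yours.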
You need to set up the nc server so that the kernel running within your notebook container can access it. One way to do this is to run the nc server in a separate Docker container and have both containers join the same Docker network.
First, create a Docker network on your host to allow the two containers to communicate:
docker network create testnet
Now run nc in its own container:
docker run -it --rm --name nc --network testnet appropriate/nc -lk 9999
The --network testnet option attaches the container to the testnet network. The --name nc option makes the container accessible to other containers on the same network under the hostname nc.
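To sanity-check this, you can probe the listener from a throwaway container on the same network (a sketch reusing the same appropriate/nc image; the -vz flags tell nc to only test the connection and report):

$ docker run --rm --network testnet appropriate/nc -vz nc 9999

If the connection is reported open, any container attached to testnet can reach the server under the hostname nc.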
Now run the notebook container separately. It should also use --network testnet.
docker run -it --rm --network testnet -p 8888:8888 \
-v $HOME/Informatique/notebooks:/home/jovyan/work:rw \
jupyter/all-spark-notebook
Finally, in your notebook code, make sure the Spark context connects to the hostname nc.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") // unused here: the notebook already provides sc
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("nc", 9999)
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
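One caveat in a notebook: ssc.awaitTermination() blocks the running cell indefinitely. If you later want to stop the stream without tearing down the notebook's Spark context, the StreamingContext API lets you do that from another cell (a sketch):

// Run in a separate cell: stops the streaming context
// but keeps the underlying SparkContext alive.
ssc.stop(stopSparkContext = false)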
If you type this in the terminal of the nc container:
hello, world
You should see this in your notebook:
-------------------------------------------
Time: 1485987495000 ms
-------------------------------------------
(hello,,1)
(world,1)
(The double comma in (hello,,1) is expected: the code splits on spaces only, so the first token is hello, including its trailing comma.)