How does Spark broadcast the data in a broadcast join?


How does Spark broadcast the data when we use a broadcast join with a hint? As far as I can see, when we use the broadcast hint, it calls this function:

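  // org.apache.spark.sql.functions
  def broadcast[T](df: Dataset[T]): Dataset[T] = {
    // Wrap the logical plan in a ResolvedHint node that requests the BROADCAST strategy
    Dataset[T](df.sparkSession,
      ResolvedHint(df.logicalPlan, HintInfo(strategy = Some(BROADCAST))))(df.exprEnc)
  }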

This internally calls the apply method of Dataset, which creates a new Dataset whose logicalPlan is the ResolvedHint:

val dataset = new Dataset(sparkSession, logicalPlan, implicitly[Encoder[T]])
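Attaching ResolvedHint does not broadcast anything yet: the hint is only metadata on the logical plan, and it is read later, when the physical join is chosen (in Spark, as far as I can tell, by the JoinSelection strategy in SparkStrategies.scala). The toy model below illustrates only that idea. All types here (Plan, Hinted, JoinPlanner, and so on) are hypothetical stand-ins, not Spark classes, and the selection rules are deliberately simplified.

```scala
// Toy model of how a broadcast hint steers join selection.
// Hypothetical types; not Spark's LogicalPlan, ResolvedHint or JoinSelection.
sealed trait Plan
final case class Relation(name: String) extends Plan
// Like ResolvedHint: the child plan is unchanged, it is only annotated.
final case class Hinted(child: Plan, broadcast: Boolean) extends Plan

sealed trait PhysicalJoin
final case class BroadcastHashJoin(buildSide: String) extends PhysicalJoin
final case class BroadcastNestedLoopJoin(buildSide: String) extends PhysicalJoin
case object SortMergeJoin extends PhysicalJoin
case object CartesianProduct extends PhysicalJoin

object JoinPlanner {
  private def hasBroadcastHint(p: Plan): Boolean = p match {
    case Hinted(_, b) => b
    case _            => false
  }

  // Simplified: prefers a hinted right side and ignores join type, table
  // sizes and spark.sql.autoBroadcastJoinThreshold, which Spark also considers.
  def chooseJoin(left: Plan, right: Plan, isEquiJoin: Boolean): PhysicalJoin = {
    val buildSide =
      if (hasBroadcastHint(right)) Some("right")
      else if (hasBroadcastHint(left)) Some("left")
      else None
    buildSide match {
      case Some(side) if isEquiJoin => BroadcastHashJoin(side) // hash table from the hinted side
      case Some(side)               => BroadcastNestedLoopJoin(side)
      case None if isEquiJoin       => SortMergeJoin
      case None                     => CartesianProduct
    }
  }
}
```

For example, hinting the right side of an equi-join yields BroadcastHashJoin("right"), while the same hint on a non-equi join yields BroadcastNestedLoopJoin("right").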

But what happens after this? How does it actually work, and where is the code for that?

  1. If the small dataset (the one we are going to broadcast) has multiple partitions, does Spark combine all the partitions and then broadcast the result?
  2. Is the data sent to the driver first and then to the executors?
  3. What is BitTorrent?

Solution

  • Regarding 1 and 2: during a broadcast join, the data is collected on the driver, and what happens next depends on the join algorithm.

    For BroadcastHashJoin (BHJ), the driver builds a hash table, and this table is then distributed to the executors.

    For BroadcastNestedLoopJoin, the broadcast dataset is distributed to the executors as an array.

    So, as you can see, the original partitioning is not kept, and the whole broadcast dataset must fit into the driver's memory (otherwise the job fails with an OOM error on the driver).

    Regarding 3, what exactly do you want to know?

    In Spark there is TorrentBroadcast, which is a BitTorrent-like implementation of broadcast. I don't know much about it (I never had to dig that deep), but if you want to know more, I think you can start here:

    TorrentBroadcast docs

    TorrentBroadcast source code

    HttpBroadcast docs: the other broadcast algorithm (removed in Spark 2.0)
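The two join paths described in this answer (collect on the driver, then ship either a hash table or an array) can be sketched in plain Scala. This is a simplified single-process model, not Spark's code: BroadcastJoinSketch, its methods, and the (key, payload) row shape are hypothetical. In Spark itself, as far as I know, the driver-side step happens in BroadcastExchangeExec, which builds a HashedRelation for BroadcastHashJoin and hands the result to sparkContext.broadcast.

```scala
import scala.reflect.ClassTag

// Simplified single-process model of the two broadcast join paths.
// Not Spark's implementation; all names here are hypothetical.
object BroadcastJoinSketch {
  type Row = (Int, String) // hypothetical row shape: (join key, payload)

  // Driver: collect every partition of the small side. Partition boundaries
  // disappear here, and everything has to sit in driver memory.
  def collectOnDriver[B](smallPartitions: Seq[Seq[B]]): Seq[B] =
    smallPartitions.flatten

  // BroadcastHashJoin path: the driver builds one hash table keyed by join key;
  // this table is what would be broadcast to the executors.
  def buildHashTable(small: Seq[Row]): Map[Int, Seq[String]] =
    small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

  // Executor: each partition of the large side probes the same table (equi-join only).
  def hashJoinPartition(table: Map[Int, Seq[String]], part: Seq[Row]): Seq[(Int, String, String)] =
    for {
      (k, v) <- part
      m      <- table.getOrElse(k, Seq.empty[String])
    } yield (k, v, m)

  // BroadcastNestedLoopJoin path: the collected rows are shipped as a plain array.
  def buildArray[B: ClassTag](small: Seq[B]): Array[B] = small.toArray

  // Executor: compare every streamed row with every broadcast row,
  // so any join condition works, not only key equality.
  def nestedLoopPartition[A, B](arr: Array[B], part: Seq[A])(cond: (A, B) => Boolean): Seq[(A, B)] =
    for {
      a <- part
      b <- arr.toSeq
      if cond(a, b)
    } yield (a, b)
}
```

Note how collectOnDriver flattens the partitions: that is the step where the original partitioning is lost and where the whole small side has to fit into the driver's memory.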
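As far as I understand the TorrentBroadcast source, the idea is roughly this: the driver serializes the value, splits it into blocks (sized by spark.broadcast.blockSize, 4m by default) and stores them in its block manager; each executor then fetches the blocks it is missing in random order, from the driver or from any executor that already has them, and registers each fetched block so other executors can fetch it from there. The single-process simulation below only illustrates that idea. TorrentSketch, split, simulate and the integer node ids are hypothetical, and in Spark the real fetching goes over the network through the BlockManager.

```scala
import scala.collection.mutable
import scala.util.Random

// Toy, single-process simulation of a BitTorrent-like broadcast.
// Not Spark's TorrentBroadcast code; names and structure are hypothetical.
object TorrentSketch {
  val Driver: Int = -1 // node id used for the driver

  // Driver: cut the serialized value into fixed-size blocks.
  def split(bytes: Array[Byte], blockSize: Int): Vector[Vector[Byte]] =
    bytes.grouped(blockSize).map(_.toVector).toVector

  // Each executor fetches every block, in random order, from a random node that
  // already holds it, then becomes a holder itself. Returns every executor's
  // reassembled bytes plus how many block fetches hit the driver vs. peers.
  def simulate(blocks: Vector[Vector[Byte]], executors: Int, seed: Long): (Vector[Vector[Byte]], Int, Int) = {
    val rnd = new Random(seed)
    val holders = Array.fill(blocks.size)(Vector(Driver)) // nodes that can serve block i
    var fromDriver = 0
    var fromPeers = 0
    val copies = (0 until executors).toVector.map { exec =>
      val fetched = mutable.Map.empty[Int, Vector[Byte]]
      for (i <- rnd.shuffle(blocks.indices.toVector)) {
        val source = holders(i)(rnd.nextInt(holders(i).size))
        if (source == Driver) fromDriver += 1 else fromPeers += 1
        fetched(i) = blocks(i)          // "download" block i from source
        holders(i) = holders(i) :+ exec // exec can now serve block i to others
      }
      blocks.indices.toVector.flatMap(i => fetched(i)) // reassemble in block order
    }
    (copies, fromDriver, fromPeers)
  }
}
```

In this simulation the first executor can only fetch from the driver, while later executors increasingly fetch from peers, so the driver no longer has to send the whole value once per executor.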