
Optimizing Spark Joins: How to Perform Multiple Left Joins Without Additional Shuffling


Let's say I have two DataFrames.

The first is tickets, which has 3 columns:

The second DataFrame is users, which also has 3 columns:

As output, I would like a single DataFrame:

Both the assignee and the requester of a ticket are users. So my initial code is basically:

tickets
    .as("ticket")
    .join(
        users.as("assignee"),
        $"ticket.account_id" === $"assignee.account_id" && $"ticket.assignee_id" === $"assignee.id",
        "left_outer"
    )
    .join(
        users.as("requester"),
        $"ticket.account_id" === $"requester.account_id" && $"ticket.requester_id" === $"requester.id",
        "left_outer"
    )
    .select(
        $"ticket.*",
        $"assignee.name".as("assignee_name"),
        $"requester.name".as("requester_name")
    )

This results in a plan where the tickets and users data are shuffled twice: once for the first join and a second time for the second join (as expected).

Now I'm wondering if there is some kind of smart way to avoid this double shuffle.

One abstract idea I had was to partition tickets in some way and then "broadcast" each individual user selectively: if user.id equals the assignee_id or requester_id of a ticket, the user is sent to that ticket's partition (so a single user might be sent to multiple partitions). Then, locally in each partition, we would perform the two left joins. This way each DataFrame would be shuffled only once before the data is denormalized.

The more I think about this problem, the less I think it is possible without going deep into custom Spark logic (ideally I would like to achieve this with the Spark DataFrame API).

I believe the solution must involve a custom partitioner. A HashPartitioner can't work here: if tickets are partitioned using hash(assignee_id, requester_id), we can't deduce the partition IDs for a given user, because the hash of the pair can't be derived from only one of the two ids (the hash function is not distributive). On the other hand, from what I saw, partitioners map 1 row to 1 partition, whereas, as said previously, I would like to send a single user to multiple partitions if necessary.
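To make the point about hashing the pair concrete, here is a minimal sketch in plain Scala (no Spark involved). The object and function names are hypothetical, and hashing a tuple with `hashCode` is only a stand-in for whatever hash Spark would really apply. It collects the partitions in which the tickets of one assignee end up when the key is the whole (assignee_id, requester_id) pair.

```scala
object PairHashDemo {
  // Partition of a ticket when the key is the (assignee, requester) pair as a whole.
  // Assumption: tuple hashCode stands in for the real hash function.
  def pairPartition(assigneeId: Int, requesterId: Int, p: Int): Int =
    Math.floorMod((assigneeId, requesterId).hashCode, p)

  // Every partition in which tickets of a single assignee end up.
  def partitionsForAssignee(assigneeId: Int, requesterIds: Seq[Int], p: Int): Set[Int] =
    requesterIds.map(pairPartition(assigneeId, _, p)).toSet
}
```

Since the tickets of a single assignee spread over several partitions depending on the requester, knowing only user.id is not enough to pick one target partition for that user.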

If you believe this is an impossible problem, then tell me; I'm fine with that. I'll keep searching though, as I find it an interesting optimization topic.

EDIT:

Here is the idea I have. Let's call p the number of partitions I want to repartition my tickets into. I will repartition my tickets using the assignee_id and requester_id. I split my p partitions into sqrt(p) subsections, each of which therefore also contains sqrt(p) partitions. I use hash(assignee_id) % sqrt(p) to identify the subsection the ticket belongs to, then hash(requester_id) % sqrt(p) to identify which partition within that subsection the ticket belongs to.

E.g. if p = 9 and hash(x) = x, tickets where assignee_id % 3 = 1 are in subsection 1, which contains partitions 3 to 5. Then, if requester_id % 3 = 2, the ticket goes to partition 5.

(I consider partitions being 0-based indexed)
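The example above can be reproduced with a few lines of plain Scala. This is only a sketch under the same assumptions as the example (identity hash, integer ids); `GridPartitioning`, `side` and `ticketPartition` are names made up for illustration.

```scala
object GridPartitioning {
  // Identity "hash", as in the example; real ids would go through a proper hash function.
  def hash(x: Int): Int = x

  // Number of subsections, and of partitions per subsection: floor(sqrt(p)).
  def side(p: Int): Int = math.floor(math.sqrt(p.toDouble)).toInt

  // The assignee picks the subsection, the requester picks the position inside it.
  def ticketPartition(assigneeId: Int, requesterId: Int, p: Int): Int = {
    val s = side(p)
    val subsection = Math.floorMod(hash(assigneeId), s)
    val offset = Math.floorMod(hash(requesterId), s)
    subsection * s + offset
  }
}
```

With p = 9, `GridPartitioning.ticketPartition(1, 2, 9)` gives partition 5, as in the example. `Math.floorMod` is used instead of `%` so that a negative hash still maps to a valid partition.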

[Figure: visualization of the partitions when p = 9]

As for users: I'm going to use a custom partitioner, and a partitioner requires a single target partition per row, while a single user needs to go to multiple partitions. So I'll need to explode my users, producing multiple rows for a single user that can then be dispatched to multiple partitions.

How many copies of a user do we need? We need 1 copy per subsection (for the single partition in that subsection where the user matches the requester condition) and 1 copy per partition in the subsection where the user matches the assignee condition. You can refer to the example I shared: when a user matches a color code, it must go to all partitions that have the same color code.

It gives numberOfCopies = (1 * sqrt(p)) + sqrt(p) - 1 = 2 * sqrt(p) - 1. The "minus 1" is required because we are accounting twice for the partition where the user matches both the conditions for the assignee and the requester.
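As a quick sanity check of that count, the sketch below lists the partitions a user has to reach, again assuming an identity hash and integer ids. `UserCopies` and `targetPartitions` are hypothetical names, not part of any Spark API.

```scala
object UserCopies {
  // Number of subsections, and of partitions per subsection: floor(sqrt(p)).
  def side(p: Int): Int = math.floor(math.sqrt(p.toDouble)).toInt

  // All partitions that may hold a ticket referencing this user (identity hash assumed).
  def targetPartitions(userId: Int, p: Int): Set[Int] = {
    val s = side(p)
    val k = Math.floorMod(userId, s)
    val asAssignee = (0 until s).map(j => k * s + j)  // the whole subsection k
    val asRequester = (0 until s).map(i => i * s + k) // position k in every subsection
    (asAssignee ++ asRequester).toSet                 // partition k * s + k appears in both
  }
}
```

For p = 9 and a user with id 1, this gives partitions 1, 3, 4, 5 and 7, that is 2 * 3 - 1 = 5 copies.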

With this repartitioning, all tickets and their users would be co-located in the same partition, so the 2 left joins can be performed without shuffling data again.

Compared to a broadcast, where each user would be sent to p partitions, we would be "broadcasting" each user to 2 * sqrt(p) partitions (considering the 1 negligible).

The pseudo-code for the partitioners would be:

def partitionTicket(ticket: Ticket, p: Int): Int = {
    val maxSubP = Math.floor(Math.sqrt(p)).toInt
    val assigneeP = hash(ticket.assignee_id) % maxSubP
    val requesterP = hash(ticket.requester_id) % maxSubP
    
    assigneeP * maxSubP + requesterP
}
// When we explode users, we add a target_p column: the index of the user copy, from 0 (inclusive) to 2 * sqrt(p) - 1 (exclusive)
def partitionUser(user: User, p: Int): Int = {
    val maxSubP = Math.floor(Math.sqrt(p)).toInt
    val assigneeSectionIndex = hash(user.id) % maxSubP
     
    if (user.target_p >= assigneeSectionIndex && user.target_p < assigneeSectionIndex + maxSubP) {
        return assigneeSectionIndex * maxSubP + user.target_p % maxSubP
    } else {
        val correctedTargetP = if (user.target_p < assigneeSectionIndex) {
            user.target_p
        } else {
            user.target_p - maxSubP + 1
        }
        return correctedTargetP * maxSubP + assigneeSectionIndex
    }
}
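Below is a runnable plain-Scala transcription of the pseudo-code, with a brute-force check that each ticket lands in a partition that also receives a copy of its assignee and of its requester. It is a sketch only: the identity hash, the integer ids and the helper names (`DoubleDenormalization`, `userPartitions`, `colocated`) are assumptions made for the illustration, and nothing here touches Spark.

```scala
object DoubleDenormalization {
  def hash(x: Int): Int = x // assumption: identity hash, as in the p = 9 example

  def side(p: Int): Int = math.floor(math.sqrt(p.toDouble)).toInt

  def partitionTicket(assigneeId: Int, requesterId: Int, p: Int): Int = {
    val s = side(p)
    Math.floorMod(hash(assigneeId), s) * s + Math.floorMod(hash(requesterId), s)
  }

  // targetP is the index of the user copy, in [0, 2 * side(p) - 1).
  def partitionUser(userId: Int, targetP: Int, p: Int): Int = {
    val s = side(p)
    val a = Math.floorMod(hash(userId), s)
    if (targetP >= a && targetP < a + s) {
      a * s + targetP % s // copies for the user's own subsection (assignee side)
    } else {
      // Remaining copies: one per other subsection (requester side).
      val corrected = if (targetP < a) targetP else targetP - s + 1
      corrected * s + a
    }
  }

  // Partitions reached by all the copies of one user.
  def userPartitions(userId: Int, p: Int): Set[Int] =
    (0 until 2 * side(p) - 1).map(partitionUser(userId, _, p)).toSet

  // True when every (assignee, requester) pair lands where both users have a copy.
  def colocated(ids: Range, p: Int): Boolean =
    ids.forall { a =>
      ids.forall { r =>
        val part = partitionTicket(a, r, p)
        userPartitions(a, p).contains(part) && userPartitions(r, p).contains(part)
      }
    }
}
```

For instance, `DoubleDenormalization.colocated(0 until 20, 9)` checks all 400 (assignee, requester) combinations for p = 9.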

Hopefully I didn't make any mistake in my calculations. The next step is to try writing the associated Spark code.


Solution

  • Posting my own answer as I managed to achieve what I was looking for (full code).

    The transformation code looks like the following:

    val usersRdd = sc.parallelize(users)
    val ticketsRdd = sc.parallelize(tickets)
    val partitioner = new DoubleDenormalizationPartitioner(p)
    
    // Explosion factor as explained in the EDIT of my question.
    val usersExplosionFactor = (2 * Math.floor(Math.sqrt(p)) - 1).toInt
    
    val explodedUsers = usersRdd
      .flatMap { user =>
        (0 until usersExplosionFactor)
          .map(targetP => (user, -targetP))
      }
    
    // Below we're partitioning each RDD using our custom partitioner (see full code for implementation)
    
    val repartitionedUsers = explodedUsers
      .keyBy { case (user, targetP) => (user.id, targetP) }
      .partitionBy(partitioner)
    
    val repartitionedTickets = ticketsRdd
      .keyBy(ticket => (ticket.assigneeId, ticket.requesterId))
      .partitionBy(partitioner)
    
    val denormalizedTickets = repartitionedTickets
      .map(_._2)
      .zipPartitions(repartitionedUsers.map(_._2._1), preservesPartitioning = true) { case (tickets, usersI) =>
        // Here, thanks to the map we can denormalize the assignee and requester at the same time
        val users = usersI.map(u => (u.accountId, u.id) -> u.name).toMap
        tickets.map { ticket =>
          (
            ticket,
            users.get((ticket.accountId, ticket.assigneeId)),   // explicit tuple key
            users.get((ticket.accountId, ticket.requesterId))
          )
        }
      }
      .mapPartitions(_.map { case (ticket, assignee, requester) =>
        (ticket.accountId, ticket.id, assignee, requester)
      })
    

    I compared the performance of my solution with DataFrame joins and RDD joins, and it does not work so smoothly. Overall, I imagine that the advice "do not use RDDs unless you really know what you're doing" applies here (I don't really know what I'm doing; this is my first time really using RDDs in an "advanced" way).

    I hope this still helps someone, or at least that someone finds the problem interesting (I did).