scala, apache-spark, dataframe, apache-spark-sql, partitioning

How to define partitioning of a DataFrame?


I've started using Spark SQL and DataFrames in Spark 1.4.0. I want to define a custom partitioner on DataFrames, in Scala, but I don't see how to do this.

One of the data tables I'm working with contains a list of transactions, by account, similar to the following example.

Account   Date        Type        Amount
1001      2014-04-01  Purchase    100.00
1001      2014-04-01  Purchase     50.00
1001      2014-04-05  Purchase     70.00
1001      2014-04-01  Payment    -150.00
1002      2014-04-01  Purchase     80.00
1002      2014-04-02  Purchase     22.00
1002      2014-04-04  Payment    -120.00
1002      2014-04-04  Purchase     60.00
1003      2014-04-02  Purchase    210.00
1003      2014-04-03  Purchase     15.00

At least initially, most of the calculations will occur between the transactions within an account. So I would want to have the data partitioned so that all of the transactions for an account are in the same Spark partition.

But I'm not seeing a way to define this. The DataFrame class has a repartition(Int) method, where you can specify the number of partitions to create, but I don't see any way to define a custom partitioner for a DataFrame, as can be done for an RDD.

The source data is stored in Parquet. I did see that when writing a DataFrame to Parquet, you can specify a column to partition by, so presumably I could tell Parquet to partition its data by the 'Account' column. But there could be millions of accounts, and if I'm understanding Parquet correctly, it would create a distinct directory for each account, so that didn't sound like a reasonable solution.

Is there a way to get Spark to partition this DataFrame so that all data for an Account is in the same partition?


Solution

    Spark >= 2.3.0

    SPARK-22614 exposes range partitioning.

    val partitionedByRange = df.repartitionByRange(42, $"k")
    
    partitionedByRange.explain
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k ASC NULLS FIRST], 42
    // +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
    // 
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- Project [_1#2 AS k#5, _2#3 AS v#6]
    //    +- LocalRelation [_1#2, _2#3]
    // 
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#5 ASC NULLS FIRST], 42
    // +- LocalRelation [k#5, v#6]
    // 
    // == Physical Plan ==
    // Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
    // +- LocalTableScan [k#5, v#6]
    
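    Applied to the question's transactions table this could look like the sketch below (the DataFrame name transactions and the partition count of 200 are assumptions for illustration). Range partitioning assigns each distinct Account value to exactly one partition, but like any repartition it still performs a full shuffle.

    // `transactions` and the partition count 200 are assumptions for illustration.
    // All rows with the same Account land in the same partition, and nearby
    // accounts tend to be co-located as well.
    val byAccount = transactions.repartitionByRange(200, $"Account")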

    SPARK-22389 exposes external format partitioning in the Data Source API v2.

    Spark >= 1.6.0

    In Spark >= 1.6 it is possible to use partitioning by column for queries and caching (see SPARK-11410 and SPARK-4849) using the repartition method:

    val df = Seq(
      ("A", 1), ("B", 2), ("A", 3), ("C", 1)
    ).toDF("k", "v")
    
    val partitioned = df.repartition($"k")
    partitioned.explain
    
    // scala> df.repartition($"k").explain(true)
    // == Parsed Logical Plan ==
    // 'RepartitionByExpression ['k], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Analyzed Logical Plan ==
    // k: string, v: int
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Optimized Logical Plan ==
    // RepartitionByExpression [k#7], None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
    // 
    // == Physical Plan ==
    // TungstenExchange hashpartitioning(k#7,200), None
    // +- Project [_1#5 AS k#7,_2#6 AS v#8]
    //    +- Scan PhysicalRDD[_1#5,_2#6]
    

    Unlike RDDs, Spark Datasets (including Dataset[Row], a.k.a. DataFrame) cannot use a custom partitioner for now. You can typically address that by creating an artificial partitioning column, but it won't give you the same flexibility.
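    A minimal sketch of that workaround (assuming Spark >= 2.0 for the hash function; the bucket column name and bucket count are arbitrary choices here): derive the bucket yourself and repartition on the artificial column. Rows that share a bucket land in the same partition, but you do not control which partition that is.

    import org.apache.spark.sql.functions.{col, hash, lit, pmod, when}

    val numBuckets = 8

    // Custom assignment: give the hot key "A" a bucket of its own,
    // hash the remaining keys into the other buckets.
    val withBucket = df.withColumn(
      "bucket",
      when(col("k") === "A", lit(0))
        .otherwise(pmod(hash(col("k")), lit(numBuckets - 1)) + 1)
    )

    // Hash-partition on the artificial column: same bucket => same partition.
    val customPartitioned = withBucket.repartition(numBuckets, col("bucket"))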

    Spark < 1.6.0:

    One thing you can do is pre-partition the input data before you create a DataFrame:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    import org.apache.spark.HashPartitioner
    
    val schema = StructType(Seq(
      StructField("x", StringType, false),
      StructField("y", LongType, false),
      StructField("z", DoubleType, false)
    ))
    
    val rdd = sc.parallelize(Seq(
      Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
      Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
    ))
    
    val partitioner = new HashPartitioner(5) 
    
    val partitioned = rdd.map(r => (r.getString(0), r))
      .partitionBy(partitioner)
      .values
    
    val df = sqlContext.createDataFrame(partitioned, schema)
    

    Since DataFrame creation from an RDD requires only a simple map phase, the existing partition layout should be preserved*:

    // Compare partition counts; the Partition objects themselves differ between the two RDDs.
    assert(df.rdd.partitions.size == partitioned.partitions.size)
    

    The same way you can repartition an existing DataFrame:

    sqlContext.createDataFrame(
      df.rdd.map(r => (r.getLong(1), r)).partitionBy(partitioner).values,  // column 1 ("y") is a LongType
      df.schema
    )
    

    So it looks like it is not impossible. The question remains whether it makes sense at all. I will argue that most of the time it doesn't:

    1. Repartitioning is an expensive process. In a typical scenario most of the data has to be serialized, shuffled and deserialized. On the other hand, the number of operations which can benefit from pre-partitioned data is relatively small, and it is further limited if the internal API is not designed to leverage this property.

      • joins in some scenarios, but it would require internal support,
      • window function calls with a matching partitioner. Same as above, limited to a single window definition. The data is already partitioned internally, though, so pre-partitioning may be redundant (see the sketch after this list),
      • simple aggregations with GROUP BY - it is possible to reduce the memory footprint of the temporary buffers**, but the overall cost is much higher. More or less equivalent to groupByKey.mapValues(_.reduce) (current behavior) vs reduceByKey (pre-partitioning). Unlikely to be useful in practice.
      • data compression with SqlContext.cacheTable. Since it looks like it is using run-length encoding, applying OrderedRDDFunctions.repartitionAndSortWithinPartitions could improve the compression ratio.
    2. Performance is highly dependent on the distribution of the keys. If it is skewed, it will result in suboptimal resource utilization. In the worst-case scenario it will be impossible to finish the job at all.

    3. The whole point of using a high-level declarative API is to isolate yourself from low-level implementation details. As already mentioned by @dwysakowicz and @RomiKuntsman, optimization is the job of the Catalyst Optimizer. It is a pretty sophisticated beast and I really doubt you can easily improve on it without diving much deeper into its internals.
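    To make the window-function point above concrete, here is a small sketch against the question's transactions table (the DataFrame name transactions is an assumption): the window declares its own partitioning by Account, so the physical plan contains an Exchange and a Sort regardless of how the input was partitioned beforehand.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.sum

    // Running balance per account; the Window spec drives the shuffle.
    val w = Window.partitionBy($"Account").orderBy($"Date")
    val withBalance = transactions.withColumn("balance", sum($"Amount").over(w))
    withBalance.explain()  // look for Exchange hashpartitioning(Account, ...) followed by a Sort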

    Related concepts

    Partitioning with JDBC sources:

    JDBC data sources support a predicates argument. It can be used as follows:

    sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
    

    It creates a single JDBC partition per predicate. Keep in mind that if the sets created using individual predicates are not disjoint, you'll see duplicates in the resulting table.
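    If the partitioning key is numeric, one way to build a disjoint set of predicates is to bucket it with a modulus. A sketch (the column name account_id and the bucket count are assumptions, not part of the original schema):

    // Eight disjoint predicates; every non-NULL account_id matches exactly one,
    // so no duplicate rows are produced (rows with a NULL account_id are dropped).
    val numJdbcPartitions = 8
    val predicates = (0 until numJdbcPartitions)
      .map(i => s"account_id % $numJdbcPartitions = $i")
      .toArray

    val accounts = sqlContext.read.jdbc(url, table, predicates, props)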

    partitionBy method in DataFrameWriter:

    Spark DataFrameWriter provides a partitionBy method which can be used to "partition" data on write. It separates data on write using the provided set of columns:

    val df = Seq(
      ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
    ).toDF("k", "v")
    
    df.write.partitionBy("k").json("/tmp/foo.json")
    

    This enables predicate pushdown on read for queries based on the key:

    val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
    df1.where($"k" === "bar")
    

    but it is not equivalent to DataFrame.repartition. In particular, aggregations like:

    val cnts = df1.groupBy($"k").sum()
    

    will still require TungstenExchange:

    cnts.explain
    
    // == Physical Plan ==
    // TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
    // +- TungstenExchange hashpartitioning(k#90,200), None
    //    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
    //       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
    

    bucketBy method in DataFrameWriter (Spark >= 2.0):

    bucketBy has similar applications to partitionBy, but it is available only for tables (saveAsTable). Bucketing information can be used to optimize joins:

    // Temporarily disable broadcast joins
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    df.write.bucketBy(42, "k").saveAsTable("df1")
    val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
    df2.write.bucketBy(42, "k").saveAsTable("df2")

    // Join the two bucketed tables on the bucketing column and inspect the plan
    spark.table("df1").join(spark.table("df2"), Seq("k")).explain()

    // == Physical Plan ==
    // *Project [k#41, v#42, v2#47]
    // +- *SortMergeJoin [k#41], [k#46], Inner
    //    :- *Sort [k#41 ASC NULLS FIRST], false, 0
    //    :  +- *Project [k#41, v#42]
    //    :     +- *Filter isnotnull(k#41)
    //    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
    //    +- *Sort [k#46 ASC NULLS FIRST], false, 0
    //       +- *Project [k#46, v2#47]
    //          +- *Filter isnotnull(k#46)
    //             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
    

    * By partition layout I mean only the data distribution. The partitioned RDD no longer has a partitioner.

    ** Assuming no early projection. If the aggregation covers only a small subset of columns, there is probably no gain whatsoever.