
Spark Dataframe MapType filter on Keys and Values


I am using Spark 2.4 and Java.

I am experimenting with MapType columns in Spark.

Consider the following code:

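import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class MapTypes {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        // One row per name; "cars" maps each make to a (yr, colour) struct.
        // A name with two cars of the same make ends up with a duplicate key.
        Dataset<Row> df = spark.createDataFrame(getData(), getSchema())
                .groupBy("name")
                .agg(map_from_arrays(collect_list("make"), collect_list(struct("yr", "colour"))).as("cars"));

        df.show(false); // output 1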
        
        df.select(col("name"),
                when(col("cars").getItem("BMW").getItem("colour").equalTo("white"), 1).otherwise(0).as("hasWhiteBMW"),
                when(col("cars").getItem("BMW").getItem("colour").equalTo("black"), 1).otherwise(0).as("hasBlackBMW"),
                when(col("cars").getItem("Toyota").getItem("colour").equalTo("white"), 1).otherwise(0).as("hasWhiteToyota"))
            .show(); //output 2
    }
    
    static List<Row> getData() {
        List<Row> data = new ArrayList<>();
        data.add(RowFactory.create("John", "BMW", "2020", "white"));
        data.add(RowFactory.create("John", "BMW", "2009", "black"));
        data.add(RowFactory.create("John", "Ford", "2021", "red"));
        data.add(RowFactory.create("Peter", "BMW", "2019", "red"));
        data.add(RowFactory.create("Peter", "Toyota", "2020", "white"));
        data.add(RowFactory.create("Ben", "BMW", "2021", "white"));
        return data;
    }
    
    static StructType getSchema() {
        return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("make", DataTypes.StringType, true),
                DataTypes.createStructField("yr", DataTypes.StringType, true),
                DataTypes.createStructField("colour", DataTypes.StringType, true)
                ));
    }
}

Output 1

+-----+-----------------------------------------------------------------+
|name |cars                                                             |
+-----+-----------------------------------------------------------------+
|John |[BMW -> [2020, white], BMW -> [2009, black], Ford -> [2021, red]]|
|Ben  |[BMW -> [2021, white]]                                           |
|Peter|[BMW -> [2019, red], Toyota -> [2020, white]]                    |
+-----+-----------------------------------------------------------------+

In output 1, John has two "BMW" keys. How do I get a single "BMW" key whose value is a list of both entries, like below?

+-----+-----------------------------------------------------------------+
|name |cars                                                             |
+-----+-----------------------------------------------------------------+
|John |[BMW -> [[2020, white], [2009, black]], Ford -> [2021, red]]     |
|Ben  |[BMW -> [2021, white]]                                           |
|Peter|[BMW -> [2019, red], Toyota -> [2020, white]]                    |
+-----+-----------------------------------------------------------------+

Output 2

+-----+-----------+-----------+--------------+
| name|hasWhiteBMW|hasBlackBMW|hasWhiteToyota|
+-----+-----------+-----------+--------------+
| John|          1|          0|             0|
|  Ben|          1|          0|             0|
|Peter|          0|          0|             1|
+-----+-----------+-----------+--------------+

John clearly has a black BMW, yet hasBlackBMW is 0 for him. How do I fix this?
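A likely explanation, stated here as an assumption about Spark 2.4 behaviour: map_from_arrays in 2.4 does not reject duplicate keys, and a key lookup such as getItem("BMW") returns the value of the first matching entry only (Spark 3.0 and later raise an error on duplicate map keys by default, configurable through spark.sql.mapKeyDedupPolicy). The plain-Python model below is not Spark code, and its helper names (build_cars, get_item, has_colour) are hypothetical; it reproduces output 2 under that assumption, because John's first BMW entry happens to be the white one (collect_list does not guarantee an order).

```python
# Plain-Python model (NOT Spark) of the question's pipeline, ASSUMING Spark 2.4 semantics:
# map_from_arrays keeps duplicate keys, and getItem returns the value of the first match.
from collections import defaultdict

ROWS = [
    ("John", "BMW", "2020", "white"),
    ("John", "BMW", "2009", "black"),
    ("John", "Ford", "2021", "red"),
    ("Peter", "BMW", "2019", "red"),
    ("Peter", "Toyota", "2020", "white"),
    ("Ben", "BMW", "2021", "white"),
]


def build_cars(rows):
    """groupBy("name") + map_from_arrays, modelled as a list of (key, value) pairs
    so that duplicate keys survive, as they appear to in Spark 2.4."""
    cars = defaultdict(list)
    for name, make, yr, colour in rows:
        cars[name].append((make, {"yr": yr, "colour": colour}))
    return dict(cars)


def get_item(pairs, key):
    """Hypothetical model of getItem(key): value of the FIRST matching key, or None."""
    for k, v in pairs:
        if k == key:
            return v
    return None


def has_colour(pairs, make, colour):
    """Model of when(getItem(make).getItem("colour").equalTo(colour), 1).otherwise(0);
    a missing key gives null in Spark, which falls through to otherwise(0)."""
    value = get_item(pairs, make)
    return 1 if value is not None and value["colour"] == colour else 0


cars = build_cars(ROWS)
for name in ("John", "Ben", "Peter"):
    # prints "John 1 0 0", "Ben 1 0 0", "Peter 0 0 1", matching output 2
    print(name,
          has_colour(cars[name], "BMW", "white"),
          has_colour(cars[name], "BMW", "black"),
          has_colour(cars[name], "Toyota", "white"))
```

The second BMW entry is never consulted, so the fix is to make each make appear only once per name, with all of its entries gathered into a list.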

Any tips greatly appreciated.


Solution

  • Since you're expecting a nested list, I think you should run groupBy twice. See below (I'm using Python/PySpark rather than Java, but the syntax is essentially the same).

    The first groupBy collects every (yr, colour) pair for each name and make:

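    from pyspark.sql import functions as F

    # df here is the original, un-aggregated data: name, make, yr, colour
    df2 = (df
        .groupBy("name", "make")
        .agg(F.collect_list(F.struct("yr", "colour")).alias("yr_colour"))
    )
    df2.show(truncate=False)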
    
    # Output
    # +-----+------+------------------------------+
    # |name |make  |yr_colour                     |
    # +-----+------+------------------------------+
    # |Peter|Toyota|[{2020, white}]               |
    # |Peter|BMW   |[{2019, red}]                 |
    # |Ben  |BMW   |[{2021, white}]               |
    # |John |BMW   |[{2020, white}, {2009, black}]|
    # |John |Ford  |[{2021, red}]                 |
    # +-----+------+------------------------------+
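    To make the shape of df2 concrete, here is a plain-Python model of this first groupBy (not PySpark; the helper name group_by_name_make is hypothetical). The key point is that each (name, make) pair becomes exactly one row, so make can no longer repeat within a name in the next step. Spark does not guarantee collect_list ordering; list insertion order is only a stand-in.

```python
# Plain-Python model (not Spark) of:
#   df.groupBy("name", "make").agg(F.collect_list(F.struct("yr", "colour")).alias("yr_colour"))
from collections import defaultdict

ROWS = [
    ("John", "BMW", "2020", "white"),
    ("John", "BMW", "2009", "black"),
    ("John", "Ford", "2021", "red"),
    ("Peter", "BMW", "2019", "red"),
    ("Peter", "Toyota", "2020", "white"),
    ("Ben", "BMW", "2021", "white"),
]


def group_by_name_make(rows):
    """Collect every (yr, colour) pair per (name, make): one group per distinct pair."""
    groups = defaultdict(list)
    for name, make, yr, colour in rows:
        groups[(name, make)].append((yr, colour))
    return dict(groups)


groups = group_by_name_make(ROWS)
for (name, make), yr_colour in groups.items():
    print(name, make, yr_colour)
```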
    

    This is already close to what we want. Now all we have to do is run a second groupBy on name to collect the yr_colour lists. You didn't specify the exact output format, so I'm showing two options here.

    Option #1: making car a MAP

    cars_map = (df2
        .groupBy("name")
        .agg(F.map_from_arrays(F.collect_list("make"), F.collect_list("yr_colour")).alias("car"))
    )
    cars_map.show(truncate=False)
    cars_map.printSchema()
    
    # Output
    # +-----+--------------------------------------------------------------+
    # |name |car                                                           |
    # +-----+--------------------------------------------------------------+
    # |John |{BMW -> [{2020, white}, {2009, black}], Ford -> [{2021, red}]}|
    # |Ben  |{BMW -> [{2021, white}]}                                      |
    # |Peter|{Toyota -> [{2020, white}], BMW -> [{2019, red}]}             |
    # +-----+--------------------------------------------------------------+
    
    # Schema
    # root
    #  |-- name: string (nullable = true)
    #  |-- car: map (nullable = false)
    #  |    |-- key: string
    #  |    |-- value: array (valueContainsNull = false)
    #  |    |    |-- element: struct (containsNull = false)
    #  |    |    |    |-- yr: string (nullable = true)
    #  |    |    |    |-- colour: string (nullable = true)
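    With Option #1 each make is a unique key whose value is a list, so a check like hasBlackBMW has to look at every entry in that list rather than just one. In Spark this could presumably be written as F.when(F.array_contains(F.col("car")["BMW"]["colour"], "black"), 1).otherwise(0), where ["colour"] on an array of structs yields an array of colours; treat that expression as an untested assumption. The plain-Python model below (not Spark; to_map_per_name and has_colour are hypothetical names) shows the intended semantics on the df2 rows above.

```python
# Plain-Python model (not Spark) of Option #1: a second groupBy("name") that builds
# one map per name, make -> list of (yr, colour) entries.
# Untested Spark equivalent of has_colour (assumption):
#   F.when(F.array_contains(F.col("car")["BMW"]["colour"], "black"), 1).otherwise(0)
from collections import defaultdict

# df2 from the first groupBy, in the order shown above: (name, make) -> yr_colour
DF2 = {
    ("Peter", "Toyota"): [("2020", "white")],
    ("Peter", "BMW"): [("2019", "red")],
    ("Ben", "BMW"): [("2021", "white")],
    ("John", "BMW"): [("2020", "white"), ("2009", "black")],
    ("John", "Ford"): [("2021", "red")],
}


def to_map_per_name(df2):
    """Model of map_from_arrays(collect_list("make"), collect_list("yr_colour")) per name.
    Keys are unique because df2 already has one row per (name, make)."""
    car = defaultdict(dict)
    for (name, make), yr_colour in df2.items():
        car[name][make] = yr_colour
    return dict(car)


def has_colour(car_map, make, colour):
    """1 if ANY entry for `make` has `colour`, else 0 (a missing key counts as 0)."""
    return 1 if any(c == colour for _, c in car_map.get(make, [])) else 0


car = to_map_per_name(DF2)
print(car["John"])
print(has_colour(car["John"], "BMW", "black"))  # 1: the black BMW is no longer hidden
```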
    

    Option #2: making car a LIST

    cars_list = (df2
        .groupBy("name")
        .agg(F.collect_list(F.expr("map(make, yr_colour)")).alias("car"))
    )
    cars_list.show(truncate=False)
    cars_list.printSchema()
    # Output
    # +-----+------------------------------------------------------------------+
    # |name |car                                                               |
    # +-----+------------------------------------------------------------------+
    # |John |[{BMW -> [{2020, white}, {2009, black}]}, {Ford -> [{2021, red}]}]|
    # |Ben  |[{BMW -> [{2021, white}]}]                                        |
    # |Peter|[{Toyota -> [{2020, white}]}, {BMW -> [{2019, red}]}]             |
    # +-----+------------------------------------------------------------------+
    
    # Schema
    # root
    #  |-- name: string (nullable = true)
    #  |-- car: array (nullable = false)
    #  |    |-- element: map (containsNull = false)
    #  |    |    |-- key: string
    #  |    |    |-- value: array (valueContainsNull = false)
    #  |    |    |    |-- element: struct (containsNull = false)
    #  |    |    |    |    |-- yr: string (nullable = true)
    #  |    |    |    |    |-- colour: string (nullable = true)
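    One practical difference between the two options: in Option #2 each element of car is a single-entry map, so finding the entries for a given make means scanning the whole list instead of doing one key lookup. The plain-Python model below (not Spark; to_list_of_maps and entries_for are hypothetical names) illustrates that; for flag columns like hasBlackBMW, Option #1 is probably the simpler shape to work with.

```python
# Plain-Python model (not Spark) of Option #2: collect_list(map(make, yr_colour)) per name,
# i.e. a list of single-entry maps rather than one map.
from collections import defaultdict

# df2 from the first groupBy, in the order shown above: (name, make) -> yr_colour
DF2 = {
    ("Peter", "Toyota"): [("2020", "white")],
    ("Peter", "BMW"): [("2019", "red")],
    ("Ben", "BMW"): [("2021", "white")],
    ("John", "BMW"): [("2020", "white"), ("2009", "black")],
    ("John", "Ford"): [("2021", "red")],
}


def to_list_of_maps(df2):
    """Each (name, make) row becomes one {make: yr_colour} map appended to that name's list."""
    car = defaultdict(list)
    for (name, make), yr_colour in df2.items():
        car[name].append({make: yr_colour})
    return dict(car)


def entries_for(car_list, make):
    """Gather the (yr, colour) entries for `make` by scanning every single-entry map."""
    return [entry for m in car_list for entry in m.get(make, [])]


car = to_list_of_maps(DF2)
print(car["John"])
print(entries_for(car["John"], "BMW"))
```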