I am using Spark 2.4 with Java.
I am experimenting with MapType columns in Spark.
Consider the following code:
import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class MapTypes {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
        // One row per name; "cars" maps make -> struct(yr, colour)
        Dataset<Row> df = spark.createDataFrame(getData(), getSchema())
                .groupBy("name")
                .agg(map_from_arrays(collect_list("make"), collect_list(struct("yr", "colour"))).as("cars"));
        df.show(false); // output 1
        df.select(col("name"),
                when(col("cars").getItem("BMW").getItem("colour").equalTo("white"), 1).otherwise(0).as("hasWhiteBMW"),
                when(col("cars").getItem("BMW").getItem("colour").equalTo("black"), 1).otherwise(0).as("hasBlackBMW"),
                when(col("cars").getItem("Toyota").getItem("colour").equalTo("white"), 1).otherwise(0).as("hasWhiteToyota"))
                .show(); // output 2
    }

    static List<Row> getData() {
        List<Row> data = new ArrayList<>();
        data.add(RowFactory.create("John", "BMW", "2020", "white"));
        data.add(RowFactory.create("John", "BMW", "2009", "black"));
        data.add(RowFactory.create("John", "Ford", "2021", "red"));
        data.add(RowFactory.create("Peter", "BMW", "2019", "red"));
        data.add(RowFactory.create("Peter", "Toyota", "2020", "white"));
        data.add(RowFactory.create("Ben", "BMW", "2021", "white"));
        return data;
    }

    static StructType getSchema() {
        return DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("make", DataTypes.StringType, true),
                DataTypes.createStructField("yr", DataTypes.StringType, true),
                DataTypes.createStructField("colour", DataTypes.StringType, true)
        ));
    }
}
Output 1
+-----+-----------------------------------------------------------------+
|name |cars |
+-----+-----------------------------------------------------------------+
|John |[BMW -> [2020, white], BMW -> [2009, black], Ford -> [2021, red]]|
|Ben |[BMW -> [2021, white]] |
|Peter|[BMW -> [2019, red], Toyota -> [2020, white]] |
+-----+-----------------------------------------------------------------+
In output 1, John has two "BMW" keys. How do I get a single "BMW" key whose value is a list holding both entries, like below?
+-----+-----------------------------------------------------------------+
|name |cars |
+-----+-----------------------------------------------------------------+
|John |[BMW -> [[2020, white], [2009, black]], Ford -> [2021, red]]|
|Ben |[BMW -> [2021, white]] |
|Peter|[BMW -> [2019, red], Toyota -> [2020, white]] |
+-----+-----------------------------------------------------------------+
Output 2
+-----+-----------+-----------+--------------+
| name|hasWhiteBMW|hasBlackBMW|hasWhiteToyota|
+-----+-----------+-----------+--------------+
| John| 1| 0| 0|
| Ben| 1| 0| 0|
|Peter| 0| 0| 1|
+-----+-----------+-----------+--------------+
John clearly has a black BMW, yet hasBlackBMW is 0 for him. How do I fix this?
Any tips greatly appreciated.
Since you're expecting a nested list, I think you should run groupby twice. See below (I'm using Python/PySpark rather than Java, but the syntax is basically the same).
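To see why each make should appear as a key only once, here is a minimal plain-Python sketch (not Spark code) of John's map from output 1. It assumes that a lookup on a map with duplicate keys returns the first matching entry, which is consistent with output 2 but is an assumption here. `cars_john` and `lookup_first` are hypothetical names used only for illustration.

```python
# Plain-Python model of John's map from output 1; this is not Spark code.
# Entries are kept as a list of (key, value) pairs because the map built
# by map_from_arrays holds the key "BMW" twice.
cars_john = [
    ("BMW", ("2020", "white")),
    ("BMW", ("2009", "black")),
    ("Ford", ("2021", "red")),
]


def lookup_first(entries, key):
    """Return the value of the first entry whose key matches, else None.

    Assumption: this mirrors how getItem treats duplicate keys,
    as output 2 suggests.
    """
    for k, v in entries:
        if k == key:
            return v
    return None


first_bmw = lookup_first(cars_john, "BMW")
has_white_bmw = int(first_bmw is not None and first_bmw[1] == "white")
has_black_bmw = int(first_bmw is not None and first_bmw[1] == "black")
print(has_white_bmw, has_black_bmw)
```

Under that assumption the second "BMW" entry is shadowed by the first, so the black BMW is never seen. Collecting all of a make's entries under one key avoids this.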
The first groupby collects all yr and colour values for each name and make:
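from pyspark.sql import functions as F

# df is the raw DataFrame with columns name, make, yr, colour
df2 = (df
    .groupBy("name", "make")
    .agg(F.collect_list(F.struct("yr", "colour")).alias("yr_colour"))
)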
# Output
# +-----+------+------------------------------+
# |name |make |yr_colour |
# +-----+------+------------------------------+
# |Peter|Toyota|[{2020, white}] |
# |Peter|BMW |[{2019, red}] |
# |Ben |BMW |[{2021, white}] |
# |John |BMW |[{2020, white}, {2009, black}]|
# |John |Ford |[{2021, red}] |
# +-----+------+------------------------------+
Looking at the data, it's close to what we're expecting. Now all we have to do is run the second groupby to collect all yr_colour values per name. You didn't specify the exact output format, so I'm showing two options here.
Option #1: making car a MAP
(df2
    .groupBy("name")
    .agg(F.map_from_arrays(F.collect_list("make"), F.collect_list("yr_colour")).alias("car"))
)
# Output
# +-----+--------------------------------------------------------------+
# |name |car |
# +-----+--------------------------------------------------------------+
# |John |{BMW -> [{2020, white}, {2009, black}], Ford -> [{2021, red}]}|
# |Ben |{BMW -> [{2021, white}]} |
# |Peter|{Toyota -> [{2020, white}], BMW -> [{2019, red}]} |
# +-----+--------------------------------------------------------------+
# Schema
# root
# |-- name: string (nullable = true)
# |-- car: map (nullable = false)
# | |-- key: string
# | |-- value: array (valueContainsNull = false)
# | | |-- element: struct (containsNull = false)
# | | | |-- yr: string (nullable = true)
# | | | |-- colour: string (nullable = true)
Option #2: making car a LIST
(df2
    .groupBy("name")
    .agg(F.collect_list(F.expr("map(make, yr_colour)")).alias("car"))
)
# Output
# +-----+------------------------------------------------------------------+
# |name |car |
# +-----+------------------------------------------------------------------+
# |John |[{BMW -> [{2020, white}, {2009, black}]}, {Ford -> [{2021, red}]}]|
# |Ben |[{BMW -> [{2021, white}]}] |
# |Peter|[{Toyota -> [{2020, white}]}, {BMW -> [{2019, red}]}] |
# +-----+------------------------------------------------------------------+
# Schema
# root
# |-- name: string (nullable = true)
# |-- car: array (nullable = false)
# | |-- element: map (containsNull = false)
# | | |-- key: string
# | | |-- value: array (valueContainsNull = false)
# | | | |-- element: struct (containsNull = false)
# | | | | |-- yr: string (nullable = true)
# | | | | |-- colour: string (nullable = true)
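Neither option on its own produces the hasBlackBMW-style flags from the question. As a rough sketch of the logic only, the plain-Python model below (not Spark code) groups the question's sample rows into the same nested shape as Option #1 and then checks whether any entry under a make has the requested colour. `build_cars` and `has_car` are hypothetical helper names. In Spark the analogous check would presumably apply `array_contains` to the colour values under `car["BMW"]`; treat that as an untested assumption.

```python
from collections import defaultdict

# Sample rows from the question: (name, make, yr, colour).
rows = [
    ("John", "BMW", "2020", "white"),
    ("John", "BMW", "2009", "black"),
    ("John", "Ford", "2021", "red"),
    ("Peter", "BMW", "2019", "red"),
    ("Peter", "Toyota", "2020", "white"),
    ("Ben", "BMW", "2021", "white"),
]


def build_cars(rows):
    """Build name -> make -> [(yr, colour), ...], the shape of Option #1."""
    cars = defaultdict(lambda: defaultdict(list))
    for name, make, yr, colour in rows:
        cars[name][make].append((yr, colour))
    return cars


def has_car(cars, name, make, colour):
    """Return 1 if `name` has at least one car of this make and colour, else 0."""
    entries = cars.get(name, {}).get(make, [])
    return int(any(c == colour for _, c in entries))


cars = build_cars(rows)
for name in cars:
    print(
        name,
        has_car(cars, name, "BMW", "white"),
        has_car(cars, name, "BMW", "black"),
        has_car(cars, name, "Toyota", "white"),
    )
```

Because every entry for a make sits under one key, the check scans the whole list instead of stopping at the first value, so John's black BMW is found.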