Tags: python, arrays, dataframe, pyspark, spark-csv

How to split an array structure into CSV columns in PySpark


Here is some example data and its schema:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

mySchema = StructType([
    StructField('firstname', StringType()),
    StructField('lastname', StringType()),
    StructField('langages', ArrayType(StructType([
        StructField('lang1', StringType()),
        StructField('lang2', StringType())
    ])))
])

myData = [("john", "smith", [
            {'lang1': 'Java', 'lang2': 'Python'},
            {'lang1': 'C', 'lang2': 'R'},
            {'lang1': 'Perl', 'lang2': 'Scala'}
            ]),
          ("robert", "plant", [
            {'lang1': 'C', 'lang2': 'Java'},
            {'lang1': 'Python', 'lang2': 'Perl'}
            ])
          ]

Then I create the DataFrame:

df = spark.createDataFrame(data=myData, schema=mySchema)

The schema looks like this:

df.printSchema()
root
|-- firstname: string (nullable = true)
|-- lastname: string (nullable = true)
|-- langages: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- lang1: string (nullable = true)
|    |    |-- lang2: string (nullable = true)

and show() gives:

df.show(df.count(), False)
+---------+--------+---------------------------------------+
|firstname|lastname|langages                               |
+---------+--------+---------------------------------------+
|john     |smith   |[[Java, Python], [C, R], [Perl, Scala]]|
|robert   |plant   |[[C, Java], [Python, Perl]]            |
+---------+--------+---------------------------------------+

At this point, everything is all right. But now I want to "flatten" the arrays: create one column per element of "langages" and join each pair with "/", so I can export it as CSV. It could look like this:

firstname   lastname    langage_1    langage_2    langage_3 
john        smith       Java/Python  C/R          Perl/Scala
robert      plant       C/Java       Python/Perl

I tried to create 3 columns like this :

from pyspark.sql.functions import col

df.select([col("langages")[x].alias("langage_" + str(x + 1)) for x in range(3)]).show()
+--------------+--------------+-------------+
|     langage_1|     langage_2|    langage_3|
+--------------+--------------+-------------+
|[Java, Python]|        [C, R]|[Perl, Scala]|
|     [C, Java]|[Python, Perl]|         null|
+--------------+--------------+-------------+

My problem is that the langages array may sometimes have 2, 3, 4 or more elements.

So range(0, 3) might need to be range(0, 4)!

I need to find the maximum number of elements across the arrays.

And I don't know how to concatenate each array, e.g. turn [Java, Python] into "Java/Python".

Thanks for your help


Solution

  • First we collect the maximum number of columns needed, then create that many columns from it.

    import pyspark.sql.functions as F

    # Maximum array length across all rows = number of output columns needed
    n = df.select(F.max(F.size("langages")).alias("n_columns")).first().n_columns

    df.select(
        "firstname",
        "lastname",
        *[F.concat_ws("/", F.array(
            F.col("langages").getItem(i).getItem("lang1"),
            F.col("langages").getItem(i).getItem("lang2"),
        )).alias("langage_" + str(i + 1)) for i in range(n)]
    )