pysparkapache-spark-sqldata-conversionaccelerator

Pyspark: Identify the arrayType column from the the Struct and call udf to convert array to string


I am creating an accelerator where it migrates the data from source to destination. For Example, I will pick the data from an API and will migrate the data to csv. I have faced issues with handling arraytype while data is converted to csv. I have used withColumn and concat_ws method(i.e., df1=df.withColumn('films',F.concat_ws(':',F.col("films"))) films is the arraytype column ) for this conversion and it worked. Now I wanted this to happen dynamically. I mean, without specifying the column name, is there a way that I can pick the column name from struct which have the arraytype and then call the udf?

Thank you for your time!


Solution

  • You can get the type of the columns using df.schema. Depending on the type of the column you can apply concat_ws or not:

    data = [["test1", "test2", [1,2,3], ["a","b","c"]]]
    schema= ["col1", "col2", "arr1", "arr2"]
    df = spark.createDataFrame(data, schema)
    
    array_cols = [F.concat_ws(":", c.name).alias(c.name) \
        for c in df.schema if isinstance(c.dataType, T.ArrayType) ]
    other_cols = [F.col(c.name) \
        for c in df.schema if not isinstance(c.dataType, T.ArrayType) ]
    
    df = df.select(other_cols + array_cols)
    

    Result:

    +-----+-----+-----+-----+
    | col1| col2| arr1| arr2|
    +-----+-----+-----+-----+
    |test1|test2|1:2:3|a:b:c|
    +-----+-----+-----+-----+