I am creating an accelerator where it migrates the data from source to destination. For Example, I will pick the data from an API and will migrate the data to csv. I have faced issues with handling arraytype while data is converted to csv. I have used withColumn and concat_ws method(i.e., df1=df.withColumn('films',F.concat_ws(':',F.col("films"))) films is the arraytype column ) for this conversion and it worked. Now I wanted this to happen dynamically. I mean, without specifying the column name, is there a way that I can pick the column name from struct which have the arraytype and then call the udf?
Thank you for your time!
You can get the type of the columns using df.schema. Depending on the type of the column you can apply concat_ws or not:
data = [["test1", "test2", [1,2,3], ["a","b","c"]]]
schema= ["col1", "col2", "arr1", "arr2"]
df = spark.createDataFrame(data, schema)
array_cols = [F.concat_ws(":", c.name).alias(c.name) \
for c in df.schema if isinstance(c.dataType, T.ArrayType) ]
other_cols = [F.col(c.name) \
for c in df.schema if not isinstance(c.dataType, T.ArrayType) ]
df = df.select(other_cols + array_cols)
Result:
+-----+-----+-----+-----+
| col1| col2| arr1| arr2|
+-----+-----+-----+-----+
|test1|test2|1:2:3|a:b:c|
+-----+-----+-----+-----+