I have a dataframe (say, ac_df) which has 32 different columns. I want to take a specific column, split each value into chunks of 3 characters, and create another df out of those chunks.
ac_df['payment_history_1']
gives the result below.
I want a new df with the structure below.
For example, if I take the first row, '000000000000', it is grouped as
'000', '000', '000', '000'
and this becomes the first row of the new df.
The equivalent pandas code to perform this task is:
temp1 = ac_df['payment_history_1'].str.split(r'(...)', expand=True)
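For illustration, a plain-Python way of expressing the same grouping (assuming every value's length is a multiple of 3) would be:
import re

re.findall('...', '000000000000')  # ['000', '000', '000', '000']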
In Spark, I tried the following:
temp1 = ac_df.select(ac_df['payment_history_1']).rdd.map(lambda each_row: str(each_row[0])).map(lambda y: y.split(r'(...)')).collect()
Output:
[['000000000000'], ['000000000003000000000'], ['000000000003000000000000000']]
However, I'm unable to move ahead from here and get the desired result. Can someone suggest an approach?
Try this; you will be able to build on it:
df = spark.createDataFrame(
[
[1, '000000000000'],
[2, '000000000003000000000'],
[3, '000000000003000000000000000']
]
, ["id", "numbers"]
)
df.show()
This should yield something similar to the dataframe you start with:
+---+--------------------+
| id| numbers|
+---+--------------------+
| 1| 000000000000|
| 2|00000000000300000...|
| 3|00000000000300000...|
+---+--------------------+
Taking the numbers column, you can first turn it into a ","-separated string (see the sketch below), split that string into an array, and then apply posexplode(expr), which separates the elements of array expr into multiple rows with positions, or the elements of map expr into multiple rows and columns with positions.
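The strings in the question have no separator yet, so as a first step (a minimal sketch, assuming every chunk is exactly 3 characters) you could insert a "," after every third character with regexp_replace and keep working on that column:
from pyspark.sql import functions as f

# sketch: put a "," after every 3 characters, except at the very end of the string,
# so '000000000003000000000' becomes '000,000,000,003,000,000,000'
df = df.withColumn("numbers", f.regexp_replace("numbers", "(.{3})(?!$)", "$1,"))
df.show(truncate=False)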
from pyspark.sql import functions as f
df.select(
"id",
f.split("numbers", ",").alias("numbers"),
f.posexplode(f.split("numbers", ",")).alias("pos", "val")
).show()
which should result in:
+---+--------------------+---+---+
| id| numbers|pos|val|
+---+--------------------+---+---+
| 1|[000, 000, 000, 000]| 0|000|
| 1|[000, 000, 000, 000]| 1|000|
| 1|[000, 000, 000, 000]| 2|000|
| 1|[000, 000, 000, 000]| 3|000|
| 2|[000, 000, 000, 0...| 0|000|
| 2|[000, 000, 000, 0...| 1|000|
| 2|[000, 000, 000, 0...| 2|000|
| 2|[000, 000, 000, 0...| 3|003|
| 2|[000, 000, 000, 0...| 4|000|
| 2|[000, 000, 000, 0...| 5|000|
| 2|[000, 000, 000, 0...| 6|000|
| 3|[000, 000, 000, 0...| 0|000|
| 3|[000, 000, 000, 0...| 1|000|
| 3|[000, 000, 000, 0...| 2|000|
| 3|[000, 000, 000, 0...| 3|003|
| 3|[000, 000, 000, 0...| 4|000|
| 3|[000, 000, 000, 0...| 5|000|
| 3|[000, 000, 000, 0...| 6|000|
| 3|[000, 000, 000, 0...| 7|000|
| 3|[000, 000, 000, 0...| 8|000|
+---+--------------------+---+---+
Next, we use pyspark.sql.functions.expr to grab the element at index pos from this array.
The first new column is the name of our output column, a concatenation of the literal "numbers" and the index in the array. The second column is the value at the corresponding index in the array. We get the latter by exploiting pyspark.sql.functions.expr, which allows us to use column values as parameters.
df.select(
"id",
f.split("numbers", ",").alias("numbers"),
f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
"id",
f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
f.expr("numbers[pos]").alias("val")
)\
.show()
which results in:
+---+--------+---+
| id| number|val|
+---+--------+---+
| 1|numbers0|000|
| 1|numbers1|000|
| 1|numbers2|000|
| 1|numbers3|000|
| 2|numbers0|000|
| 2|numbers1|000|
| 2|numbers2|000|
| 2|numbers3|003|
| 2|numbers4|000|
| 2|numbers5|000|
| 2|numbers6|000|
| 3|numbers0|000|
| 3|numbers1|000|
| 3|numbers2|000|
| 3|numbers3|003|
| 3|numbers4|000|
| 3|numbers5|000|
| 3|numbers6|000|
| 3|numbers7|000|
| 3|numbers8|000|
+---+--------+---+
Finally, we can just groupBy the id and pivot the DataFrame:
df.select(
"id",
f.split("numbers", ",").alias("numbers"),
f.posexplode(f.split("numbers", ",")).alias("pos", "val")
)\
.drop("val")\
.select(
"id",
f.concat(f.lit("numbers"),f.col("pos").cast("string")).alias("number"),
f.expr("numbers[pos]").alias("val")
)\
.groupBy("id").pivot("number").agg(f.first("val"))\
.show()
giving the final dataframe:
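The wide result should look something like this (exact row order and null rendering from show() may vary):
+---+--------+--------+--------+--------+--------+--------+--------+--------+--------+
| id|numbers0|numbers1|numbers2|numbers3|numbers4|numbers5|numbers6|numbers7|numbers8|
+---+--------+--------+--------+--------+--------+--------+--------+--------+--------+
|  1|     000|     000|     000|     000|    null|    null|    null|    null|    null|
|  2|     000|     000|     000|     003|     000|     000|     000|    null|    null|
|  3|     000|     000|     000|     003|     000|     000|     000|     000|     000|
+---+--------+--------+--------+--------+--------+--------+--------+--------+--------+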
I picked up the details from: Split Spark Dataframe string column into multiple columns