I am trying to find a way to dynamically convert the DataFrame below using Spark Scala:
id | cnt_wk | wk1 | wk2 | wk3 | wk4 |
---|---|---|---|---|---|
123 | 1 | 1 | 0 | 0 | 0 |
124 | 2 | 1 | 1 | 0 | 0 |
223 | 3 | 1 | 1 | 1 | 0 |
225 | 4 | 1 | 1 | 1 | 1 |
to this
id | cnt_wk | num_wks |
---|---|---|
123 | 1 | 1 |
124 | 2 | 1 |
223 | 3 | 1 |
225 | 4 | 1 |
124 | 2 | 2 |
223 | 3 | 2 |
225 | 4 | 2 |
223 | 3 | 3 |
225 | 4 | 3 |
225 | 4 | 4 |
Is there a better way to do this than manually filtering the rows and doing a union all? Also, the columns wk1, wk2, ..., wkn change dynamically based on max(cnt_wk). The source data volume is > 200 million rows.
Use `stack` to unpivot the table first, then keep only the weeks that are flagged with 1 and derive `num_wks` from the week column name.
data = [
[123,1,1,0,0,0],
[124,2,1,1,0,0],
[223,3,1,1,1,0],
[225,4,1,1,1,1]
]
cols = ['id','cnt_wk','wk1','wk2','wk3','wk4']
stack_cols = cols[2:]
# build the "'wk1', wk1, 'wk2', wk2, ..." argument list for stack:
# each column-name literal is paired with that column's value
stack_expr = ', '.join([f"'{c}', {c}" for c in stack_cols])
stack_size = len(stack_cols)  # stack produces one output row per wk column

df = spark.createDataFrame(data, cols)

# unpivot the wk columns, keep only the weeks flagged with 1,
# and turn the column name (e.g. 'wk3') into the week number
df.selectExpr('id', 'cnt_wk', f'stack({stack_size}, {stack_expr}) as (col_wk, wk_flag)') \
    .filter('wk_flag = 1') \
    .selectExpr('id', 'cnt_wk', 'cast(substr(col_wk, 3) as int) as num_wks') \
    .show()
+---+------+-------+
| id|cnt_wk|num_wks|
+---+------+-------+
|123|     1|      1|
|124|     2|      1|
|124|     2|      2|
|223|     3|      1|
|223|     3|      2|
|223|     3|      3|
|225|     4|      1|
|225|     4|      2|
|225|     4|      3|
|225|     4|      4|
+---+------+-------+
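The wk columns don't have to be hard-coded either. A minimal sketch, assuming every week column follows the wk&lt;N&gt; naming pattern, is to derive the list from df.columns and reuse the same expressions:

stack_cols = [c for c in df.columns if c.startswith('wk')]  # picks up wk1 ... wkn, however many exist
stack_expr = ', '.join([f"'{c}', {c}" for c in stack_cols])
stack_size = len(stack_cols)

The rest of the query stays the same. Since stack expands each input row in a single pass over the source, this avoids filtering the 200-million-row table once per week and unioning the results, and the same selectExpr strings work unchanged from Scala because stack is a Spark SQL function.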