I have a pyspark dataframe which looks like this:
df_criterias = spark.createDataFrame(
    [
        ("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        ("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom"),
    ],
    ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD", "Result"]
)
display(df_criterias)
I also have a dictionary that looks like this:
col_map = {
    "CriteriaA": "ColumnA",
    "CriteriaB": "ColumnB",
    "CriteriaC": "ColumnC",
    "CriteriaD": "ColumnD"
}
For each record of the pyspark dataframe above, I want to build a where clause statement, e.g. "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC < 2021".
Note that ColumnD is not included in the where clause statement above because CriteriaD has no value in the dataframe.
The idea is then to filter another dataframe using this where clause statement at the end.
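For illustration only, assuming that other dataframe is called df_data (hypothetical name) and has the mapped columns ColumnA through ColumnD, the end goal would be something like:

df_data.filter("ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC < 2021")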
You can leverage concat_ws to build your condition. First, use concat_ws to concatenate each column name with its criteria; then use concat_ws again to join all the non-empty criteria with AND.
from pyspark.sql import functions as F

df = (df_criterias
      # Prefix each non-empty criteria with its mapped column name, e.g. "ColumnA IN ('ABC')"
      .withColumn('where',
                  F.array([F.when(F.col(key) != '', F.concat_ws(' ', F.lit(value), F.col(key)))
                           for key, value in col_map.items()]))
      # Empty criteria become nulls; drop them (array_compact requires Spark 3.4+) and join with AND
      .withColumn('where', F.concat_ws(' AND ', F.array_compact(F.col('where'))))
      .select('where'))
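From there, one way to apply the result (a minimal sketch, assuming your other dataframe is called df_data and contains the mapped columns ColumnA through ColumnD) is to collect the generated strings and pass each one to filter, since filter accepts a SQL expression string:

# Collect the generated where clauses to the driver (the criteria table is assumed to be small)
where_clauses = [row['where'] for row in df.collect()]

# Filter the other dataframe with each generated clause
for clause in where_clauses:
    df_data.filter(clause).show()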