sql, pyspark

dynamically build where clause from pyspark dataframe


I have a pyspark dataframe which looks like this:

df_criterias = spark.createDataFrame(
    [
        ("IN ('ABC')", "IN ('XYZ')", "<2021", "", "Top"),
        ("IN ('ABC')", "NOT IN ('JKL','MNO')", "IN ('2021')", "", "Bottom"),
    ],
    ["CriteriaA", "CriteriaB", "CriteriaC", "CriteriaD", "Result"]
)
display(df_criterias)


I also have a dictionary that looks like this:

col_map = {
    "CriteriaA": "ColumnA",
    "CriteriaB": "ColumnB",
    "CriteriaC": "ColumnC",
    "CriteriaD": "ColumnD"
}

For each record of the pyspark dataframe above, I want to build a where clause statement, e.g. "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC < 2021"

Note that ColumnD is not included in the where clause statement above because CriteriaD has no value in the dataframe.

The idea is then to filter another dataframe using this where clause statement at the end.


Solution

  • You can leverage concat_ws to build your condition.

    First, use concat_ws to prepend the mapped column name to each criteria value. Then, use concat_ws again to join all non-empty criteria with AND.

    from pyspark.sql import functions as F

    df = (df_criterias
          # build "<column> <criteria>" for every non-empty criteria column
          .withColumn('where',
                      F.array([F.when(F.col(key) != '', F.concat_ws(' ', F.lit(value), F.col(key)))
                               for key, value in col_map.items()]))
          # drop the nulls left by empty criteria (array_compact, Spark 3.4+) and join the rest with AND
          .withColumn('where', F.concat_ws(' AND ', F.array_compact(F.col('where'))))
          .select('where'))
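
    Each generated string can then be passed to filter, which accepts a SQL expression. Here is a minimal sketch, assuming the dataframe you want to filter is called df_data (not shown in the question) and has the mapped column names:

    # hypothetical target dataframe with the mapped column names
    df_data = spark.createDataFrame(
        [("ABC", "XYZ", "2019", ""), ("ABC", "JKL", "2021", "")],
        ["ColumnA", "ColumnB", "ColumnC", "ColumnD"]
    )

    for row in df.collect():
        # e.g. "ColumnA IN ('ABC') AND ColumnB IN ('XYZ') AND ColumnC <2021"
        df_data.filter(row['where']).show()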