I have this DataFrame in PySpark:
data = [("definitely somewhere",), ("Las Vegas",), ("其他",), (None,), ("",), ("Pucela Madrid Langreo, España",), ("Trenches, With Egbon Adugbo",)]
df = spark.createDataFrame(data, ["address"])
city_country = {
'las vegas': 'US',
'lagos': 'NG',
'España': 'ES'
}
cities_name_to_code = spark.sparkContext.broadcast(city_country )
df_with_codes = df.withColumn('cities_array', F.lower(F.col('address'))) \
.withColumn('cities_array', F.split(F.col('cities_array'), ', '))
For each element of cities_array, I want to look it up against the keys of cities_name_to_code and get back an array of the corresponding values. The constraint is that I don't want to use a UDF.
You can use the `transform` higher-order function for this use case and pass a case-when expression as the lambda inside it. Here's an example:
from functools import reduce
from pyspark.sql import functions as func

# create case when builder function: chain one `when` per dict entry;
# seeding the reduce with `func` means the first step calls func.when(...),
# and every later step chains .when(...) on the resulting Column
case_whens = lambda c: reduce(lambda x, y: x.when(c == y[0].lower(), y[1]), city_country.items(), func)

# test case when builder
# case_whens(func.lit('bork'))
# Column<'CASE WHEN (bork = las vegas) THEN US WHEN (bork = lagos) THEN NG WHEN (bork = españa) THEN ES END'>
# use case when inside the `transform`
df_with_codes_sdf = df. \
    withColumn('cities_array', func.lower(func.col('address'))). \
    withColumn('cities_array', func.split(func.col('cities_array'), ', ')). \
    withColumn('city_codes_array', func.transform('cities_array', lambda a: case_whens(a)))

df_with_codes_sdf.show(truncate=False)
# +-----------------------------+-------------------------------+----------------+
# |address |cities_array |city_codes_array|
# +-----------------------------+-------------------------------+----------------+
# |definitely somewhere |[definitely somewhere] |[null] |
# |Las Vegas |[las vegas] |[US] |
# |其他 |[其他] |[null] |
# |null |null |null |
# | |[] |[null] |
# |Pucela Madrid Langreo, España|[pucela madrid langreo, españa]|[null, ES] |
# |Trenches, With Egbon Adugbo |[trenches, with egbon adugbo] |[null, null] |
# +-----------------------------+-------------------------------+----------------+
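Two small follow-ups. The broadcast variable from your question isn't strictly needed here: the CASE WHEN chain is built on the driver and the dictionary values end up as literals in the query plan, so you can reference city_country (or cities_name_to_code.value) directly. And if you only want the matched codes without the nulls, you could drop them with the `filter` higher-order function. A minimal sketch, assuming Spark >= 3.1 where func.filter is available in the Python API:

# keep only the non-null codes per row, e.g. [null, ES] -> [ES]
df_with_codes_sdf = df_with_codes_sdf. \
    withColumn('city_codes_array', func.filter('city_codes_array', lambda x: x.isNotNull()))

df_with_codes_sdf.show(truncate=False)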