apache-spark · dictionary · pyspark · user-defined-functions · spark2.4.4

UDFs with Dictionaries on Spark 2.4


I am using PySpark 2.4.4, and I need a UDF to create my desired output. This UDF uses a broadcasted dictionary. First, it looks like I need to modify the UDF code to accept the dictionary. Second, I am not sure that what I am doing is the most efficient way in Spark 2.4. My code is as follows:

from pyspark.sql import functions as F
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType

# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556, 3, '2012-11-23 22:03:42'), (7854140, 3, '2012-11-28 22:03:42')], ["user", "preacher", "time"])

# I am converting the above dataframe to pandas dataframe in order to create my dictionary
Dict = df.toPandas().groupby('preacher')[['user', 'time']].apply(lambda g: list(map(tuple, g.values.tolist()))).to_dict()

#Broadcast the dictionary
pcDict = sc.broadcast(Dict)

## Function that calls the dictionary
def example(n):
    nodes = []
    children = [i[0] for i in pcDict.value[n]]  # read the broadcasted dictionary
    for child in children:
        nodes.append(child)

    return Row('Out1', 'Out2')(nodes, [(n, n+2), (n, n+4)])

## Convert the Python function to UDF
schema = StructType([
    StructField("Out1", ArrayType(IntegerType()), False),
    StructField("Out2", ArrayType(StructType([StructField("_1", IntegerType(), False), StructField("_2", IntegerType(), False)])))])

example_udf = F.udf(example, schema)

# Create sample dataframe to test the UDF function
testDf = spark.createDataFrame([(3, 4), (220,5)], ["user", "Number"])

### Final output
newDf = testDf.withColumn("Output", F.explode(F.array(example_udf(testDf["user"]))))
newDf = newDf.select("user", "Output.*")

My first question is regarding the dictionary: should I use one, or is there a more efficient way? I was thinking of collectAsMap(), but since that is available for RDDs, I am not sure it is the way to go in Spark 2.4.

The second question is: given that a dictionary is the way to go, how should I modify the UDF function?

Thanks in advance!


Solution

  • Regarding the first question, pandas offers an elegant way to convert your data into a dictionary. However, since the pandas conversion runs on a single node, you may need to leverage the power of the cluster and therefore go for a Spark version. One more factor is the size of the dictionary itself: if you are sure that it easily fits on one node, you can safely keep the pandas version; otherwise, try the following Spark code:

    from pyspark.sql import functions as F
    
    # This is a sample of the original Spark dataframe, which I will use to create the dictionary
    df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])
    
    df = df.rdd.map(lambda r: (r[1], (r[0], r[2]))) \
          .toDF(["preacher", "tuple"]) \
          .groupBy("preacher") \
          .agg(F.collect_list("tuple").alias("tuple"))
    
    # Avoid shadowing the built-in dict; convert the collected Rows into plain tuples
    pc_dict = {}
    for k, v in df.rdd.collectAsMap().items():
        pc_dict[k] = [(row[0], row[1]) for row in v]
    
    pc_dict
    # {3: [(2382556, '2012-11-23 22:03:42'), (7854140, '2012-11-28 22:03:42')],
    #  2: [(220, '2012-11-22 22:03:42')]}
    

    It is also worth mentioning that Spark packs and ships a copy of every local variable referenced in a closure along with each task. Broadcast, by contrast, sends the variable to each executor only once and caches it there, which makes it suitable for large variables that every task should be able to access cheaply.
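
    As for the second question, the UDF does not need a special signature to accept the dictionary: its body simply reads pcDict.value, and Spark resolves the broadcast to the local copy cached on each executor. Below is a minimal sketch, assuming the pc_dict built above and the schema from the question; the .get(n, []) fallback is an added guard for keys that are missing from the dictionary:

    from pyspark.sql import functions as F
    from pyspark.sql import Row
    
    # Ship the dictionary to the executors once and cache it there
    pcDict = sc.broadcast(pc_dict)
    
    def example(n):
        # pcDict.value is the executor-local copy of the broadcasted dictionary;
        # .get(n, []) returns an empty list for keys that have no entry
        nodes = [child for child, _ in pcDict.value.get(n, [])]
        return Row('Out1', 'Out2')(nodes, [(n, n + 2), (n, n + 4)])
    
    example_udf = F.udf(example, schema)  # schema as defined in the question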