apache-spark, pyspark, accumulator

Using Accumulator inside Pyspark UDF


I want to access an accumulator inside a PySpark UDF:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

accum = spark.sparkContext.accumulator(0)

def prob(g, s):
    if g == 'M':
        accum.add(1)
        return 1
    else:
        accum.add(2)
        return accum.value  # this line raises the exception inside a task

convertUDF = udf(lambda g, s: prob(g, s), IntegerType())

The error I am getting:

  raise Exception("Accumulator.value cannot be accessed inside tasks")
Exception: Accumulator.value cannot be accessed inside tasks

Please let me know how to access the accumulator value and how to change it inside a PySpark UDF.


Solution

  • You cannot access the .value of an accumulator inside a UDF. From the documentation (see this answer too):

    Worker tasks on a Spark cluster can add values to an Accumulator with the += operator, but only the driver program is allowed to access its value, using value.

    It is unclear why you need to return accum.value here. Looking at your if block, I believe you only need to return 2 in the else block:

    def prob(g, s):
        if g == 'M':
            accum.add(1)
            return 1
        else:
            accum.add(2)
            return 2
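
  • To read the accumulator's value, do it on the driver after an action has forced the tasks to run. A minimal end-to-end sketch (the SparkSession setup and the sample DataFrame are illustrative, not from your code):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import IntegerType

    spark = SparkSession.builder.getOrCreate()
    accum = spark.sparkContext.accumulator(0)

    def prob(g, s):
        if g == 'M':
            accum.add(1)
            return 1
        else:
            accum.add(2)
            return 2

    convertUDF = udf(prob, IntegerType())

    # Illustrative sample data
    df = spark.createDataFrame([("M", 10), ("F", 20)], ["gender", "score"])

    # An action must run before the accumulator reflects any updates
    df.withColumn("p", convertUDF(col("gender"), col("score"))).collect()

    # Safe: .value is read on the driver, outside any task
    print(accum.value)  # 3 here: 1 for the 'M' row + 2 for the 'F' row

    One caveat: accumulator updates made inside transformations (which is where a UDF runs) can be applied more than once if Spark retries or re-executes a task, so treat counts gathered this way as approximate.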