apache-spark, pyspark, apache-spark-sql, palantir-foundry, accumulator

Access accumulator value after using it in user defined function within df.withColumn in Palantir Foundry


I am trying to use a customized accumulator within Palantir Foundry to aggregate data within a user defined function that is applied to each row of a dataframe in a df.withColumn(...) statement.

From the resulting dataframe I can see that the incrementation of the accumulator value happens as expected. However, the value of the accumulator variable itself in the script does not change during execution. I also see that the Python id of the accumulator variable in the script differs from the Python id of the accumulator within the user defined function, but that might be expected...

How do I access the accumulator value, whose incrementation can be observed in the resulting dataframe column, from within the calling script after execution? That is the information I am looking for.

from transforms.api import transform_df, Input, Output
import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct

global accum

@transform_df(
    Output("ri.foundry.main.dataset.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
)
def compute(ctx):

    from pyspark.sql.types import StructType, StringType, IntegerType,  StructField

    data2 = [("James","","Smith","36636","M",3000),
        ("Michael","Rose","","40288","M",4000),
        ("Robert","","Williams","42114","M",4000),
        ("Maria","Anne","Jones","39192","F",4000),
        ("Jen","Mary","Brown","","F",-1)
    ]

    schema = StructType([ \
        StructField("firstname",StringType(),True), \
        StructField("middlename",StringType(),True), \
        StructField("lastname",StringType(),True), \
        StructField("id", StringType(), True), \
        StructField("gender", StringType(), True), \
        StructField("salary", IntegerType(), True) \
    ])

    df = ctx.spark_session.createDataFrame(data=data2, schema=schema)

    ####################################

    class AccumulatorNumpyArray(AccumulatorParam):
        def zero(self, zero: np.ndarray):
            return zero

        def addInPlace(self, v1, v2):
            return v1 + v2

    # from pyspark.context import SparkContext
    # sc = SparkContext.getOrCreate()
    sc = ctx.spark_session.sparkContext

    shape = 3

    global accum
    accum = sc.accumulator(
            np.zeros(shape, dtype=np.int64),
            AccumulatorNumpyArray(),
            )

    def func(row):
        global accum
        accum += np.ones(shape)
        return str(accum) + '_' + str(id(accum))

    user_defined_function = udf(func, StringType())

    new = df.withColumn("processed", user_defined_function(struct([df[col] for col in df.columns])))
    new.show(2)

    print(accum)

    return df

results in

 +---------+----------+--------+-----+------+------+--------------------+
|firstname|middlename|lastname|   id|gender|salary|           processed|
+---------+----------+--------+-----+------+------+--------------------+
|    James|          |   Smith|36636|     M|  3000|[1. 1. 1.]_140388...|
|  Michael|      Rose|        |40288|     M|  4000|[2. 2. 2.]_140388...|
+---------+----------+--------+-----+------+------+--------------------+
only showing top 2 rows

and

> accum
 Accumulator<id=0, value=[0 0 0]>
> id(accum)
 140574405092256

If the Foundry boilerplate is removed, resulting in

import numpy as np
from pyspark.accumulators import AccumulatorParam
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

spark = (
    SparkSession.builder.appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
# ctx = spark.sparkContext.getOrCreate()

data2 = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

schema = StructType(
    [
        StructField("firstname", StringType(), True),
        StructField("middlename", StringType(), True),
        StructField("lastname", StringType(), True),
        StructField("id", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("salary", IntegerType(), True),
    ]
)

# df = ctx.spark_session.createDataFrame(data=data2, schema=schema)
df = spark.createDataFrame(data=data2, schema=schema)

####################################


class AccumulatorNumpyArray(AccumulatorParam):
    def zero(self, zero: np.ndarray):
        return zero

    def addInPlace(self, v1, v2):
        return v1 + v2


sc = SparkContext.getOrCreate()

shape = 3

global accum
accum = sc.accumulator(
    np.zeros(shape, dtype=np.int64),
    AccumulatorNumpyArray(),
)


def func(row):
    global accum
    accum += np.ones(shape)
    return str(accum) + "_" + str(id(accum))


user_defined_function = udf(func, StringType())

new = df.withColumn(
    "processed", user_defined_function(struct([df[col] for col in df.columns]))
)
new.show(2, False)

print(id(accum))
print(accum)

the output obtained in a regular Python environment with PySpark version 3.3.1 on Ubuntu meets expectations and is

+---------+----------+--------+-----+------+------+--------------------------+
|firstname|middlename|lastname|id   |gender|salary|processed                 |
+---------+----------+--------+-----+------+------+--------------------------+
|James    |          |Smith   |36636|M     |3000  |[1. 1. 1.]_139642682452576|
|Michael  |Rose      |        |40288|M     |4000  |[1. 1. 1.]_139642682450224|
+---------+----------+--------+-----+------+------+--------------------------+
only showing top 2 rows

140166944013424
[3. 3. 3.]


Solution

  • The code that runs outside of the transform is run in a different environment than the code within your transform. When you commit, your checks run; they execute the code outside the transform to generate the jobspec, which is technically your executable transform. You can find it within the "Details" of your dataset after the checks pass.

    The logic within your transform is then detached and runs in isolation each time you hit build. The global accum you define outside the transform is never run in that context and does not exist while the code inside compute is executing.

    global accum  # <-- runs during checks
    
    @transform_df(
        Output("ri.foundry.main.dataset.c0d4fc0c-bb1d-4c7b-86ce-a13ec6666490"),
    )
    def compute(ctx):
        ...  # some logic <-- runs during build
    

    The prints you are doing in your second code example happen after the df is processed, because you are asking Spark to compute it with new.show(2, False). The print you are doing in the first example happens before the df is processed, since the computation will only happen after you return df.
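
    For illustration, here is a minimal sketch of that timing in plain PySpark (outside Foundry). It reuses df, accum, shape and user_defined_function from the second code example above and reads the driver-side value via accum.value; the concrete numbers assume no action has run yet and no task retries:

    new = df.withColumn(
        "processed", user_defined_function(struct([df[col] for col in df.columns]))
    )

    print(accum.value)  # still [0 0 0]: the plan is lazy, the UDF has not run yet

    new.collect()       # an action: the UDF now runs on the executors for every row

    print(accum.value)  # roughly one np.ones(shape) per row, e.g. [5. 5. 5.] for 5 rows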

    If you want to print after your df is computed, you can use @transform(... instead of @transform_df(... and do a print after writing the dataframe contents. It should be something like this:

    @transform(
        output=Output("ri.foundry.main.dataset.c0d4fc0c-bb1d-4c7b-86ce-a13ec6666490"),
    )
    def compute(ctx, output):
        df = ...  # some logic

        output.write_dataframe(df)  # write_dataframe writes the output dataset and triggers the computation
        print(accum)
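
    Putting the pieces together, here is a rough, untested sketch of what the accumulator version could look like with @transform. The dataset RID is a placeholder, the example dataframe is shortened to two rows, and the final value can still count a row more than once if Spark retries tasks:

    from transforms.api import transform, Output
    import numpy as np
    from pyspark.accumulators import AccumulatorParam
    from pyspark.sql.functions import udf, struct
    from pyspark.sql.types import StringType

    @transform(
        output=Output("ri.foundry.main.dataset.xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"),
    )
    def compute(ctx, output):

        class AccumulatorNumpyArray(AccumulatorParam):
            def zero(self, zero: np.ndarray):
                return zero

            def addInPlace(self, v1, v2):
                return v1 + v2

        sc = ctx.spark_session.sparkContext
        shape = 3

        # The accumulator is created inside the transform, so it exists at build time.
        accum = sc.accumulator(np.zeros(shape, dtype=np.int64), AccumulatorNumpyArray())

        def func(row):
            accum.add(np.ones(shape))  # .add() avoids rebinding accum inside the closure
            return str(row)

        user_defined_function = udf(func, StringType())

        df = ctx.spark_session.createDataFrame(
            [("James", 3000), ("Michael", 4000)], ["firstname", "salary"]
        )
        new = df.withColumn(
            "processed", user_defined_function(struct([df[col] for col in df.columns]))
        )

        # Writing the dataset is an action, so the UDF has run for every row
        # by the time the write returns ...
        output.write_dataframe(new)

        # ... and the driver-side accumulator now reflects the increments.
        print(accum.value)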