python pyspark palantir-foundry incremental-build

Change inputs based on whether a transform can run incrementally or not


I am fairly new to writing code and am teaching myself Python and PySpark by searching the web for answers to my problems. I am trying to build a historical record set from daily changes. I periodically have to bump the semantic version, which forces a non-incremental run, but I do not want to lose the historical data I have already collected. When the job cannot run incrementally, I want it to combine the current snapshot with the backed-up history; when it can run incrementally, it should perform the incremental transform as normal. Any and all help is appreciated.

SEMANTIC_VERSION = 1

# if job cannot run incrementally
# joins current snapshot data with already collected historical data
if cannot_run_incrementally:
    @transform(
        history=Output(historical_output),
        backup=Input(historical_output_backup),
        source=Input(order_input),
    )
    def my_compute_function(source, history, backup, ctx):
        input_df = (
            source.dataframe()
            .withColumn('record_date', F.current_date())
        )
        old_df = backup.dataframe()
        joined = old_df.unionByName(input_df)
        joined = joined.distinct()
        history.write_dataframe(joined)


# if job can run incrementally perform incremental transform normally
else:
    @incremental(snapshot_inputs=['source'], semantic_version=SEMANTIC_VERSION)
    @transform(
        history=Output(historical_output),
        backup=Output(historical_output_backup),
        source=Input(order_input),
    )
    def my_compute_function(source, history, backup):
        input_df = (
            source.dataframe()
            .withColumn('record_date', F.current_date())
        )
        history.write_dataframe(input_df.distinct()
                                .subtract(history.dataframe('previous', schema=input_df.schema)))
        backup.set_mode("replace")
        backup.write_dataframe(history.dataframe())

Working code, based on the selected answer and the comments:

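from pyspark.sql import functions as F
from transforms.api import Input, Output, incremental, transform

SEMANTIC_VERSION = 3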


@incremental(snapshot_inputs=['source'], semantic_version=SEMANTIC_VERSION)
@transform(
    history=Output(),
    backup=Output(),
    source=Input(),
)
def compute(ctx, history, backup, source):
    # running incrementally
    if ctx.is_incremental:
        input_df = (
            source.dataframe()
            .withColumn('record_date', F.current_date())
        )
        history.write_dataframe(input_df.subtract(history.dataframe('previous', schema=input_df.schema)))
        backup.set_mode("replace")
        backup.write_dataframe(history.dataframe().distinct())

    # not running incrementally
    else:
        input_df = (
            source.dataframe()
            .withColumn('record_date', F.current_date())
        )
        backup.set_mode('modify')  # use 'replace' if you want to start fresh
        backup.write_dataframe(input_df)
        history.set_mode('replace')
        history.write_dataframe(backup.dataframe().distinct())
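
To make the data flow of the two branches easier to follow, here is a rough plain-Python model of what I intend the transform to do. It is only a sketch: Python sets stand in for the DataFrames, run_build and the SimpleNamespace objects are hypothetical stand-ins for the transform and for ctx, and it does not reproduce Foundry's actual transaction behaviour. It assumes that history arrives empty after a semantic version bump while backup keeps its rows.

```python
from types import SimpleNamespace


def run_build(ctx, snapshot, history, backup):
    """Model one build; snapshot, history and backup are sets of row tuples.

    Returns the new (history, backup) contents.
    """
    if ctx.is_incremental:
        # Append only the rows that history does not already contain.
        new_rows = snapshot - history
        history = history | new_rows
        # The backup is replaced with a full copy of history.
        backup = set(history)
    else:
        # Assumed: history has been wiped, so fold the snapshot into the
        # backup and rebuild history from the backup.
        backup = backup | snapshot
        history = set(backup)
    return history, backup


# Hypothetical stand-ins for ctx in an incremental and a non-incremental build.
incremental_ctx = SimpleNamespace(is_incremental=True)
full_ctx = SimpleNamespace(is_incremental=False)

# Day 1: first ever build, nothing collected yet.
history, backup = run_build(full_ctx, {("order-1", "2024-01-01")}, set(), set())

# Day 2: incremental build adds one new row.
history, backup = run_build(
    incremental_ctx, {("order-2", "2024-01-02")}, history, backup
)

# Day 3: semantic version bumped, so history starts empty but backup survives.
history, backup = run_build(full_ctx, {("order-3", "2024-01-03")}, set(), backup)

print(sorted(history))
```

In this model the row collected on day 1 is still in history after the day 3 rebuild, which is the behaviour I was after.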

Solution

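  • Use the transform's IncrementalTransformContext, passed in as the ctx argument, to determine whether the build is running incrementally. Its is_incremental property is True for an incremental run and False otherwise.

    This can be seen in the code below.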

    @incremental()
    @transform(
        x=Output(),
        y=Input(),
        z=Input(),
    )
    def compute(ctx, x, y, z):
        if ctx.is_incremental:
            pass  # code for an incremental run
        else:
            pass  # code for a non-incremental run
    

    More information on IncrementalTransformContext can be found in the documentation on your own environment ({URL}/workspace/documentation/product/transforms/python-transforms-api-incrementaltransformcontext) or in the public documentation (https://www.palantir.com/docs/foundry/transforms-python/transforms-python-api-classes/#incrementaltransformcontext).