pyspark palantir-foundry foundry-code-repositories

In Foundry, how do I read the rows "added" to an output since the last build?


My output dataset has a snapshot transaction view. I am trying to read only the rows "added" to output_df since the last run, but I get zero rows. Is it possible to access the newly "added" rows of output_df when it has a snapshot view? If not, can anyone suggest pipeline logic for accessing those rows?

I want to check whether any rows that exist in both the input and output dataframes also exist in the data newly added to output_df since the last run.

Transaction view of output_df: [screenshot not included]

from transforms.api import transform, incremental, Input, Output, configure
from pyspark.sql import types as T
from pyspark.sql import functions as F

schema = T.StructType(
    [
        T.StructField("wellname", T.StringType()),
        T.StructField("start_date", T.TimestampType()),
        T.StructField("woe_limit_psia", T.DoubleType()),
        T.StructField("end_date", T.TimestampType()),
    ]
)


@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental(
    # require_incremental=True, snapshot_inputs=["input_df"], semantic_version=13
    snapshot_inputs=["input_df", "input_df2"], require_incremental=True
)
@transform(
    input_df=Input(
        "ri.foundry.lava-catalog.dataset.974e09fa-fba6-4867-a7b1-f860ab7a7046"
    ),
    output_df=Output(
        "ri.foundry.lava-catalog.dataset.c3407f67-2798-4049-8455-586a025a0b65"
    )
)
def incremental_filter(input_df, output_df):
    df_current = input_df.dataframe("added")
    df_history = output_df.dataframe('previous', schema=schema)
    new_output = output_df.dataframe('added', schema=schema)

    # I want to check here whether any rows that exist in both the input and the output
    # also exist in the data newly added to output_df since the last run.

    previous_rows = df_current.join(df_history, on=["wellname", "woe_limit_psia"], how="inner")
    repeated_rows = previous_rows.join(new_output, on=['wellname', 'woe_limit_psia'], how='left_anti')
    # repeated_rows contains every row of previous_rows; I think that is because new_output has 0 rows.
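
For illustration only, here is a minimal, Spark-free sketch of why an empty new_output would produce the result described in the last comment. `left_anti` is a hypothetical helper over plain Python dicts that mimics the semantics of a left-anti join for non-null keys; it is not a PySpark or Foundry API, and the well names and values are made-up sample data.

```python
def left_anti(left, right, keys):
    """Return the rows of `left` whose key tuple has no match in `right`."""
    right_keys = {tuple(row[k] for k in keys) for row in right}
    return [row for row in left if tuple(row[k] for k in keys) not in right_keys]


keys = ["wellname", "woe_limit_psia"]

# Stand-in for the result of the inner join between input and previous output.
previous_rows = [
    {"wellname": "A-1", "woe_limit_psia": 1200.0},
    {"wellname": "B-2", "woe_limit_psia": 950.0},
]

# Stand-in for output_df.dataframe('added') when it comes back with zero rows.
new_output = []

# With an empty right side no key can match, so nothing is filtered out.
repeated_rows = left_anti(previous_rows, new_output, keys)
assert repeated_rows == previous_rows
```

So if the "added" read is empty, the anti-join cannot remove anything, which matches what the code above observes.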

Solution

  • When an input is a snapshot, all rows of the current view (what you can see in the dataset) are "added" rows, because the latest snapshot rewrote all of them.

    You therefore can't tell "new" rows from "old" rows, since they are all rewritten together.

    Nor can you look at the previous state (the N-1 snapshot).

    The only options I see: