I do not know if the title of the question is appropriate, but basically I want to read the whole output dataset.
I uploaded the output dataset as a parquet file into Foundry (which is an "Append" transaction). My input data is calculated by a Python transform in Code Workbook (which shows as SNAPSHOT in the transaction details).
Now I want to read that uploaded output dataset, compare it with input_df, and then replace the whole output dataset with the new output data produced by that comparison. My output dataset has 123 rows and my input dataset has 86 rows. As a test, I read the "previous" output_df, unioned it with input_df, and built the code repo. I got 86 rows in output_df (exactly the same data as input_df), so I think output_df was read as an empty dataset. How can I read the entire data (123 rows) of output_df?
from transforms.api import transform, incremental, Input, Output, configure
from pyspark.sql import types as T
from pyspark.sql import functions as F

schema = T.StructType(
    [
        T.StructField("wellname", T.StringType()),
        T.StructField("start_date", T.TimestampType()),
        T.StructField("woe_limit_psia", T.DoubleType()),
        T.StructField("end_date", T.TimestampType()),
    ]
)

@configure(profile=["KUBERNETES_NO_EXECUTORS"])
@incremental(
    snapshot_inputs=["input_df"]
)
@transform(
    input_df=Input(
        "ri.foundry.lava-catalog.dataset.974e09fa-fba6-4867-a7b1-f860ab7a7046"
    ),
    output_df=Output(
        "ri.foundry.lava-catalog.dataset.95804158-098a-4f0c-afea-64abd10fbc1f"
    )
)
def incremental_filter(input_df, output_df):
    df_current = input_df.dataframe("added")
    # Align the input columns with the output schema order
    columns_in_order = [field.name for field in schema.fields]
    df_current = df_current.select(columns_in_order)
    df_union = df_current.unionByName(output_df.dataframe(mode="previous", schema=schema))
    mode = "replace"
    # localCheckpoint returns the checkpointed DataFrame, so keep the result
    df_union = df_union.localCheckpoint(eager=True)
    output_df.set_mode(mode)
    # Write the output dataframe
    output_df.write_dataframe(df_union)
output_df.dataframe(mode="current", schema=schema)
should work in place of the mode="previous" read.
Omitting the mode entirely should also work, since the read then simply picks up what is on the output.
More information here: https://www.palantir.com/docs/foundry/transforms-python/incremental-reference/#incrementaltransforminput
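As a sanity check on the row counts in the question, here is a minimal plain-Python sketch. It is not the Foundry API: lists stand in for DataFrames, and `union_rows`, `input_rows`, and `output_rows` are hypothetical names used only for illustration. It shows that a union with an empty read gives exactly the 86 input rows, which matches the symptom described, while a read that returns the uploaded data would give 209 rows.

```python
# Plain-Python stand-in for the row counts in the question.
# Lists model DataFrames; every name here is hypothetical, not Foundry API.


def union_rows(current_rows, existing_rows):
    """Mimic unionByName on row lists: concatenate without deduplicating."""
    return list(current_rows) + list(existing_rows)


input_rows = [{"wellname": f"in-{i}"} for i in range(86)]     # 86 input rows
output_rows = [{"wellname": f"out-{i}"} for i in range(123)]  # 123 uploaded rows

# If the read of the output comes back empty, the union is just the input.
empty_read = union_rows(input_rows, [])

# If the read returns the uploaded rows, the union holds both sets.
full_read = union_rows(input_rows, output_rows)

print(len(empty_read), len(full_read))
```

So after changing the read mode, a build that produces 209 rows (before any comparison or deduplication logic is added) would suggest the existing output data is being picked up.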