I need to access the entire data of output_df in order to make comparisons with the input_df. I checked the transaction view of input_df and output_df, both of them are snapshot. Currently, output_df has 250 rows and I try to get entire data of output_df with "current", "added" and "previous" but all of them returns 0 rows in preview mode.
@configure(profile=['KUBERNETES_NO_EXECUTORS'])
@incremental(semantic_version=17)
@transform(
input_df=Input('ri.foundry.lava-catalog.dataset.c855c91b-3e73-4803-84bd-7d35c45f724c'),
output_df=Output('ri.foundry.lava-catalog.dataset.4802782e-4436-4bdf-87f3-5457245574c1')
)
def incremental_filter(input_df, output_df):
df_new = input_df.dataframe('added')
df_new = df_new.withColumn('Start', to_timestamp(col('Start'), 'yyyy-MM-dd HH:mm:ss'))
df_new = df_new.withColumn('End', to_timestamp(col('End'), 'yyyy-MM-dd HH:mm:ss'))
print("df_new columns are {}".format(df_new.columns))
print('----------------------------------')
# Load previous dataframe
print(output_df.dataframe('current', schema=schema).localCheckpoint().count())
df_previous = output_df.dataframe('current', schema=schema)
print("df_previous current count: ", df_previous.count()) #0 rows
df_previous = output_df.dataframe('previous', schema=schema)
print("df_previous previous count: ", df_previous.count()) #0 rows
df_previous = output_df.dataframe('added', schema=schema)
print("df_previous added count: ", df_previous.count()) #0 rows
#Doing some comparisons here
# ...................
# --------------------------
mode = 'replace'
output_df.set_mode(mode)
# Write the output dataframe
output_df.write_dataframe(df_union)
How can I get entire data of output_df in code repository?
Note that the Code Repositories preview feature will always run transforms in non-incremental mode. This is true even when require_incremental=True is passed into the incremental() decorator.
You should have success if you run a full build.