I want to create an incremental pipeline where I can get the previous input’s dataframe as an output after each build/update of the input dataframe. My use case is to detect any changes (removed/added rows) in the input dataframe; therefore I need to compare current input with the previous input. How can I do this?
To illustrate your desired outcome with an example:
First dataframe:

| ID | Data |
| - | --- |
| 1 | A |
| 2 | B |

Second transaction: append C in the input dataframe

Current input becomes:

| ID | Data |
| - | --- |
| 1 | A |
| 2 | B |
| 3 | C |

Expected output (previous input):

| ID | Data |
| - | --- |
| 1 | A |
| 2 | B |

Third transaction: delete C, append D in the input dataframe

Current input becomes:

| ID | Data |
| - | --- |
| 1 | A |
| 2 | B |
| 3 | D |

Expected output (previous input):

| ID | Data |
| - | --- |
| 1 | A |
| 2 | B |
| 3 | C |
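To do this, you can use a transform with two outputs: the first stores the previous version of the input dataset, and the second holds the difference between the current input and the first output. Because the first output is overwritten with the current input at the end of each build, reading its previous version on the next build returns the prior input. See some example code below: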
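from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    output_1=Output("..."),
    output_new=Output("..."),
    df=Input("..."),
)
def compute_add(df, output_new, output_1):
    # Full current view of the input, not just the newly appended rows.
    df_inp = df.dataframe('current')
    # Input as saved by the previous build; the schema is passed so an empty
    # dataframe can be constructed when no previous output exists yet.
    df_previous = output_1.dataframe('previous', schema=df_inp.schema)
    # Rows present now that were absent in the previous build.
    df_changed = df_inp.subtract(df_previous)
    # Overwrite both outputs on every build rather than appending.
    output_new.set_mode('replace')
    output_1.set_mode('replace')
    output_new.write_dataframe(df_changed)
    output_1.write_dataframe(df_inp)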
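
To check the bookkeeping without a Foundry or Spark environment, here is a rough plain-Python sketch that replays the three transactions from the example. It is only a model, not Foundry code: rows are tuples, datasets are sets, and `run_build`, `transactions`, `snapshot`, and `history` are hypothetical names introduced for illustration. Set difference stands in for `subtract`, which likewise returns distinct rows.

```python
# Plain-Python model of the two-output pattern (not Foundry code).
# Rows are tuples and datasets are sets, so set difference stands in for
# Spark's DataFrame.subtract, which also returns distinct rows.


def run_build(current_rows, previous_rows):
    """Simulate one build: diff against the stored snapshot, then replace it."""
    current = set(current_rows)
    previous = set(previous_rows)
    added = current - previous    # rows new in this build
    removed = previous - current  # rows that disappeared since the last build
    # The returned snapshot plays the role of output_1 after the build.
    return added, removed, current


# Replay the three transactions from the example tables.
transactions = [
    [(1, "A"), (2, "B")],
    [(1, "A"), (2, "B"), (3, "C")],
    [(1, "A"), (2, "B"), (3, "D")],
]

snapshot = set()  # nothing is stored before the first build
history = []
for rows in transactions:
    added, removed, snapshot = run_build(rows, snapshot)
    history.append((sorted(added), sorted(removed)))
    print("added:", sorted(added), "removed:", sorted(removed))
```

The third build reports `(3, 'D')` as added and `(3, 'C')` as removed. In the transform itself, the removed rows would presumably come from the reverse difference, `df_previous.subtract(df_inp)`, written to a further output if you need both directions.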