I have a pipeline set up in my Foundry instance that uses incremental computation, but it isn't doing what I expect. Namely, I want to read the previous output of my transform, get the maximum value of a date column, and then read the input only for data immediately after that maximum date.
It's quite frustrating to debug this by stepping through the code in a build / analyze / modify-code loop.
My code looks like the following:
from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta

JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)

OUTPUT_SCHEMA = T.StructType([
    T.StructField("date", T.DateType()),
    T.StructField("value", T.IntegerType())
])
@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
    """Filter the input to only rows that are a day after the last written output and process them"""
    # Get the previous output and full current input
    previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
    current_input_df = my_input.dataframe("current")

    # Get the next date of interest from the previous output
    previous_max_date_rows = previous_output_df.groupBy().agg(
        F.max(F.col("date")).alias("max_date")
    ).collect()  # noqa
    # PERFORMANCE NOTE: It is acceptable to collect the max value here to avoid an expensive
    # cross-join-and-filter operation, in favor of making a new query plan.

    if len(previous_max_date_rows) == 0:
        # We are running for the first time or re-snapshotting. There's no previous date. Use the fallback.
        previous_max_date = START_DATE
    else:
        # We have a previous max date, use it.
        previous_max_date = previous_max_date_rows[0][0]

    delta = timedelta(days=JUMP_DAYS)
    next_date = previous_max_date + delta

    # Filter the input to only the next date
    filtered_input = current_input_df.filter(F.col("date") == F.lit(date))

    # Do any other processing...
    output_df = filtered_input

    # Persist
    my_output.set_mode("modify")
    my_output.write_dataframe(output_df)
In incremental transforms, it can be difficult to isolate which conditions are breaking your code, so it's typically best to split the logic into small, individually testable functions.
In your code example, breaking the execution up into a handful of testable methods will make it substantially easier to exercise each piece and see what's wrong.
The new method should look something like this:
from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta

JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)

OUTPUT_SCHEMA = T.StructType([
    T.StructField("date", T.DateType()),
    T.StructField("value", T.IntegerType())
])


def get_previous_max_date(previous_output_df: DataFrame) -> date:
    """Given the previous output, get the maximum date written to it"""
    previous_max_date_rows = previous_output_df.groupBy().agg(
        F.max(F.col("date")).alias("max_date")
    ).collect()  # noqa
    # PERFORMANCE NOTE: It is acceptable to collect the max value here to avoid an expensive
    # cross-join-and-filter operation, in favor of making a new query plan.

    if len(previous_max_date_rows) == 0 or previous_max_date_rows[0][0] is None:
        # We are running for the first time or re-snapshotting: an empty previous output
        # aggregates to no rows or a null max, so there's no previous date. Use the fallback.
        previous_max_date = START_DATE
    else:
        # We have a previous max date, use it.
        previous_max_date = previous_max_date_rows[0][0]
    return previous_max_date


def get_next_date(previous_output_df: DataFrame) -> date:
    """Given the previous output, compute the max date + 1 day"""
    previous_max_date = get_previous_max_date(previous_output_df)
    delta = timedelta(days=JUMP_DAYS)
    next_date = previous_max_date + delta
    return next_date


def filter_input_to_date(current_input_df: DataFrame, date_filter: date) -> DataFrame:
    """Given the entire input, filter to only rows that have the next date"""
    return current_input_df.filter(F.col("date") == F.lit(date_filter))


def process_with_dfs(current_input_df: DataFrame, previous_output_df: DataFrame) -> DataFrame:
    """With the constructed DataFrames, do our work"""
    # Get the next date of interest from the previous output
    next_date = get_next_date(previous_output_df)
    # Filter the input to only the next date
    filtered_input = filter_input_to_date(current_input_df, next_date)
    # Do any other processing...
    return filtered_input


@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
    """Filter the input to only rows that are a day after the last written output and process them"""
    # Get the previous output and full current input
    previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
    current_input_df = my_input.dataframe("current")
    # Do the processing
    output_df = process_with_dfs(current_input_df, previous_output_df)
    # Persist
    my_output.set_mode("modify")
    my_output.write_dataframe(output_df)
You can now set up individual unit tests. Assuming your code lives at transforms-python/src/myproject/datasets/output.py, follow the documented methodology for unit tests in transforms-python repositories to get everything wired up correctly.
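Note that the tests below rely on a spark_session pytest fixture. Depending on your repository template this fixture may already be provided; if not, a minimal sketch of a conftest.py that supplies it could look like the following (the fixture scope and the local[2] master are assumptions for illustration, so adjust them to your environment):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # Local Spark session shared across the test session.
    spark = (
        SparkSession.builder
        .master("local[2]")
        .appName("myproject-tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()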
Therefore my testing file now looks like the following:
from pyspark.sql import functions as F, types as T
from myproject.datasets.output import (
    only_write_one_day,
    process_with_dfs,
    filter_input_to_date,
    get_next_date,
    get_previous_max_date,
    OUTPUT_SCHEMA,
    JUMP_DAYS,
    START_DATE
)
import pytest
from datetime import date
@pytest.fixture
def empty_output_df(spark_session):
    data = []
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def single_write_output_df(spark_session):
    data = [{
        "date": date(year=2021, month=10, day=1),
        "value": 1
    }]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def double_write_output_df(spark_session):
    data = [
        {
            "date": date(year=2021, month=10, day=1),
            "value": 1
        },
        {
            "date": date(year=2021, month=10, day=2),
            "value": 2
        }
    ]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


@pytest.fixture
def normal_input_df(spark_session):
    data = [
        {
            "date": date(year=2021, month=10, day=1),
            "value": 1
        },
        {
            "date": date(year=2021, month=10, day=2),
            "value": 2
        }
    ]
    return spark_session.createDataFrame(data, OUTPUT_SCHEMA)


# ======= FIRST RUN CASE

def test_first_run_process_with_dfs(normal_input_df, empty_output_df):
    assert True


def test_first_run_filter_input_to_date(normal_input_df, empty_output_df):
    assert True


def test_first_run_get_next_date(normal_input_df, empty_output_df):
    assert True


def test_first_run_get_previous_max_date(normal_input_df, empty_output_df):
    assert True


# ======= NORMAL CASE

def test_normal_run_process_with_dfs(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_filter_input_to_date(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_get_next_date(normal_input_df, single_write_output_df):
    assert True


def test_normal_run_get_previous_max_date(normal_input_df, single_write_output_df):
    assert True
It's also worth noting that this is why you can enable features like McCabe complexity checking and unit test coverage inside Foundry: they push you to break your code into smaller, more durable pieces like this.
Following a design pattern like this will give you much more durable code that you can trust in incremental transforms.
If you adopt this style of transform, you will also be able to iterate on your logic much faster by running individual tests with the Code Repository "Test" feature: open the test file and click the green "Test" button next to the specific case you're interested in. This is far quicker than clicking build every time and trying to line up your input conditions the way you want.