palantir-foundry, foundry-code-repositories, foundry-python-transform

Palantir Foundry incremental testing is hard to iterate on, how do I find bugs faster?


I have a pipeline setup in my Foundry instance that is using incremental computation but for some reason isn't doing what I expect. Namely, I want to read the previous output of my transform and get the maximum value of a date, then read the input only for data immediately after this maximum date.

For some reason it isn't doing what I expect, and it's quite frustrating to debug by stepping through the code in a build / analyze / modify-code loop every time.

My code looks like the following:

from pyspark.sql import functions as F, types as T, DataFrame
from transforms.api import transform, Input, Output, incremental
from datetime import date, timedelta


JUMP_DAYS = 1
START_DATE = date(year=2021, month=10, day=1)
OUTPUT_SCHEMA = T.StructType([
  T.StructField("date", T.DateType()),
  T.StructField("value", T.IntegerType())
])


@incremental(semantic_version=1)
@transform(
    my_input=Input("/path/to/my/input"),
    my_output=Output("/path/to/my/output")
)
def only_write_one_day(my_input, my_output):
  """Filter the input to only rows that are a day after the last written output and process them"""

  # Get the previous output and full current input
  previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
  current_input_df = my_input.dataframe("current")

  # Get the next date of interest from the previous output 
  previous_max_date_rows = previous_output_df.groupBy().agg(
      F.max(F.col("date")).alias("max_date")
  ).collect() # noqa
  # PERFORMANCE NOTE: Collecting the max value here is acceptable; it avoids an expensive
  #   cross-join-and-filter at the cost of starting a new query plan.

  if len(previous_max_date_rows) == 0:
    # We are running for the first time or re-snapshotting.  There's no previous date.  Use fallback.  
    previous_max_date = START_DATE
  else:
    # We have a previous max date, use it. 
    previous_max_date = previous_max_date_rows[0][0]

  delta = timedelta(days=JUMP_DAYS)
  next_date = previous_max_date + delta

  # Filter the input to only the next date
  filtered_input = current_input_df.filter(F.col("date") == F.lit(date))

  # Do any other processing...

  output_df = filtered_input

  # Persist 
  my_output.set_mode("modify")
  my_output.write_dataframe(output_df)

Solution

  • In incremental transforms, it can be difficult to isolate which conditions are breaking your code. As such, it's typically best to:

    1. Make your compute function do nothing besides fetch the appropriate views of your inputs / outputs and pass these DataFrames off to interior methods
    2. Modularize every piece of your logic to make it testable
    3. Write tests for each piece that validate each manipulation of a specific DataFrame does what you expect.

    In your code example, breaking the execution up into a set of testable methods will make it substantially easier to test and to see what's wrong.

    The new method should look something like this:

    from pyspark.sql import functions as F, types as T, DataFrame
    from transforms.api import transform, Input, Output, incremental
    from datetime import date, timedelta
    
    
    JUMP_DAYS = 1
    START_DATE = date(year=2021, month=10, day=1)
    OUTPUT_SCHEMA = T.StructType([
      T.StructField("date", T.DateType()),
      T.StructField("value", T.IntegerType())
    ])
    
    
    def get_previous_max_date(previous_output_df) -> date:
      """Given the previous output, get the maximum date written to it"""
      previous_max_date_rows = previous_output_df.groupBy().agg(
          F.max(F.col("date")).alias("max_date")
      ).collect() # noqa
      # PERFORMANCE NOTE: Collecting the max value here is acceptable; it avoids an expensive
      #   cross-join-and-filter at the cost of starting a new query plan.
    
      if len(previous_max_date_rows) == 0 or previous_max_date_rows[0][0] is None:
        # We are running for the first time or re-snapshotting: the previous output is empty,
        # so the global aggregate yields either no rows or a single row with a null max.  Use fallback.
        previous_max_date = START_DATE
      else:
        # We have a previous max date, use it. 
        previous_max_date = previous_max_date_rows[0][0]
      return previous_max_date
    
    
    def get_next_date(previous_output_df) -> date:
      """Given the previous output, compute the max date + 1 day"""
      previous_max_date = get_previous_max_date(previous_output_df)
      delta = timedelta(days=JUMP_DAYS)
      next_date = previous_max_date + delta
      return next_date
    
    
    def filter_input_to_date(current_input_df: DataFrame, date_filter: date) -> DataFrame:
      """Given the entire intput, filter to only rows that have the next date"""
      return current_input_df.filter(F.col("date") == F.lit(date_filter))
    
    
    def process_with_dfs(current_input_df, previous_output_df) -> DataFrame:
      """With the constructed DataFrames, do our work"""
      # Get the next date of interest from the previous output 
      next_date = get_next_date(previous_output_df)
    
      # Filter the input to only the next date
      filtered_input = filter_input_to_date(current_input_df, next_date)
    
      # Do any other processing...
    
      return filtered_input
    
    
    @incremental(semantic_version=1)
    @transform(
        my_input=Input("/path/to/my/input"),
        my_output=Output("/path/to/my/output")
    )
    def only_write_one_day(my_input, my_output):
      """Filter the input to only rows that are a day after the last written output and process them"""
    
      # Get the previous output and full current input
      previous_output_df = my_output.dataframe("previous", OUTPUT_SCHEMA)
      current_input_df = my_input.dataframe("current")
    
      # Do the processing
      output_df = process_with_dfs(current_input_df, previous_output_df)
    
      # Persist 
      my_output.set_mode("modify")
      my_output.write_dataframe(output_df)
    
    

    You can now set up individual unit tests. Assuming your code lives at transforms-python/src/myproject/datasets/output.py, follow the Foundry documentation on Python unit tests to get everything wired up correctly.
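
    These tests rely on a spark_session fixture. If your repository's test setup does not already provide one (check your conftest.py), a minimal version along these lines is enough; this is a sketch using plain PySpark rather than any Foundry-specific API:

    from pyspark.sql import SparkSession
    import pytest


    @pytest.fixture(scope="session")
    def spark_session():
        """Provide a small local Spark session shared by all tests."""
        spark = SparkSession.builder.master("local[1]").appName("unit-tests").getOrCreate()
        yield spark
        spark.stop()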

    With that in place, my testing file looks like the following:

    from pyspark.sql import functions as F, types as T
    from myproject.datasets.output import (
        only_write_one_day,
        process_with_dfs,
        filter_input_to_date,
        get_next_date,
        get_previous_max_date,
        OUTPUT_SCHEMA,
        JUMP_DAYS,
        START_DATE
    )
    import pytest
    from datetime import date
    
    
    @pytest.fixture
    def empty_output_df(spark_session):
        data = []
        return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
    
    
    @pytest.fixture
    def single_write_output_df(spark_session):
        data = [{
            "date": date(year=2021, month=10, day=1),
            "value": 1
        }]
        return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
    
    
    @pytest.fixture
    def double_write_output_df(spark_session):
        data = [
            {
                "date": date(year=2021, month=10, day=1),
                "value": 1
            },
            {
                "date": date(year=2021, month=10, day=2),
                "value": 2
            }
        ]
        return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
    
    
    @pytest.fixture
    def normal_input_df(spark_session):
        data = [
            {
                "date": date(year=2021, month=10, day=1),
                "value": 1
            },
            {
                "date": date(year=2021, month=10, day=2),
                "value": 2
            }
        ]
        return spark_session.createDataFrame(data, OUTPUT_SCHEMA)
    
    
    # ======= FIRST RUN CASE
    
    def test_first_run_process_with_dfs(normal_input_df, empty_output_df):
        assert True
    
    
    def test_first_run_filter_input_to_date(normal_input_df, empty_output_df):
        assert True
    
    
    def test_first_run_get_next_date(normal_input_df, empty_output_df):
        assert True
    
    
    def test_first_run_get_previous_max_date(normal_input_df, empty_output_df):
        assert True
    
    
    # ======= NORMAL CASE
    
    def test_normal_run_process_with_dfs(normal_input_df, single_write_output_df):
        assert True
    
    
    def test_normal_run_filter_input_to_date(normal_input_df, single_write_output_df):
        assert True
    
    
    def test_normal_run_get_next_date(normal_input_df, single_write_output_df):
        assert True
    
    
    def test_normal_run_get_previous_max_date(normal_input_df, single_write_output_df):
        assert True
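
    The test bodies above are intentionally placeholders. As a sketch of how one might be filled in, the normal-case get_next_date test could assert that a single previously written day advances by JUMP_DAYS:

    def test_normal_run_get_next_date(normal_input_df, single_write_output_df):
        # With one previously written row on 2021-10-01, the next date of interest
        # should be one JUMP_DAYS step later, i.e. 2021-10-02.
        assert get_next_date(single_write_output_df) == date(year=2021, month=10, day=2)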
    
    

    It's worth noting that this is why Foundry lets you enable things like McCabe complexity checkers and unit-test coverage features: they push you to break your code up into smaller, more durable pieces like this.
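
    For instance, if your repository's checks run flake8 (an assumption; confirm against your repository's lint configuration), the McCabe complexity limit is a one-line setting in setup.cfg or wherever your flake8 config lives:

    [flake8]
    # Fail linting when any single function's cyclomatic complexity exceeds 10.
    max-complexity = 10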

    Following a design pattern like this will give you much more durable code that is more trustworthy in incremental transforms.

    If you adopt this style of transform, you will also be able to iterate much faster on your logic by using the Code Repository "Test" feature to run only the test you care about. Open the test file and click the green "Test" button next to the specific case you are interested in; that gets your logic written much faster than clicking build every time and trying to line up your input conditions the way you want.
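
    The same selective run works with plain pytest as well, assuming you have an environment outside Foundry where the transforms stubs are importable (an assumption about your setup; the flag itself is standard pytest):

    # Run only the named test instead of the whole suite.
    python -m pytest -k test_normal_run_get_next_date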