Tags: pyspark, palantir-foundry, foundry-code-repositories, foundry-python-transform

Why don't I see log lines in my PySpark code when I would expect them to appear?


I'm writing some PySpark code that executes joins and other operations, and I want to log when this phase has completed successfully.

Why don't I see this logged in the order I would expect? It sure seems like everything shows up all at once even when my job is continuing to do work...


Solution

  • It's important to understand that PySpark uses a different execution model from ordinary Python: describing the work and doing the work are separate steps.

    PySpark is fundamentally lazy in its query evaluation and will wait to perform any work you request until absolutely necessary.

    This means that if you describe one join, log something, and then describe another join, the first join has not been executed at the moment you log. Transformations such as join and withColumn only add steps to a query plan; under normal circumstances, Spark doesn't start doing any work until you call the write_dataframe method at the very end of your transform.
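The lazy model described above can be illustrated with a small pure-Python stand-in. ToyLazyFrame and its transform and collect methods are hypothetical names invented for this sketch, not part of PySpark: transformations only extend a recorded plan, and the plan runs only when an action (here collect, playing the role of write_dataframe) is called.

```python
# Toy model of lazy evaluation; ToyLazyFrame is hypothetical, NOT the PySpark API.


class ToyLazyFrame:
    """Holds a plan of steps instead of data, like a Spark DataFrame."""

    def __init__(self, source, plan=(), trace=None):
        self.source = source  # zero-argument function that produces input rows
        self.plan = tuple(plan)  # ordered (description, function) steps
        self.trace = trace if trace is not None else []  # records real work done

    def transform(self, description, fn):
        # Like join/withColumn: return a new frame with one more step; no work yet.
        return ToyLazyFrame(self.source, self.plan + ((description, fn),), self.trace)

    def collect(self):
        # Like an action (write_dataframe, count, ...): run every step now.
        rows = self.source()
        for description, fn in self.plan:
            self.trace.append("executing " + description)
            rows = fn(rows)
        return rows


left = ToyLazyFrame(lambda: [{"k": 1, "a": "x"}, {"k": 2, "a": "y"}])
lookup = {1: "one"}  # hypothetical stand-in for my_other_df

joined = left.transform(
    "inner join",
    lambda rows: [dict(r, b=lookup[r["k"]]) for r in rows if r["k"] in lookup],
)
enriched = joined.transform(
    "withColumn", lambda rows: [dict(r, my_other_column=1) for r in rows]
)

trace_before_action = list(left.trace)  # both steps are only planned so far
result = enriched.collect()  # the "write": both steps execute only now
print(trace_before_action, left.trace, result)
```

Until collect is called, the trace stays empty even though two steps have been "applied".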

    The exceptions are actions such as .count(), .first(), .take(), and anything else that forces Spark to evaluate the DataFrame and return a result to your Python code. When you call one of these, Spark must execute the query plan up to that point, and your Python code does not continue until the result comes back.
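To see why an action holds up the rest of your Python code, here is another hypothetical toy (ToyFrame and read_source are invented for this sketch, not PySpark): filter only wraps a function, while count has to run the whole pipeline, including reading the source, before it can hand back a plain number. The trace list records the order in which things actually happen.

```python
# Toy illustration (hypothetical ToyFrame, NOT PySpark): an action is synchronous.

trace = []


def read_source():
    trace.append("scan source")  # the expensive work happens here
    return list(range(10))


class ToyFrame:
    def __init__(self, rows_fn):
        self.rows_fn = rows_fn  # deferred computation, not data

    def filter(self, predicate):
        # Transformation: wrap the existing computation; nothing runs yet.
        parent = self.rows_fn
        return ToyFrame(lambda: [r for r in parent() if predicate(r)])

    def count(self):
        # Action: evaluate the whole pipeline and return a plain Python int.
        n = len(self.rows_fn())
        trace.append("count returned")
        return n


evens = ToyFrame(read_source).filter(lambda x: x % 2 == 0)
trace.append("filter defined")
n = evens.count()  # Python waits here until the result is available
trace.append(f"python continues with n={n}")
print(trace)
```

The source is scanned only after "filter defined" is recorded, and the next Python line runs only once count has returned.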

    This is exactly why it's an antipattern, for performance reasons, to use methods such as these in your code: they usually don't contribute directly to your final dataset build, but instead materialize a summary that doesn't lead to your output.

    For a concrete example, let's consider the following:

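    from pyspark.sql import functions as F

    my_input_df = ...
    my_other_df = ...

    # Lazy: this only adds a join to the query plan, nothing executes yet
    my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
    print("I joined!")  # runs immediately, before any join work has happened
    my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
    # write_dataframe triggers execution of the whole plan (join + withColumn)
    my_output.write_dataframe(my_enriched_df)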
    

    "I joined!" is printed to the console right at the beginning of your job, before any join has run, and Spark then goes on to execute the join and withColumn as if nothing happened. The print doesn't wait for the join because calling .join() only added a step to the query plan; nothing forces Spark to evaluate the DataFrame until write_dataframe is called.
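Python's own generators behave in a loosely similar way, which makes this ordering easy to reproduce without Spark. In this sketch (an analogy, not how Spark is implemented), scan, join and with_column are hypothetical generator functions: building the pipeline runs none of their bodies, so the "I joined!" message is recorded before any row is scanned or joined, and all the work happens only when list() consumes the pipeline, the role write_dataframe plays in the example above.

```python
# Analogy using plain Python generators (not Spark): building a pipeline is lazy.

events = []


def scan(rows):
    for r in rows:
        events.append(f"scan row {r}")
        yield r


def join(rows, lookup):
    # Inner join against a dict: keep only rows whose key is present.
    for r in rows:
        if r in lookup:
            events.append(f"join row {r}")
            yield (r, lookup[r])


def with_column(rows):
    # Append a constant column, like withColumn(..., F.lit(1)).
    for r in rows:
        yield r + (1,)


joined = join(scan([1, 2, 3]), {1: "a", 3: "c"})  # no generator body has run yet
events.append("I joined!")  # stands in for the print() in the example
enriched = with_column(joined)
output = list(enriched)  # consuming the pipeline, like write_dataframe, does the work
print(events)
print(output)
```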

    If, however, I changed my code to the following:

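    from pyspark.sql import functions as F

    my_input_df = ...
    my_other_df = ...

    my_joined_df = my_input_df.join(my_other_df, on="my_joint_col", how="inner")
    # .count() is an action: Spark runs the join now and Python waits for the result
    print("I joined {0} rows!".format(my_joined_df.count()))
    my_enriched_df = my_joined_df.withColumn("my_other_column", F.lit(1))
    # Without caching, Spark will typically recompute the join for this write
    my_output.write_dataframe(my_enriched_df)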
    

    Then I would see "I joined X rows!" in the logs, and my job would halt to materialize the count before doing any more work. This means slower execution of my overall build and, very likely, recomputation of the join when write_dataframe runs. This is why, when writing code in Code Repositories, you'll often notice warnings on methods like these that force early evaluation.
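The cost of that extra action can be made concrete with one more hypothetical toy (ToyFrame and run are invented for this sketch, not PySpark, and the toy does no caching): every action re-runs the full plan, so a counter shows how many times the join actually executes with and without the count() log line.

```python
# Toy model (hypothetical, NOT PySpark): each action re-runs the whole plan.


class ToyFrame:
    def __init__(self, compute):
        self._compute = compute  # zero-argument function producing rows

    def join(self, other_rows, key, stats):
        # Transformation: define the join but do not run it.
        def compute():
            stats["join_runs"] += 1  # counts real executions of the join
            lookup = {r[key]: r for r in other_rows}
            rows = self._compute()
            return [dict(r, **lookup[r[key]]) for r in rows if r[key] in lookup]

        return ToyFrame(compute)

    def count(self):
        return len(self._compute())  # action

    def write(self, sink):
        sink.extend(self._compute())  # action, like write_dataframe


def run(log_count):
    stats = {"join_runs": 0}
    left = ToyFrame(lambda: [{"id": 1, "a": 10}, {"id": 2, "a": 20}])
    joined = left.join([{"id": 1, "b": 99}], key="id", stats=stats)
    if log_count:
        print(f"I joined {joined.count()} rows!")  # extra action
    sink = []
    joined.write(sink)
    return stats["join_runs"], sink


runs_lazy, out_lazy = run(log_count=False)
runs_logged, out_logged = run(log_count=True)
print(runs_lazy, runs_logged)
```

Both runs produce the same output; the logged version simply pays for the join twice.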

    We generally encourage users to write code that keeps DataFrame evaluation lazy and avoids halting the job just to print things to the console. Logging in the middle of a transform either reports steps before they have actually run (if nothing forces evaluation) or slows your computation down (if something does).