Tags: apache-spark, pyspark, palantir-foundry, foundry-code-repositories

How to filter out specific rows of dataframe in pyspark?


I want to remove the rows of prev_df whose "Wellname" value also appears in input_df's "Wellname" column. For example:

input_df:

   Wellname WellType Platform
0       E17 Producer      DWG
1       E17 Producer      DWG
2       E17 Producer      DWG
3      E20Y Producer      DWG
4      E20Y Producer      DWG
5      E20Y Producer      DWG
6      E20Y Producer      DWG
7      E20Y Producer      DWG

And this is the prev_df:

   Wellname WellType Platform
0       E17 Producer      CH
1       E17 Producer      CH
2       E17 Producer      CH
3       E21 Producer      DWG
4       E21 Producer      DWG
5       E21 Producer      DWG

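Given the examples above, the filtered prev_df should be (the E17 rows are dropped because E17 appears in input_df; the E21 rows are kept unchanged):

   Wellname WellType Platform
0       E21 Producer      DWG
1       E21 Producer      DWG
2       E21 Producer      DWG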

I tried to implement this logic, but I get the following error: AttributeError: 'DataFrame' object has no attribute '_get_object_id'

from transforms.api import transform, incremental, Input, Output, configure
from pyspark.sql import types as T   
schema = T.StructType([
    T.StructField('Wellname', T.StringType()),
    T.StructField('Platform', T.StringType()),
    T.StructField('date', T.TimestampType()),
    T.StructField('oil_STB_d', T.DoubleType()),
    T.StructField('water_STB_d', T.DoubleType()),
    T.StructField('GOR_scf_STB', T.DoubleType()),
    T.StructField('WHP_psi', T.DoubleType()),
    T.StructField('BHP_psi', T.DoubleType()),
    T.StructField('predicted_T_Celcius', T.DoubleType()),
    T.StructField('Precipitated_wax_wt_percentage_with_predicted_T_minus_5_Celcius', T.DoubleType()),
    T.StructField('Rate_of_wax_precipitation_with_predicted_T_minus_5_Celcius_kg_per_d', T.DoubleType()),
    T.StructField('Precipitated_wax_wt_percentage_with_predicted_T_0_Celcius', T.DoubleType()),
    T.StructField('Rate_of_wax_precipitation_with_predicted_T_0_Celcius_kg_per_d', T.DoubleType()),
    T.StructField('Precipitated_wax_wt_percentage_with_predicted_T_plus_5_Celcius', T.DoubleType()),
    T.StructField('Rate_of_wax_precipitation_with_predicted_T_plus_5_Celcius_kg_per_d', T.DoubleType()),
    T.StructField('WAT_DegC', T.DoubleType()),
    T.StructField('Wax_Content_percentage', T.DoubleType())
])    

@configure(profile=['KUBERNETES_NO_EXECUTORS'])
@incremental(require_incremental=True, snapshot_inputs=["input_df"])
@transform(
    input_df=Input('ri.foundry.lava-catalog.dataset.3c85fd4e-ca5d-4f9e-bbab-c7d3aef1da9e'),
    output_df=Output('ri.foundry.lava-catalog.dataset.44012abe-87d6-4d38-802c-16a2ed447e07')
)
def incremental_filter(input_df, output_df):
    new_df = input_df.dataframe()

    prev_df = output_df.dataframe('previous', schema)
    # Error in the following line
    prev_df = prev_df.filter(~prev_df.Wellname.isin(new_df.select('Wellname').distinct()))
    new_df = new_df.unionByName(prev_df)
    mode = 'replace'

    output_df.set_mode(mode)
    output_df.write_dataframe(new_df)

How can I solve it?


Solution

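  • Column.isin() expects literal values (or a Python list of them), not a DataFrame, which is why passing new_df.select('Wellname').distinct() raises the _get_object_id error. You can perform this operation in PySpark with a left anti join instead, which keeps only the rows of prev_df whose "Wellname" has no match in new_df.

    This is the statement you could use:

        # Distinct well names present in the new data
        new_distinct_df = new_df.select("Wellname").distinct()
        # Keep only the prev_df rows whose Wellname is absent from new_distinct_df
        prev_df = prev_df.join(
            new_distinct_df,
            "Wellname",
            "left_anti"
        )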