I want to filter out the rows of prev_df whose "Wellname" value also appears in input_df's "Wellname" column. For example, given this input_df:
Wellname WellType Platform
0 E17 Producer DWG
1 E17 Producer DWG
2 E17 Producer DWG
3 E20Y Producer DWG
4 E20Y Producer DWG
5 E20Y Producer DWG
6 E20Y Producer DWG
7 E20Y Producer DWG
And this prev_df:
Wellname WellType Platform
0 E17 Producer CH
1 E17 Producer CH
2 E17 Producer CH
3 E21 Producer DWG
4 E21 Producer DWG
5 E21 Producer DWG
From the examples above, the filtered prev_df should be:
Wellname WellType Platform
0 E21 Producer DWG
1 E21 Producer DWG
2 E21 Producer DWG
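To make the intended semantics concrete, here is a minimal plain-Python sketch (no Spark involved) using the example rows above: a row of prev_df is kept only if its Wellname does not appear anywhere in input_df. The tuple row shape and the `filter_prev` helper are hypothetical, chosen for illustration only; they are not part of the PySpark or Foundry API.

```python
from typing import List, Tuple

# Hypothetical row shape: (Wellname, WellType, Platform)
Row = Tuple[str, str, str]

input_rows: List[Row] = [("E17", "Producer", "DWG")] * 3 + [("E20Y", "Producer", "DWG")] * 5
prev_rows: List[Row] = [("E17", "Producer", "CH")] * 3 + [("E21", "Producer", "DWG")] * 3


def filter_prev(prev: List[Row], new: List[Row]) -> List[Row]:
    """Keep rows of prev whose Wellname never appears in new (left anti join semantics)."""
    # Distinct Wellname values of the new input, like new_df.select("Wellname").distinct()
    new_names = {row[0] for row in new}
    return [row for row in prev if row[0] not in new_names]


for row in filter_prev(prev_rows, input_rows):
    print(row)
```

Every E17 row is dropped because E17 appears in input_df, and each surviving E21 row is returned unchanged, so it keeps the Platform value it had in prev_df.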
I tried to implement this logic, but I get the following error:
AttributeError: 'DataFrame' object has no attribute '_get_object_id'
from transforms.api import transform, incremental, Input, Output, configure
from pyspark.sql import types as T
schema = T.StructType([
    T.StructField('Wellname', T.StringType()),
    T.StructField('Platform', T.StringType()),
    T.StructField('date', T.TimestampType()),
    T.StructField('oil_STB_d', T.DoubleType()),
    T.StructField('water_STB_d', T.DoubleType()),
    T.StructField('GOR_scf_STB', T.DoubleType()),
    T.StructField('WHP_psi', T.DoubleType()),
    T.StructField('BHP_psi', T.DoubleType()),
    T.StructField('predicted_T_Celcius', T.DoubleType()),
    T.StructField('Precipitated_wax_wt_percentage_with_predicted_T_minus_5_Celcius', T.DoubleType()),
    T.StructField('Rate_of_wax_precipitation_with_predicted_T_minus_5_Celcius_kg_per_d', T.DoubleType()),
    T.StructField('Precipitated_wax_wt_percentage_with_predicted_T_0_Celcius', T.DoubleType()),
    T.StructField('Rate_of_wax_precipitation_with_predicted_T_0_Celcius_kg_per_d', T.DoubleType()),
    T.StructField('Precipitated_wax_wt_percentage_with_predicted_T_plus_5_Celcius', T.DoubleType()),
    T.StructField('Rate_of_wax_precipitation_with_predicted_T_plus_5_Celcius_kg_per_d', T.DoubleType()),
    T.StructField('WAT_DegC', T.DoubleType()),
    T.StructField('Wax_Content_percentage', T.DoubleType())
])
@configure(profile=['KUBERNETES_NO_EXECUTORS'])
@incremental(require_incremental=True, snapshot_inputs=["input_df"])
@transform(
    input_df=Input('ri.foundry.lava-catalog.dataset.3c85fd4e-ca5d-4f9e-bbab-c7d3aef1da9e'),
    output_df=Output('ri.foundry.lava-catalog.dataset.44012abe-87d6-4d38-802c-16a2ed447e07')
)
def incremental_filter(input_df, output_df):
    new_df = input_df.dataframe()
    prev_df = output_df.dataframe('previous', schema)
    # Error on the following line
    prev_df = prev_df.filter(~prev_df.Wellname.isin(new_df.select('Wellname').distinct()))
    new_df = new_df.unionByName(prev_df)
    mode = 'replace'
    output_df.set_mode(mode)
    output_df.write_dataframe(new_df)
How can I solve it?
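You can perform this operation in PySpark with a left anti join, which keeps only the rows of the left DataFrame that have no match in the right one. The error occurs because Column.isin() expects literal values (or a list of them), not a DataFrame, so the DataFrame you pass cannot be converted.
This is the statement you could use in place of the failing line: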
new_distinct_df = new_df.select("Wellname").distinct()
prev_df = prev_df.join(
    new_distinct_df,
    "Wellname",
    "left_anti"
)
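With the anti join in place, the rest of incremental_filter can stay as it is. As a rough plain-Python model of what the transform then writes (assuming, as the question's code does, that the new input is unioned with the surviving previous rows and the output is written in replace mode), here is a sketch. The `merge_output` helper and the tuple rows are hypothetical, for illustration only.

```python
from typing import List, Tuple

# Hypothetical row shape: (Wellname, WellType, Platform)
Row = Tuple[str, str, str]


def merge_output(new: List[Row], prev: List[Row]) -> List[Row]:
    # Left anti join: drop previous rows for wells present in the new input.
    new_names = {row[0] for row in new}
    kept_prev = [row for row in prev if row[0] not in new_names]
    # Equivalent of new_df.unionByName(prev_df). Spark DataFrames have no
    # inherent row order, so the list order here is only illustrative.
    return new + kept_prev


new_rows = [("E17", "Producer", "DWG")] * 3 + [("E20Y", "Producer", "DWG")] * 5
prev_rows = [("E17", "Producer", "CH")] * 3 + [("E21", "Producer", "DWG")] * 3
result = merge_output(new_rows, prev_rows)
print(len(result))  # 11
```

A left anti join returns only the columns of the left DataFrame, so the filtered prev_df keeps its original schema and still lines up with new_df in unionByName.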