apache-spark · pyspark · apache-spark-sql · rename · pyspark-schema

How do I rename a column in PySpark when it arrives under a different name in different files?


I have to rename any column whose name contains ADDRESS.

For example, in the first file I receive the columns ADDRESS1, ADDRESS2, ADDRESS3.

In the next file I receive the column names T_ADDRESS1, T_ADDRESS2, T_ADDRESS3.

As I will write this code only once and pass it into my pipeline, how can I rename, in PySpark, every column whose name contains ADDRESS? As the examples above show, any column whose name contains ADDRESS must be renamed to the corresponding S_ name, e.g. both ADDRESS1 and T_ADDRESS1 become S_ADDRESS1.


Solution

  • A Python function like this could rename dataframe columns:

    import re

    def df_rename(df):
        # Replace the (possibly empty) prefix before the first
        # "ADDRESS" with "S_"; names without "ADDRESS" are unchanged.
        return df.toDF(*[re.sub(r"^(.*?)(?=ADDRESS)", "S_", c) for c in df.columns])
    
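    The regex does the real work: ^(.*?)(?=ADDRESS) lazily matches the
    (possibly empty) prefix up to the first ADDRESS, and the lookahead
    keeps ADDRESS itself out of the match, so only the prefix is replaced
    by S_. A quick sanity check with plain re on a few sample names (no
    Spark needed; TADDRESS3 is an extra case showing the prefix does not
    need a trailing underscore):

    import re

    for name in ["c1", "ADDRESS1", "T_ADDRESS1", "TADDRESS3"]:
        print(name, "->", re.sub(r"^(.*?)(?=ADDRESS)", "S_", name))
    # c1 -> c1
    # ADDRESS1 -> S_ADDRESS1
    # T_ADDRESS1 -> S_ADDRESS1
    # TADDRESS3 -> S_ADDRESS3
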

    Test input dataframes:

    df1 = spark.createDataFrame([], "c1 int, ADDRESS1 int, ADDRESS2 int, ADDRESS3 int")
    df1.show()
    # +---+--------+--------+--------+
    # | c1|ADDRESS1|ADDRESS2|ADDRESS3|
    # +---+--------+--------+--------+
    # +---+--------+--------+--------+
    
    df2 = spark.createDataFrame([], "c1 int, T_ADDRESS1 int, T_ADDRESS2 int, T_ADDRESS3 int")
    df2.show()
    # +---+----------+----------+----------+
    # | c1|T_ADDRESS1|T_ADDRESS2|T_ADDRESS3|
    # +---+----------+----------+----------+
    # +---+----------+----------+----------+
    

    Applying the function:

    df1 = df_rename(df1)
    df1.show()
    # +---+----------+----------+----------+
    # | c1|S_ADDRESS1|S_ADDRESS2|S_ADDRESS3|
    # +---+----------+----------+----------+
    # +---+----------+----------+----------+
    
    df2 = df_rename(df2)
    df2.show()
    # +---+----------+----------+----------+
    # | c1|S_ADDRESS1|S_ADDRESS2|S_ADDRESS3|
    # +---+----------+----------+----------+
    # +---+----------+----------+----------+
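
  • Alternatively, if you prefer renaming columns one at a time instead
    of rebuilding the dataframe with toDF, a loop over withColumnRenamed
    gives the same result (a minimal sketch of the same S_ prefix rule;
    the helper name df_rename_loop is just for illustration):

    import re

    def df_rename_loop(df):
        # Rename each column; the substitution leaves non-matching
        # names untouched, so those renames are effectively no-ops.
        for c in df.columns:
            df = df.withColumnRenamed(c, re.sub(r"^(.*?)(?=ADDRESS)", "S_", c))
        return df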