Tags: pyspark, palantir-foundry, foundry-code-repositories, foundry-python-transform

How do I add a column indicating each row's line number within its file on disk?


I want to parse a series of .csv files using spark.read.csv, but I also want each output row to include its line number within the source file.

I know that Spark typically doesn't order DataFrames unless explicitly told to, and I don't want to write my own .csv parser since it would be substantially slower than Spark's own implementation. How can I add this row number in a distributed-safe fashion?

From reading about zipWithIndex, it seems like it could be useful, but it unfortunately seems to require stable partition ordering.


Solution

  • Let's assume the following setup, which creates a .csv file on disk with contents that we control:

    from pyspark.sql import types as T, SparkSession
    import os
    import tempfile
    
    spark = SparkSession.builder.getOrCreate()
    
    
    # Synthesize DataFrames
    schema = T.StructType([
      T.StructField("col_1", T.StringType(), False),
      T.StructField("col_2", T.IntegerType(), False),
      T.StructField("col_3", T.StringType(), False),
      T.StructField("col_4", T.IntegerType(), False),
    ])
    data = [
      {"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
      {"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
      {"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
    ]
    
    final_data = []
    # Scale up a bit
    for _ in range(3):
        final_data += data
    
    def refresh():
        df = spark.createDataFrame(final_data, schema)
        # mkdtemp (rather than TemporaryDirectory) keeps the directory on
        # disk after this function returns; Spark reads lazily, so the
        # .csv files must still exist when downstream actions run
        tmpdirname = tempfile.mkdtemp()
        pathname = os.path.join(tmpdirname, "output.csv")
        df.write.format("csv").option(
            "header",
            True
        ).save(pathname)

        return spark.read.option(
            "header",
            True
        ).csv(pathname)
    
    

    In this setup, we can reproducibly create .csv files, save them to disk, and read them back just as one would when parsing them for the first time.
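
    For instance (a minimal check reusing the refresh() helper above), reading the files back shows the problem we're solving: nothing about the raw read guarantees row order.

    raw_df = refresh()
    # The row order here is whatever Spark's file splitting and scheduling
    # happened to produce; it can vary across runs and partition counts
    raw_df.show()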

    Our strategy to parse these files will come down to the following:

    1. Create identifiers based on the contents we find on disk, using the file name, the block_start (in case Spark breaks a file apart into multiple partitions), and a sortable identifier within each partition (see the sketch after this list)
    2. Sort the parsed contents on these identifiers, thus guaranteeing order
    3. Create a row_number identifier using zipWithIndex
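
    Step 1 leans on the fact that monotonically_increasing_id() is only monotonic within a partition. A minimal sketch of that behavior (the demo DataFrame here is illustrative, not part of the solution):

    demo = spark.range(6).repartition(2).selectExpr(
        "id",
        "spark_partition_id() AS pid",
        "monotonically_increasing_id() AS row_id"
    )
    # row_id restarts at a new 33-bit offset in each partition (e.g. 0, 1, 2
    # in one partition and 8589934592, 8589934593, ... in another), so it
    # only orders rows *within* a partition; the file name and block_start
    # columns are what order the partitions themselves
    demo.show()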

    This looks like the following:

    def create_index(
            parsed_df,
            row_number_column_name="index",
            file_name_column_name="_file_name",
            block_start_column_name="_block_start",
            row_id_column_name="_row_id",
        ):
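        # Tag each row with where it came from on disk, then sort on those
        # identifiers so the DataFrame order matches the in-file line order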
        unindexed_df = parsed_df.selectExpr(
            *parsed_df.columns,
            f"input_file_name() AS {file_name_column_name}",
            f"input_file_block_start() AS {block_start_column_name}",
            f"monotonically_increasing_id() AS {row_id_column_name}"
        ).orderBy(
            file_name_column_name,
            block_start_column_name,
            row_id_column_name
        )
    
        # zipWithIndex returns (Row, index) tuples, so we have to unwrap
        # the Row's fields back into top-level columns via aliasing
        input_cols = unindexed_df.columns
        zipped = unindexed_df.rdd.zipWithIndex().toDF()
        aliased_columns = []
        for input_col in input_cols:
            aliased_columns += [zipped["_1." + input_col].alias(input_col)]
    
        # Alias the original columns, remove the ones we built internally
        return zipped.select(
            *aliased_columns,
            zipped["_2"].alias(row_number_column_name)
        ).drop(
            file_name_column_name,
            block_start_column_name,
            row_id_column_name
        )
    
    example_df = refresh()
    example_df = create_index(example_df)
    example_df.show()
    
    """
    +-----+-----+------+-----+-----+
    |col_1|col_2| col_3|col_4|index|
    +-----+-----+------+-----+-----+
    |key_1|    1|CREATE|    0|    0|
    |key_2|    2|CREATE|    0|    1|
    |key_3|    3|CREATE|    0|    2|
    |key_1|    1|CREATE|    0|    3|
    |key_2|    2|CREATE|    0|    4|
    |key_3|    3|CREATE|    0|    5|
    |key_1|    1|CREATE|    0|    6|
    |key_2|    2|CREATE|    0|    7|
    |key_3|    3|CREATE|    0|    8|
    +-----+-----+------+-----+-----+
    """