I want to parse a series of .csv files using spark.read.csv, but I want to include the row number of each line inside the file. I know that Spark typically doesn't order DataFrames unless explicitly told to do so, and I don't want to write my own .csv parser, since that would be substantially slower than Spark's own implementation. How can I add this row number in a distributed-safe fashion?

From reading about zipWithIndex, it seems like it could be useful, but it unfortunately appears to require the partition ordering to be stable.
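To make the concern concrete, here is a minimal sketch (assuming an existing SparkSession named spark) of what I understand zipWithIndex to do: it numbers elements first by partition order and then by position within each partition, so the indices are only meaningful if that ordering is stable.

# Minimal illustration: zipWithIndex numbers elements by partition order,
# then by position within each partition.
rdd = spark.sparkContext.parallelize(["a", "b", "c", "d"], 2)
print(rdd.zipWithIndex().collect())
# [('a', 0), ('b', 1), ('c', 2), ('d', 3)]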
Let's assume we have the following setup, which is used to create a .csv file on disk with contents that we control:
from pyspark.sql import types as T, functions as F, SparkSession
import os
import tempfile
spark = SparkSession.builder.getOrCreate()
# Synthesize DataFrames
schema = T.StructType([
    T.StructField("col_1", T.StringType(), False),
    T.StructField("col_2", T.IntegerType(), False),
    T.StructField("col_3", T.StringType(), False),
    T.StructField("col_4", T.IntegerType(), False),
])
data = [
    {"col_1": "key_1", "col_2": 1, "col_3": "CREATE", "col_4": 0},
    {"col_1": "key_2", "col_2": 2, "col_3": "CREATE", "col_4": 0},
    {"col_1": "key_3", "col_2": 3, "col_3": "CREATE", "col_4": 0},
]
final_data = []
# Scale up a bit
for _ in range(3):
    final_data += data
def refresh():
    df = spark.createDataFrame(final_data, schema)
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Note: concatenating (rather than joining) places the output next to
        # the temporary directory, so the .csv files outlive its cleanup and
        # can still be read lazily after the `with` block exits.
        pathname = os.path.join(
            tmpdirname + "output.csv"
        )
        df.write.format("csv").option(
            "header",
            True
        ).save(pathname)
        return spark.read.option(
            "header",
            True
        ).csv(pathname)
In this setup, we can reproducibly create .csv files, save them to disk, and then retrieve them as one would when parsing them for the first time.
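As a quick sanity check (an illustrative snippet, not part of the setup itself), we can read the files back and inspect them; note that without inferSchema, spark.read.csv parses every column back as a string:

parsed = refresh()
# Without .option("inferSchema", True), all columns come back as strings.
parsed.printSchema()
parsed.show()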
Our strategy for parsing these files comes down to the following:

1. Tag each row with the file it came from (input_file_name), the byte offset of the block it was read from (input_file_block_start), and an id that is increasing within each partition (monotonically_increasing_id).
2. Sort by those three columns to recover the original within-file order.
3. Use zipWithIndex on the underlying RDD to assign a contiguous, zero-based row number.

This looks like the following:
def create_index(
    parsed_df,
    row_number_column_name="index",
    file_name_column_name="_file_name",
    block_start_column_name="_block_start",
    row_id_column_name="_row_id",
):
    unindexed_df = parsed_df.selectExpr(
        *parsed_df.columns,
        f"input_file_name() AS {file_name_column_name}",
        f"input_file_block_start() AS {block_start_column_name}",
        f"monotonically_increasing_id() AS {row_id_column_name}"
    ).orderBy(
        file_name_column_name,
        block_start_column_name,
        row_id_column_name
    )
    # Unfortunately, we have to unwrap the results of zipWithIndex,
    # so there's some aliasing required
    input_cols = unindexed_df.columns
    zipped = unindexed_df.rdd.zipWithIndex().toDF()
    aliased_columns = []
    for input_col in input_cols:
        aliased_columns += [zipped["_1." + input_col].alias(input_col)]
    # Alias the original columns, remove the ones we built internally
    return zipped.select(
        *aliased_columns,
        zipped["_2"].alias(row_number_column_name)
    ).drop(
        file_name_column_name,
        block_start_column_name,
        row_id_column_name
    )
example_df = refresh()
example_df = create_index(example_df)
example_df.show()
"""
+-----+-----+------+-----+-----+
|col_1|col_2| col_3|col_4|index|
+-----+-----+------+-----+-----+
|key_1| 1|CREATE| 0| 0|
|key_2| 2|CREATE| 0| 1|
|key_3| 3|CREATE| 0| 2|
|key_1| 1|CREATE| 0| 3|
|key_2| 2|CREATE| 0| 4|
|key_3| 3|CREATE| 0| 5|
|key_1| 1|CREATE| 0| 6|
|key_2| 2|CREATE| 0| 7|
|key_3| 3|CREATE| 0| 8|
+-----+-----+------+-----+-----+
"""