scala, apache-spark, bigdata

I need to skip three rows while loading a CSV file into a DataFrame in Scala


I am loading my CSV file into a DataFrame, and that works, but I need to skip the first three lines of the file.

I tried the .option() method with header set to true, but it only ignores the first line.

val df = spark.sqlContext.read
    .schema(Myschema)
    .option("header",true)
    .option("delimiter", "|")
    .csv(path)

I thought of specifying the header as 3 lines, but I couldn't find a way to do that.

Alternative thought: skip those 3 lines from the DataFrame.

Please help me with this. Thanks in advance.


Solution

  • A generic way to handle your problem is to index the DataFrame and keep only the rows whose index is greater than 2 (i.e. drop the first three rows).

    Straightforward approach:

    As suggested in another answer, you may try adding an index with monotonically_increasing_id.

    df.withColumn("Index",monotonically_increasing_id)
      .filter('Index > 2)
      .drop("Index")
    

    Yet, that's only going to work if the first 3 rows land in the first partition. Moreover, as mentioned in the comments, this happens to be the case today, but this code may break completely with future versions of Spark, and that would be very hard to debug. Indeed, the contract in the API is just "The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive". It is therefore not very safe to assume that the IDs will always start from zero. There might even be other cases in the current version in which this does not work (I'm not sure though).

    To illustrate my first concern, have a look at this:

    scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show()
    +---+----------+
    | id|     Index|
    +---+----------+
    |  0|         0|
    |  1|         1|
    |  2|8589934592|
    |  3|8589934593|
    +---+----------+
    

    Filtering on Index > 2 here would only remove two rows instead of three...

    Safe approach:

    The previous approach will work most of the time, but to be safe, you can use zipWithIndex from the RDD API to get consecutive indices.

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.types.{LongType, StructField}

    // Appends a consecutive, 0-based index column computed with RDD zipWithIndex.
    def zipWithIndex(df: DataFrame, name: String): DataFrame = {
      val rdd = df.rdd.zipWithIndex
        .map { case (row, i) => Row.fromSeq(row.toSeq :+ i) }
      val newSchema = df.schema
        .add(StructField(name, LongType, false))
      df.sparkSession.createDataFrame(rdd, newSchema)
    }
    zipWithIndex(df, "index").where('index > 2).drop("index")
    

    We can check that it's safer:

    scala> zipWithIndex(spark.range(4).toDF("id"), "index").show()
    +---+-----+
    | id|index|
    +---+-----+
    |  0|    0|
    |  1|    1|
    |  2|    2|
    |  3|    3|
    +---+-----+
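
    Putting it together, here is a minimal sketch of how the safe approach could be applied to the original read from the question (this assumes the Myschema and path values from the question and the zipWithIndex helper defined above):

    import org.apache.spark.sql.functions.col

    // Read the pipe-delimited CSV as in the question, then drop the first three rows
    // using the consecutive index produced by the zipWithIndex helper.
    val df = spark.read
      .schema(Myschema)
      .option("header", true)
      .option("delimiter", "|")
      .csv(path)

    val withoutFirstThree = zipWithIndex(df, "index")
      .where(col("index") > 2)
      .drop("index")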