scala, apache-spark, parsing, fixed-length-file

Fixed-length parsing in Spark (Scala)


I have created a dataframe and the input looks like this:

   +-----------------------------------+
   |value                              |
   +-----------------------------------+
   |1   PRE123                    21   |
   |2   TEST                      32   |
   |7   XYZ                       .7   |
   +-----------------------------------+

Based on the metadata below, we need to split this dataframe and create a new dataframe with the columns id, name and class. The start and end index of each column is given in this JSON metadata:

    [
      {
        "columnName": "id",
        "start": 1,
        "end": 2
      },
      {
        "columnName": "name",
        "start": 5,
        "end": 10
      },
      {
        "columnName": "class",
        "start": 20,
        "end": 22
      }
    ]

OUTPUT:

  +---+------+-----+
  | id|  name|class|
  +---+------+-----+
  |  1|PRE123|   21|
  |  2|  TEST|   32|
  |  7|   XYZ|   .7|
  +---+------+-----+

To build this dataframe, I have created a list of columns:

    list += loadedDF.col("value")
      .substr(fixedLength.getStart, fixedLength.getEnd - fixedLength.getStart)
      .alias(fixedLength.getColumnName)

and from this list I have created the dataframe:

    var df: DataFrame = loadedDF.select(list: _*)

I need to know whether there is a better approach for creating the dataframe from the metadata, as I am concerned that the list I build will bring all the data to the driver node.
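
For context, here is a minimal sketch of the approach described above (the FixedLength case class is illustrative, and I assume the JSON metadata has already been parsed into a Seq; loadedDF is the single-column dataframe shown at the top):

    import org.apache.spark.sql.{Column, DataFrame}

    // One entry of the JSON metadata (field names are illustrative)
    case class FixedLength(columnName: String, start: Int, end: Int)

    // Assume the JSON metadata above has already been parsed into this sequence
    val metadata = Seq(
      FixedLength("id", 1, 2),
      FixedLength("name", 5, 10),
      FixedLength("class", 20, 22)
    )

    // Each metadata entry becomes a Column expression over the "value" column
    val columns: Seq[Column] = metadata.map { m =>
      loadedDF.col("value")
        .substr(m.start, m.end - m.start)
        .alias(m.columnName)
    }

    val df: DataFrame = loadedDF.select(columns: _*)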


Solution

  • If I understood your requirements correctly, you are trying to extract the columns from a string whose fields are separated by an arbitrary number of spaces.

    Here is one solution using the substr function:

    // spark.implicits._ provides toDF and the $"..." column syntax
    // (spark is the SparkSession); trim drops the padding spaces
    import spark.implicits._
    import org.apache.spark.sql.functions.trim

    val df = Seq(
      ("1   PRE123         21"),
      ("2   TEST           32"),
      ("7   XYZ            .7"))
    .toDF("value")

    val colMetadata = Map("id" -> (1,2), "name" -> (5,10), "class" -> (20,22))

    val columns = colMetadata.map { case (cname, meta) =>
      // start and end are 1-based and inclusive, hence the +1
      val len = meta._2 - meta._1 + 1
      trim($"value".substr(meta._1, len)).as(cname)
    }.toSeq

    df.select(columns:_*).show
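
    A side note from me on the Map-based metadata: a plain Scala Map makes no guarantee about iteration order, so with many columns the order of the selected columns may not match the metadata order. A ListMap (or a Seq of tuples) keeps the columns in the order they are declared; the rest of the snippet stays the same:

    import scala.collection.immutable.ListMap

    // Same metadata, but ListMap preserves insertion order for any number of columns
    val colMetadata = ListMap("id" -> (1,2), "name" -> (5,10), "class" -> (20,22))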
    

    And a more generic solution using the split function, for when you don't have the column boundaries available:

    import org.apache.spark.sql.functions.split
    
    val df = Seq(
      ("1   PRE123         21"),
      ("2   TEST           32"),
      ("7   XYZ            .7"))
    .toDF("value")
    
    val colNames = Seq("id", "name", "class")
    
    val columns = colNames.zipWithIndex.map { case (cname, idx) =>
          split($"value", "\\s+").getItem(idx).as(cname)
    }
    
    df.select(columns:_*).show
    

    Output:

    +---+------+-----+
    | id|  name|class|
    +---+------+-----+
    |  1|PRE123|   21|
    |  2|  TEST|   32|
    |  7|   XYZ|   .7|
    +---+------+-----+
    

    Notice that I used \\s+ as the separator. This is a regex that matches one or more whitespace characters.
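
    One caveat I would add, with a made-up example row: the split-based version assumes the fields themselves never contain whitespace. If a value such as the name can contain spaces, only the fixed-position substr approach extracts it correctly:

    // Hypothetical row where the name itself contains a space:
    // \s+ splits it into Array("3", "NEW", "PRODUCT", "45"),
    // so getItem(2) returns "PRODUCT" rather than the intended class value "45"
    Seq("3   NEW PRODUCT       45").toDF("value")
      .select(split($"value", "\\s+").getItem(2).as("class"))
      .show()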