Tags: apache-spark, pyspark, apache-spark-sql, spark-csv

Can I read a CSV represented as a string into Apache Spark using spark-csv?


I know how to read a CSV file into Apache Spark using spark-csv, but I already have the CSV represented as a string and would like to convert that string directly to a DataFrame. Is this possible?


Solution

  • Update for Spark 3.x (really driven by the JDK rather than Spark: on Java 11+, String.lines() returns a java.util.stream.Stream, and Stream.toList() requires Java 16+, so the result has to be converted back to a Scala collection):

    import org.apache.spark.sql.{Dataset, SparkSession}
    val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
    
    import spark.implicits._
    import scala.collection.JavaConverters._
    
    // On Java 11+, String.lines() yields a java.util.stream.Stream[String];
    // Stream.toList() (Java 16+) plus asScala turns it back into a Scala Seq.
    val csvData: Dataset[String] =
      """id, date, timedump
        |1, "2014/01/01 23:00:01",1499959917383
        |2, "2014/11/31 12:40:32",1198138008843""".stripMargin.lines.toList.asScala.toDS()
    
    val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
    frame.show()
    frame.printSchema()
    
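    If the CSV already lives in a single String (say, fetched over HTTP) rather than a literal, the same idea can be wrapped in a small helper. A minimal sketch (csvStringToDf is a hypothetical name; splitting on the \R line-break regex sidesteps the String.lines() JDK differences entirely):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    // Hypothetical helper: turn an in-memory CSV payload into a DataFrame.
    def csvStringToDf(spark: SparkSession, payload: String): DataFrame = {
      import spark.implicits._
      // "\\R" matches any line break (Java 8+), so this works on every JDK
      val lines = payload.split("\\R").toSeq
      spark.read.option("header", true).option("inferSchema", true).csv(lines.toDS())
    }
    
    // e.g. csvStringToDf(spark, "a,b\n1,x\n2,y").show()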

    Starting from Spark 2.2.x

    There is finally a proper way to do it using Dataset.

    import org.apache.spark.sql.{Dataset, SparkSession}
    val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
    
    import spark.implicits._
    val csvData: Dataset[String] = spark.sparkContext.parallelize(
      """id, date, timedump
        |1, "2014/01/01 23:00:01",1499959917383
        |2, "2014/11/31 12:40:32",1198138008843""".stripMargin.lines.toList).toDS()
    
    val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
    frame.show()
    frame.printSchema()
    
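    Note that inferSchema costs an extra pass over the data. If you already know the column types, supplying an explicit schema is cheaper; a minimal sketch, assuming the types implied by the sample above:

    import org.apache.spark.sql.types._
    
    // Assumed types for the sample: id as Int, date as String, timedump as Long
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("date", StringType),
      StructField("timedump", LongType)
    ))
    val typedFrame = spark.read.option("header", true).schema(schema).csv(csvData)
    typedFrame.printSchema()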

    Old Apache Spark versions

    Actually you can, though it uses library internals and isn't widely advertised. Just create and use your own CsvParser instance. An example that works for me on Spark 1.6.0 and spark-csv_2.10-1.4.0 is below:

    import com.databricks.spark.csv.CsvParser
    import org.apache.spark.sql.DataFrame
    
    val csvData = """userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
        |1,1,user1,m1,l1,mr
        |2,2,user2,m2,l2,mr
        |3,3,user3,m3,l3,mr""".stripMargin
    val rdd = sc.parallelize(csvData.lines.toList)
    // CsvParser is spark-csv's internal parser; csvRdd parses an RDD[String]
    val csvParser = new CsvParser()
      .withUseHeader(true)
      .withInferSchema(true)
    
    val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
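
    If you would rather not rely on an internal API on these old versions, one workaround is to spill the string to a temporary file and go through spark-csv's public reader. A sketch, assuming the driver-local path is visible to the executors (true for master("local")):

    import java.nio.file.Files
    import java.nio.charset.StandardCharsets
    
    // Write the in-memory CSV to a temp file, then read it the documented way
    val tmp = Files.createTempFile("csv-payload", ".csv")
    Files.write(tmp, csvData.getBytes(StandardCharsets.UTF_8))
    val viaFile = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(tmp.toString)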