scala, apache-spark, apache-spark-sql, json4s

Cannot find column when using from_json with a Dataset


import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
import org.apache.spark.sql.Encoders

import org.apache.spark.sql.functions.{col, lit, when, from_json, map_keys, map_values}
import org.apache.spark.sql.types.{MapType, StringType}

case class MyMeta(op: Option[String], table: Option[String])
val metaSchema = Encoders.product[MyMeta].schema

val path = "/FileStore/tables/json_0002C_file.txt"
val df = spark.read.text(path)   
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))    
val df3 = df2.select(map_values(col("value")))  
val df4 = df3.select($"map_values(value)"(0).as("meta"))  
df4.show(false)
df4.printSchema()
val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta]
df5.show(false)
df5.printSchema()
                     

returns this:

+------------------------------------+
|meta                                |
+------------------------------------+
|{"op":"upd","table":"BILL.PRODUCTS"}|
|{"op":"upd","table":"BILL.PRODUCTS"}|
|{"op":"upd","table":"BILL.SALES"}   |
+------------------------------------+

root
 |-- meta: string (nullable = true)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `op` cannot be resolved. Did you mean one of the following? [`parsedMeta`]

I cannot see what is wrong here.

It works as a DataFrame, but not as a Dataset.

The input file (json_0002C_file.txt) contains:

{  "meta":{    "op":"upd",     "table":"BILL.PRODUCTS"  },  "data":{    "PRICE":"3599"  }, "key":{    "PRODUCT_ID":"230117",    "DESCRIPTION":"Hamsberry vintage tee, cherry",    "PRICE":"4099"  }}
{  "meta":{    "op":"upd",     "table":"BILL.PRODUCTS"  },  "data":{    "PRICE":"4000"  }, "key":{    "PRODUCT_ID":"230117",    "DESCRIPTION":"Hamsberry vintage tee, cherry",    "PRICE":"3599"  }}
{  "meta":{    "op":"upd",     "table":"BILL.SALES"  },  "data":{    "NUM":"20"  }, "key":{    "PRODUCT_ID":"230117",    "DESCRIPTION":"Hamsberry vintage tee, cherry",    "NUM":"10"  }}

Solution

  • Your case class MyMeta (op: Option[String], table: Option[String]) describes the type of the column, not the column itself. The column is named parsedMeta and has type MyMeta, so you need to add a wrapper case class:

    case class MyMeta2(parsedMeta: MyMeta)
    

    Then convert the DataFrame to a Dataset[MyMeta2]:

    val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta2]
    

    Result:

    +--------------------+
    |parsedMeta          |
    +--------------------+
    |{upd, BILL.PRODUCTS}|
    |{upd, BILL.PRODUCTS}|
    |{upd, BILL.SALES}   |
    +--------------------+
    

    Update: Full solution:

    case class MyMeta(op: Option[String], table: Option[String])
    case class MyMeta2(parsedMeta: MyMeta)
    
    import spark.implicits._
    val metaSchema = Encoders.product[MyMeta].schema
    
    val path = "src/main/resources/input/files/json_0002C_file.txt"
    val df = spark.read.text(path)
    val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))
    val df3 = df2.select(map_values(col("value")))
    val df4 = df3.select(col("map_values(value)")(0).as("meta"))
    df4.show(false)
    df4.printSchema()
    val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta2]
    df5.show(false)
    df5.printSchema()
    
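    As a side note: since each input line is a complete JSON object, Spark's JSON reader can also infer the nested structure directly, which avoids the intermediate MapType step. A minimal sketch, assuming the same file layout as above (the variable names raw and meta2 are illustrative):

    val raw = spark.read.json(path)   // infers meta/data/key as struct columns
    val meta2 = raw.select(col("meta.op"), col("meta.table")).as[MyMeta]
    meta2.show(false)
    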

    Update 2: A further explanation

    The problem comes from this piece of code: .as[MyMeta]

    You are trying to apply a case class to a DataFrame that does not have the same schema.

    When applying a case class to a DataFrame, its fields must exactly match the column names and types of that DataFrame.

    In your case, in this line:

    val df5 = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").as[MyMeta]
    

    You are trying to apply this schema:

    |-- op: string (nullable = true)
    |-- table: string (nullable = true)
    

    to a dataframe of this type:

     |-- parsedMeta: struct (nullable = true)
     |    |-- op: string (nullable = true)
     |    |-- table: string (nullable = true)
    

    As you can see, there is no field named parsedMeta in your case class. The encoder therefore looks for the first attribute, op, and tries to match it against your DataFrame, but cannot find it because the root column is called parsedMeta. That is why you got this error:

    A column or function parameter with name `op` cannot be resolved. Did you mean one of the following? [`parsedMeta`]
    

    So you need to apply the right schema, which I defined in my answer as MyMeta2.
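    Alternatively, if you want to keep the original MyMeta case class instead of adding a wrapper, you can flatten the struct before calling .as — a minimal sketch along the same lines as the code above (df5b is an illustrative name):

    val df5b = df4
      .withColumn("parsedMeta", from_json(col("meta"), metaSchema))
      .select(col("parsedMeta.op"), col("parsedMeta.table"))   // flatten to top-level op/table
      .as[MyMeta]                                              // now the schemas match
    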

    How to avoid schema problems in the future:

    Create a schema from your case class like this, and then print it:

    val metaSchema = Encoders.product[MyMeta].schema
    metaSchema.printTreeString()
    

    Result:

    root
     |-- op: string (nullable = true)
     |-- table: string (nullable = true)
    

    Then print the schema of the dataframe you want to apply the schema on:

    df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").printSchema()
    

    Result:

    root
     |-- parsedMeta: struct (nullable = true)
     |    |-- op: string (nullable = true)
     |    |-- table: string (nullable = true)
    

    If the schemas do not match in the name and type of every attribute, you will get an error.
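    The two printed trees can also be compared programmatically before calling .as — a minimal sketch (note that == on schemas is strict and also compares nullability, so it may be stricter than the encoder itself):

    // Compare the target case-class schema with the DataFrame schema.
    val expected = Encoders.product[MyMeta2].schema
    val actual = df4.withColumn("parsedMeta", from_json(col("meta"), metaSchema)).drop("meta").schema
    if (actual != expected)
      println(s"Schema mismatch:\n${expected.treeString}vs\n${actual.treeString}")
    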

    Hope it helps.