Tags: python, json, apache-spark, pyspark

Pyspark: Parse a column of json strings


I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of JSON. I'd like to parse each row and return a new dataframe where each row is the parsed JSON.

# Sample Data Frame
from pyspark.sql import Row

jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"something"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])
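
For reference, the resulting dataframe has a single string column:

df.printSchema()
# root
#  |-- json: string (nullable = true)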

I've tried mapping over each row with json.loads:

import json

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

But this returns a TypeError: expected string or buffer

I suspect that part of the problem is that when converting from a dataframe to an rdd, the schema information is lost, so I've also tried manually entering the schema info:

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

But I get the same TypeError.
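
Inspecting the RDD directly suggests the elements are Row objects rather than plain strings, which may be the real cause (shown here just as a sanity check):

df.select('json').rdd.take(1)
# [Row(json=u'{"header":{"id":12345,...')]  -- each element is a Row, not a str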

Looking at this answer, it seems that flattening out the rows with flatMap might be useful here, but I'm not having success with that either:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

I get this error: AttributeError: 'unicode' object has no attribute 'get'.
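
If I understand the chain correctly, the second flatMap hands the parsed dict back to Spark, and iterating a dict in Python yields its keys, which would explain why unicode strings show up downstream:

import json
parsed = json.loads(jstr1)
list(parsed)  # iterating a dict yields its keys (order may vary)
# [u'header', u'body']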


Solution

  • Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

    For example:

    >>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
    >>> new_df.printSchema()
    root
     |-- body: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- name: string (nullable = true)
     |    |-- sub_json: struct (nullable = true)
     |    |    |-- id: long (nullable = true)
     |    |    |-- sub_sub_json: struct (nullable = true)
     |    |    |    |-- col1: long (nullable = true)
     |    |    |    |-- col2: string (nullable = true)
     |-- header: struct (nullable = true)
     |    |-- foo: string (nullable = true)
     |    |-- id: long (nullable = true)
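
  • On Spark 2.1 and later the same parsing can also be done without the RDD round-trip by using from_json with an explicit schema. A minimal sketch, assuming only the body.id and body.name fields are needed (from_json does not infer the schema, so it has to be declared up front):

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # Declare just the fields we want to extract from the JSON strings.
    body_schema = StructType([
        StructField('id', LongType(), True),
        StructField('name', StringType(), True),
    ])
    schema = StructType([StructField('body', body_schema, True)])

    parsed_df = df.withColumn('parsed', from_json(col('json'), schema))
    parsed_df.select('parsed.body.id', 'parsed.body.name').show()

    Either way, once the JSON is parsed into structs, nested fields can be selected with dot notation, e.g. new_df.select('body.id', 'header.foo').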