I am building a data pipeline that consumes data from a REST API in JSON format and pushes it to a Spark DataFrame. Spark version: 2.4.4
But I am getting this error:
df = SQLContext.jsonRDD(rdd)
AttributeError: type object 'SQLContext' has no attribute 'jsonRDD'
Code:
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from urllib import urlopen
from pyspark import SQLContext
import json
spark = SparkSession \
    .builder \
    .appName("DataCleansing") \
    .getOrCreate()

def convert_single_object_per_line(json_list):
    json_string = ""
    for line in json_list:
        json_string += json.dumps(line) + "\n"
    return json_string

def parse_dataframe(json_data):
    r = convert_single_object_per_line(json_data)
    mylist = []
    for line in r.splitlines():
        mylist.append(line)
    rdd = spark.sparkContext.parallelize(mylist)
    df = SQLContext.jsonRDD(rdd)
    return df

url = "https://mylink"
response = urlopen(url)
data = str(response.read())
json_data = json.loads(data)
df = parse_dataframe(json_data)
Is there a better way to query a REST API and load the data into a Spark DataFrame using PySpark?
I am not sure whether I am missing something.
Check the Spark REST API data source library. One advantage of this library is that it uses multiple executors to fetch the data from the REST API and creates the DataFrame for you.
In your code, you fetch all the data into the driver and then create the DataFrame. That might fail with a heap space error if the data is very large.
url = "https://mylink"
options = { 'url' : url, 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}
# Now we create the Dataframe which contains the result from the call to the API
df = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**options).load()
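As for the AttributeError itself: SQLContext.jsonRDD was removed in Spark 2.0, and spark.read.json accepts an RDD of JSON strings in its place. Below is a minimal sketch of the question's driver-side approach rewritten that way, assuming Python 3 and assuming the endpoint returns a JSON array of objects. The helper names fetch_records and to_json_lines are made up for illustration, and the driver-memory caveat above still applies.

```python
import json
from urllib.request import urlopen  # Python 3 location of urlopen


def fetch_records(url):
    """Download the body and parse it as JSON (assumed: a list of objects)."""
    with urlopen(url) as response:
        # Decode the bytes explicitly; str(bytes) would yield "b'...'" in Python 3.
        return json.loads(response.read().decode("utf-8"))


def to_json_lines(records):
    """Serialize each record to its own JSON string."""
    return [json.dumps(record) for record in records]


def parse_dataframe(spark, records):
    """Build a DataFrame on the driver from already-parsed JSON records."""
    rdd = spark.sparkContext.parallelize(to_json_lines(records))
    # spark.read.json takes an RDD of JSON strings and infers the schema;
    # it is used here instead of the removed SQLContext.jsonRDD.
    return spark.read.json(rdd)
```

With the SparkSession from the question, this would be called as df = parse_dataframe(spark, fetch_records(url)).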