apache-spark pyspark

Fetching data from REST API to Spark Dataframe using Pyspark


I am building a data pipeline which consumes data from a REST API in JSON format and pushes it into a Spark DataFrame. Spark version: 2.4.4

but I am getting the following error:

df = SQLContext.jsonRDD(rdd) 
AttributeError: type object 'SQLContext' has no attribute 'jsonRDD'

Code:

from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from urllib import urlopen
from pyspark import SQLContext
import json
spark = SparkSession \
    .builder \
    .appName("DataCleansing") \
    .getOrCreate()


def convert_single_object_per_line(json_list):
    json_string = ""
    for line in json_list:
        json_string += json.dumps(line) + "\n"
    return json_string

def parse_dataframe(json_data):
    r = convert_single_object_per_line(json_data)
    mylist = []
    for line in r.splitlines():
        mylist.append(line)
    rdd = spark.sparkContext.parallelize(mylist)
    df = SQLContext.jsonRDD(rdd)
    return df

url = "https://mylink"
response = urlopen(url)
data = str(response.read())
json_data = json.loads(data)
df = parse_dataframe(json_data)


Is there any other, better way to query a REST API and bring the data into a Spark DataFrame using PySpark?

I am not sure if I am missing something.


Solution

  • Check out the Spark REST API Data Source. One advantage of this library is that it uses multiple executors to fetch data from the REST API and creates the DataFrame for you.

    In your code, you are fetching all of the data into the driver and then creating the DataFrame. That approach may fail with a heap-space error if the payload is very large.

    url = "https://mylink"
    options = { 'url' : url, 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}
    
    # Now we create the Dataframe which contains the result from the call to the API
    df = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**options).load()
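  • If you only want to fix the error in your current driver-side approach: SQLContext.jsonRDD no longer exists in Spark 2.4 (it was deprecated in the 1.x line in favor of the DataFrameReader API), which is exactly the AttributeError you are seeing. Pass the RDD of JSON strings to spark.read.json instead. A minimal sketch, assuming the payload fits in driver memory, the API returns a JSON array, and using the placeholder URL from your question:

    from urllib.request import urlopen  # Python 3; on Python 2 use: from urllib import urlopen
    import json
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("DataCleansing").getOrCreate()

    url = "https://mylink"  # placeholder URL from the question
    json_data = json.loads(urlopen(url).read().decode("utf-8"))

    # One JSON object per line, then let Spark infer the schema
    rdd = spark.sparkContext.parallelize([json.dumps(rec) for rec in json_data])
    df = spark.read.json(rdd)
    df.show()

    This keeps your driver-side fetch (with the heap-space caveat above), but replaces the removed jsonRDD call and the manual line-splitting with a single spark.read.json over the RDD of JSON strings.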