pyspark, azure-data-factory, azure-databricks

Azure Data Factory Parquet file read: non-primitive data type issues


I am trying to read a Parquet file in Azure Data Factory, but I get non-primitive data type errors both when loading the dataset and when running the copy activity. Kindly help.

Before involving ADF, the data is written by a Databricks notebook to an Azure Storage location; ADF then tries to load the resulting Parquet file.


The Databricks input is in the following format:

[{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

from pyspark.sql.functions import to_json, col
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("desc", StringType(), nullable=True),
    StructField("Details", StructType([
        StructField("Input", StringType(), nullable=True),
        StructField("Output", StringType(), nullable=True)
    ])),
    StructField("Failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])

newJson = [{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]

df = spark.createDataFrame(data=newJson, schema=schema)
display(df)
df.coalesce(1).write.parquet(f"{adls_url}/test/", mode="overwrite")

After writing the JSON-format data to the Parquet file, we get errors when reading it in ADF.


Solution

  • When I tried to replicate the issue, I got the same error in my environment.


    As per the data type mappings that ADF supports for Parquet files, only primitive data types are supported; complex (non-primitive) types such as structs are not.
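
    To see which column carries the non-primitive type, a quick check (a minimal sketch, assuming the adls_url path from the question) is to read the original file back in Databricks and print its schema:

    # Read back the Parquet output that ADF fails on; the Details column
    # appears as a nested struct, i.e. a non-primitive type
    orig_df = spark.read.parquet(f"{adls_url}/test/")
    orig_df.printSchema()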

    In your data, the Details column takes the charArray data type, which is non-primitive and not supported for Parquet files. So, you can rebuild your Parquet file with a schema of primitive data types using the following code:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType
    
    # Initialize Spark session
    spark = SparkSession.builder.appName("PrimitiveSchemaDataFrame").getOrCreate()
    
    # Define the schema with primitive data types
    primitive_schema = StructType([
        StructField("id", StringType(), nullable=False),
        StructField("name", StringType(), nullable=True),
        StructField("desc", StringType(), nullable=True),
        StructField("details_input_id", StringType(), nullable=True),
        StructField("details_input_name", StringType(), nullable=True),
        StructField("details_output", StringType(), nullable=True),
        StructField("failure", StringType(), nullable=True),
        StructField("s", StringType(), nullable=True)
    ])
    
    # Define the JSON data as a list of dictionaries
    json_data = [
        {
            'id': '1',
            'name': 'asdsdasd',
            'desc': None,
            'details_input_id': '1',
            'details_input_name': 'asdsdasd',
            'details_output': '{"msg":"some error"}',
            'failure': '{"msg":"error"}',
            's': 'f'
        },
        {
            'id': '2',
            'name': 'sadsadsad',
            'desc': None,
            'details_input_id': '2',
            'details_input_name': 'sadsadsad',
            'details_output': '{"s":"2"}',
            'failure': '',
            's': 's'
        }
    ]
    
    # Create the DataFrame with the schema and JSON data
    df = spark.createDataFrame(json_data, schema=primitive_schema)
    
    df.coalesce(1).write.parquet("abfss://<containerName>@<storageAccount>.dfs.core.windows.net/parq/", mode="overwrite")
    

    You will get the Parquet file written with the flat, primitive-type schema.

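    As a quick sanity check (a minimal sketch, assuming the same output path as above), you can read the file back and print its schema to confirm every column is now a primitive string:

    # Read the rebuilt file back and verify that no nested/struct columns remain
    check_df = spark.read.parquet("abfss://<containerName>@<storageAccount>.dfs.core.windows.net/parq/")
    check_df.printSchema()  # all fields should be plain string columns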

    Import the schema in ADF with the above Parquet file as the source dataset, and you will now be able to import it successfully.
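
    Alternatively, instead of re-keying the data by hand, you can flatten the original nested DataFrame programmatically. This is a minimal sketch that assumes the df from the question (with Details.Input readable as a struct) and a hypothetical test_flat output folder:

    from pyspark.sql.functions import col

    # Pull the nested fields out of the Details struct into flat, primitive columns
    flat_df = df.select(
        col("Details.Input.id").alias("details_input_id"),
        col("Details.Input.name").alias("details_input_name"),
        col("Details.Output").alias("details_output"),
        col("Failure").alias("failure"),
        col("s"),
    )
    flat_df.coalesce(1).write.parquet(f"{adls_url}/test_flat/", mode="overwrite")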

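    Another option is to keep the nested Details data in a single column by serializing the struct to a JSON string, which is a primitive type that ADF can read. This is a minimal sketch, using the to_json import already present in the question and a hypothetical parq_json output folder:

    from pyspark.sql.functions import to_json, col

    # Serialize the non-primitive Details struct into a JSON string column so that
    # the resulting Parquet file contains only primitive (string) types
    stringified_df = df.withColumn("Details", to_json(col("Details")))
    stringified_df.coalesce(1).write.parquet(f"{adls_url}/parq_json/", mode="overwrite")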