I am trying to read a Parquet file in Azure Data Factory, but we are getting a non-primitive type error when we try to load the dataset as well as in the copy activity. Kindly help us.
Before using ADF, the data is written by a Databricks notebook to an Azure storage location; ADF then tries to load the Parquet file.
The Databricks input is in the below format:
[{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]
from pyspark.sql.functions import to_json, col
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField("id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("desc", StringType(), nullable=True),
    StructField("Details", StructType([
        StructField("Input", StringType(), nullable=True),
        StructField("Output", StringType(), nullable=True)
    ])),
    StructField("Failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])
newJson = [{'Details': {'Input': {'id': '1', 'name': 'asdsdasd', 'a1': None, 'a2': None, 'c': None, 's': None, 'c1': None, 'z': None}, 'Output': '{"msg":"some error"}'}, 'Failure': '{"msg":"error"}', 's': 'f'}, {'Details': {'Input': {'id': '2', 'name': 'sadsadsad', 'a1': 'adsadsad', 'a2': 'sssssss', 'c': 'cccc', 's': 'test', 'c1': 'ind', 'z': '22222'}, 'Output': '{"s":"2"}'}, 'Failure': '', 's': 's'}]
df = spark.createDataFrame(data=newJson, schema=schema)
display(df)
df.coalesce(1).write.parquet(f"{adls_url}/test/", mode="overwrite")
After writing the JSON-format data into the Parquet table, we are getting errors while reading the file in ADF.
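For reference, the schema of the written file can be checked by reading it back in Databricks (a minimal sketch, using the same adls_url path as above):
# Read the written file back and print its schema; the Details column appears as a struct (non-primitive) type
spark.read.parquet(f"{adls_url}/test/").printSchema()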
When I tried to replicate the issue, I got the same error in my environment.
As per the data type mappings for Parquet files in ADF, only primitive data types are supported.
In your data, the Details column is taking a charArray data type, which is non-primitive and not supported for Parquet files. So, you can rebuild your Parquet file with a schema of primitive data types using the following code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
# Initialize the Spark session
spark = SparkSession.builder.appName("PrimitiveSchemaDataFrame").getOrCreate()
# Define the schema with primitive data types only
primitive_schema = StructType([
    StructField("id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("desc", StringType(), nullable=True),
    StructField("details_input_id", StringType(), nullable=True),
    StructField("details_input_name", StringType(), nullable=True),
    StructField("details_output", StringType(), nullable=True),
    StructField("failure", StringType(), nullable=True),
    StructField("s", StringType(), nullable=True)
])
# Define the JSON data as a list of dictionaries
json_data = [
    {
        'id': '1',
        'name': 'asdsdasd',
        'desc': None,
        'details_input_id': '1',
        'details_input_name': 'asdsdasd',
        'details_output': '{"msg":"some error"}',
        'failure': '{"msg":"error"}',
        's': 'f'
    },
    {
        'id': '2',
        'name': 'sadsadsad',
        'desc': None,
        'details_input_id': '2',
        'details_input_name': 'sadsadsad',
        'details_output': '{"s":"2"}',
        'failure': '',
        's': 's'
    }
]
# Create the DataFrame with the schema and JSON data
df = spark.createDataFrame(json_data, schema=primitive_schema)
df.coalesce(1).write.parquet("abfss://<containerName>@<storageAccount>.dfs.core.windows.net/parq/", mode="overwrite")
You will get a Parquet file containing only primitive (string) columns.
Try to import the schema with the above Parquet file as the source dataset; the import will now succeed without the non-primitive type error.
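Alternatively, instead of rebuilding the rows by hand, you could flatten the nested Parquet data you already wrote by converting the Details struct into a JSON string before rewriting it. A minimal sketch, assuming the adls_url path from the question; test_flat is just an example output folder:
from pyspark.sql.functions import to_json, col
# Read the nested file back and convert the non-primitive Details column to a JSON string,
# so that every column in the rewritten file is a primitive (string) type
nested_df = spark.read.parquet(f"{adls_url}/test/")
flat_df = nested_df.withColumn("Details", to_json(col("Details")))
flat_df.coalesce(1).write.parquet(f"{adls_url}/test_flat/", mode="overwrite")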