I am relatively new to writing Python and to using Fabric in our company. I have created a fairly simple script that writes a Parquet file, and then I want to take that Parquet file and write it to a Delta table. That is where I am running into trouble.
Below is my script. It writes the Parquet file just fine, but it errors out when I try to load it into the Delta table.
My initial attempt was to do everything together, but I get errors when it reaches the step that writes the Parquet file to the Delta table. I have tried several different configurations, but none of them work; even one that works in another script doesn't work here.
import subprocess
import json
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import SparkSession

access_token = "TOKEN"

# Define your API endpoint and headers
url = "https://api.monday.com/v2"
headers = {
    "Authorization": access_token,
    "Content-Type": "application/json"
}

# Define your GraphQL query
query = """
query {
  users {
    created_at
    email
    account {
      name
      id
    }
  }
}
"""

# Send the request
response = requests.post(url, json={'query': query}, headers=headers)

# Check for errors
if response.status_code == 200:
    data = response.json()
    # print(data)
    users = data['data']['users']

    # Convert the data to a DataFrame
    df = pd.json_normalize(users)

    # Write the DataFrame to a Parquet file
    # df.to_parquet('users_data.parquet', engine='pyarrow')
    # print("Data written to users_data.parquet")
    parquet_table_name = "MondayUsers.parquet"
    parquet_file_path = "abfss://Files/Monday/"
    df.to_parquet(parquet_file_path + parquet_table_name, engine='pyarrow', index=False)
    print("Data written to Parquet file")

    # Initialize Spark session with Delta Lake configurations
    spark = SparkSession.builder \
        .appName("FabricNotebook") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    # Define the path to your Parquet file
    parquet_file_path = parquet_file_path + parquet_table_name

    # Read the Parquet file into a Spark DataFrame
    try:
        # Load parquet file to data lake table
        from pyspark.sql import SparkSession

        # Initialize Spark session
        spark = SparkSession.builder.appName("FabricNotebook").getOrCreate()

        # Read the Parquet file into a Spark DataFrame
        df = spark.read.parquet(parquet_file_path)

        # Define the Delta table name
        delta_table_name = "monday.Users"

        # Write the Spark DataFrame to the Delta table
        df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable(delta_table_name)
    except Exception as e:
        print(f"Error reading Parquet file or writing to Delta table: {e}")
else:
    print(f"Query failed with status code {response.status_code}")
The error I receive is:
Error reading Parquet file or writing to Delta table: An error occurred while calling o6842.toString. Trace:
java.lang.IllegalArgumentException: object is not an instance of declaring class
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
I then thought I'd split it out into a separate cell and just read from the Parquet file and write it to a Delta table. But then I get "Unauthorized" with this:
import pandas as pd
parquet_table_name = "MondayUsers.parquet"
parquet_file_path = "abfss://Files/Monday/"
parquet_file = parquet_file_path + parquet_table_name
parquet_df = pd.read_parquet(parquet_file)
spark_df = spark.createDataFrame(parquet_df)
print(spark_df)
# Define the Delta table name
delta_table_name = "monday.Users"
spark_df.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)
From pandas (or any other driver-node Python code) you write to the default lakehouse through the file mount. So instead of
parquet_file_path = "abfss://Files/Monday/"
use
parquet_file_path = "/lakehouse/default/Files/Monday/"
But you can skip pandas entirely and create Spark Row objects from the JSON returned by the API. Here's a sample that calls the Fabric Admin API and saves the results directly to a Delta table:
import json
import sempy.fabric as fabric
from pyspark.sql import Row

client = fabric.FabricRestClient()

uri = "v1/admin/items?type=Report"
all_items = []

# Page through the Admin API until the continuation token runs out
while True:
    result = client.get(uri)
    json_results = json.loads(result.content)

    for i in json_results['itemEntities']:
        all_items.append(i)

    ct = json_results['continuationToken']
    if ct is None:
        break
    uri = f"v1/admin/items?continuationToken={ct}"

# Turn each JSON dict into a Spark Row and save the result as a Delta table
rows = [Row(**json_dict) for json_dict in all_items]
df = spark.createDataFrame(rows)
display(df)
df.write.format('delta').mode('overwrite').saveAsTable("reports")
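The same approach should carry over to the Monday.com call in your script. Here's a rough sketch (it reuses url, headers, and query from the script above, and flattens the nested account object by hand since its fields are known from the GraphQL query):

import requests
from pyspark.sql import Row

# url, headers, and query are the same as in the original script
response = requests.post(url, json={'query': query}, headers=headers)
response.raise_for_status()
users = response.json()['data']['users']

# Flatten the nested account object into top-level columns
rows = [
    Row(
        created_at=u['created_at'],
        email=u['email'],
        account_name=u['account']['name'],
        account_id=u['account']['id'],
    )
    for u in users
]

df = spark.createDataFrame(rows)
df.write.format('delta').mode('overwrite').saveAsTable("monday.Users")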