python, microsoft-fabric

Fabric Notebook Unauthorized error when reading parquet file


I am relatively new to writing Python and to using Fabric at our company. I have created a fairly simple script that writes data to a parquet file, and I then want to take that parquet file and write it to a Delta table. That is where I am running into trouble.

Below is my script. It writes the parquet file just fine but errors out when I try to load it into the Delta table.

My initial attempt was to do everything in one cell, but I get errors when it reaches the step that writes the parquet file to the Delta table. I have tried several different configurations, but none of them work; even one that works in another script doesn't work here.

import subprocess
import json
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import SparkSession

access_token = "TOKEN"

# Define your API endpoint and headers
url = "https://api.monday.com/v2"
headers = {
    "Authorization": access_token,
    "Content-Type": "application/json"
}

# Define your GraphQL query
query = """
query {
  users {
    created_at
    email
    account {
      name
      id
    }
  }
}
"""

# Send the request
response = requests.post(url, json={'query': query}, headers=headers)

# Check for errors
if response.status_code == 200:
    data = response.json()
    # print(data)
    users = data['data']['users']

    # Convert the data to a DataFrame
    df = pd.json_normalize(users)

    # Write the DataFrame to a Parquet file
    # df.to_parquet('users_data.parquet', engine='pyarrow')
    # print("Data written to users_data.parquet")
    parquet_table_name = "MondayUsers.parquet"
    parquet_file_path = "abfss://Files/Monday/"

    df.to_parquet(parquet_file_path + parquet_table_name, engine='pyarrow', index=False)
    print("Data written to Parquet file")

    # Initialize Spark session with Delta Lake configurations
    spark = SparkSession.builder \
        .appName("FabricNotebook") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    # Define the path to your Parquet file
    parquet_file_path = parquet_file_path + parquet_table_name

    # Read the Parquet file into a Spark DataFrame
    try:
        #load parquet file to data lake table
        from pyspark.sql import SparkSession

        # Initialize Spark session
        spark = SparkSession.builder.appName("FabricNotebook").getOrCreate()

        # Define the path to your Parquet file
 

        # Read the Parquet file into a Spark DataFrame
        df = spark.read.parquet(parquet_file_path)

        # Define the Delta table name
        delta_table_name = "monday.Users"

        # Write the Spark DataFrame to the Delta table
        df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable(delta_table_name)



    except Exception as e:
        print(f"Error reading Parquet file or writing to Delta table: {e}")

else:
    print(f"Query failed with status code {response.status_code}")

 

The error this receives is:
Error reading Parquet file or writing to Delta table: An error occurred while calling o6842.toString. Trace: java.lang.IllegalArgumentException: object is not an instance of declaring class
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

I then thought I'd split this out into a separate cell and just read the parquet file and write it to a Delta table. But then I get "unauthorized" with this:

import pandas as pd

parquet_table_name = "MondayUsers.parquet"
parquet_file_path = "abfss://Files/Monday/"

parquet_file = parquet_file_path + parquet_table_name

parquet_df = pd.read_parquet(parquet_file)

spark_df = spark.createDataFrame(parquet_df)

print(spark_df)

# Define the Delta table name
delta_table_name = "monday.Users"

spark_df.write.format("delta").mode("overwrite").saveAsTable(delta_table_name)

Solution

  • From pandas (or other driver-node Python code) you write to the default lakehouse through the local file mount. So instead of

    parquet_file_path = "abfss://Files/Monday/"

    use

    parquet_file_path = "/lakehouse/default/Files/Monday/"

    But you can skip pandas entirely and create Spark Row objects from the JSON returned by the API. Here's a sample that calls the Fabric Admin API and saves the results directly to a Delta table:

    import json
    import sempy.fabric as fabric
    from pyspark.sql import Row

    client = fabric.FabricRestClient()

    # page through the Admin API, following the continuation token
    uri = "v1/admin/items?type=Report"

    all_items = []
    while True:
        result = client.get(uri)
        json_results = json.loads(result.content)
        for i in json_results['itemEntities']:
            all_items.append(i)

        ct = json_results['continuationToken']
        if ct is None:
            break
        uri = f"v1/admin/items?continuationToken={ct}"

    # build one Row per item and save the whole set as a Delta table
    rows = [Row(**json_dict) for json_dict in all_items]
    df = spark.createDataFrame(rows)
    display(df)

    df.write.format('delta').mode('overwrite').saveAsTable("reports")
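
    The same Row-based approach would work for the Monday.com response in the question. A rough sketch, where data is the parsed JSON from the requests.post call above and the flattening of the nested account object into two columns is my assumption:

    from pyspark.sql import Row

    # build one Row per user, flattening the nested account object from the GraphQL response
    rows = [
        Row(created_at=u["created_at"],
            email=u["email"],
            account_name=u["account"]["name"],
            account_id=u["account"]["id"])
        for u in data["data"]["users"]
    ]

    monday_df = spark.createDataFrame(rows)
    monday_df.write.format("delta").mode("overwrite").saveAsTable("monday.Users")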