apache-spark, azure-synapse, azure-notebooks

Creating a blank Delta table in Azure Synapse Analytics with an identity column


I'm new to Azure Synapse Analytics (ASA) and am trying to create a blank Delta table with an identity column that auto-increments by 1. Is there any way to do this without using a dedicated SQL pool?

I tried using T-SQL syntax, but it seems there are some limitations.


Solution

  • You can create an empty Delta table and simulate an identity column using the code below in a Synapse notebook; the auto-incrementing Id values are generated with row_number() at insert time:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    
    # Create a Spark session
    spark = SparkSession.builder.appName("BlankDeltaTableWithIdentity").getOrCreate()
    
    # Define the schema; "Id" is the identity column, so it is not nullable
    schema = StructType([
        StructField("Id", IntegerType(), False),
        StructField("Name", StringType(), True)  # Set nullable to True for other columns
    ])
    
    # Create an empty DataFrame with this schema; the identity values
    # are generated at insert time (see the insert code further below)
    df = spark.createDataFrame([], schema)
    
    # Write the empty DataFrame as a Delta table
    df.write.format("delta").mode("overwrite").save("<deltaTablePath>")
    

    This will create an empty Delta table at the specified path, as shown below:

    (Screenshots: Delta table and Delta table details)
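
    If you want to sanity-check the empty table before inserting anything, a quick read-back like the sketch below (reusing the same Spark session and the <deltaTablePath> placeholder) should show the two-column schema and a row count of 0:

    # Confirm the table exists with the expected schema and no rows yet
    created_df = spark.read.format("delta").load("<deltaTablePath>")
    created_df.printSchema()
    print(created_df.count())  # Expected: 0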

    You can use the code below to insert sample data into the created Delta table:

    from pyspark.sql import SparkSession
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F
    
    # Create a Spark session
    spark = SparkSession.builder.appName("InsertIntoDeltaTable").getOrCreate()
    
    # Create a DataFrame with the values you want to insert (excluding the "Id" column)
    data = [("AA",), ("BB",), ("CC",)]
    columns = ["Name"]
    insert_df = spark.createDataFrame(data, columns)
    
    # Load the existing Delta table
    delta_table_path = "<deltaTablePath>"
    existing_df = spark.read.format("delta").load(delta_table_path)
    
    # Find the highest existing Id (0 if the table is empty)
    max_id = existing_df.select(F.max("Id")).collect()[0][0] or 0
    
    # Generate sequential Ids starting from the next available value
    window_spec = Window.orderBy(F.monotonically_increasing_id())
    insert_df = insert_df.withColumn("Id", F.row_number().over(window_spec) + max_id)
    
    # Append the new rows to the Delta table, matching the table's column order
    insert_df.select("Id", "Name").write.format("delta").mode("append").save(delta_table_path)
    

    This will insert the data into the Delta table, as shown below:

    (Screenshots: Inserted data and Inserted data details)
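
    To verify the inserted rows without relying on the screenshots, you can read the table back and sort by the identity column. Assuming the insert above ran against the initially empty table, the Ids should come out as 1 through 3:

    # Read the table back and confirm the generated identity values
    result_df = spark.read.format("delta").load("<deltaTablePath>")
    result_df.orderBy("Id").show()
    # Expected:
    # +---+----+
    # | Id|Name|
    # +---+----+
    # |  1|  AA|
    # |  2|  BB|
    # |  3|  CC|
    # +---+----+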
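
    Since every append repeats the max-Id lookup, it can be convenient to wrap the pattern in a small helper. The sketch below is just one way to do that (append_with_identity is a hypothetical name, not part of any library); note that reading the max Id first is not safe if two writers append concurrently:

    from pyspark.sql import DataFrame
    from pyspark.sql.window import Window
    import pyspark.sql.functions as F
    
    def append_with_identity(new_rows: DataFrame, path: str, id_col: str = "Id") -> None:
        """Append new_rows to the Delta table at path, generating sequential Ids.
    
        Hypothetical helper wrapping the row_number() pattern shown above;
        not safe for concurrent writers, since the max Id is read first.
        """
        existing = spark.read.format("delta").load(path)
        max_id = existing.select(F.max(id_col)).collect()[0][0] or 0
        window_spec = Window.orderBy(F.monotonically_increasing_id())
        with_ids = new_rows.withColumn(id_col, F.row_number().over(window_spec) + max_id)
        with_ids.write.format("delta").mode("append").save(path)
    
    # Usage: appends Ids 4 and 5 if run after the insert above
    append_with_identity(spark.createDataFrame([("DD",), ("EE",)], ["Name"]), "<deltaTablePath>")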