I'm new to Azure Synapse Analytics (ASA) and I am trying to create a blank Delta table with an identity column that auto-increments by 1. Is there any way to do this without using a dedicated SQL pool?
I tried using T-SQL syntax, but it seems like there are some limitations.
You can create an empty Delta table with an identity column using the code below in a Synapse notebook:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# Start (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("BlankDeltaTableWithIdentity").getOrCreate()

# Schema of the blank table: a non-nullable integer "Id" and a nullable "Name".
schema = StructType([
    StructField("Id", IntegerType(), False),
    StructField("Name", StringType(), True),
])

# An empty DataFrame built from the schema is all that is needed here.
# There are no rows yet, so there is nothing to number: computing
# lit(0) / row_number() for "Id" at this point would produce the same empty
# DataFrame with the same column type. "Id" is a plain integer column; the
# sequential values have to be computed by whatever code appends rows later.
df = spark.createDataFrame([], schema)

# Write the empty DataFrame as a Delta table. mode("overwrite") replaces any
# table that already exists at this path.
df.write.format("delta").mode("overwrite").save("<deltaTablePath>")
This will create a Delta table successfully in the required path, as shown below:
You can use the code below to insert sample data into the created Delta table:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Start (or reuse) the Spark session for the insert step.
spark = SparkSession.builder.appName("InsertIntoDeltaTable").getOrCreate()

# Rows to insert. Only "Name" is supplied; "Id" is computed further down.
data = [("AA",), ("BB",), ("CC",)]
columns = ["Name"]
insert_df = spark.createDataFrame(data, columns)

# Read the current contents of the target Delta table.
delta_table_path = "<deltaTablePath>"
existing_df = spark.read.format("delta").load(delta_table_path)

# Largest "Id" already stored. The aggregate yields None for an empty table,
# in which case numbering starts from 0 + 1.
# NOTE(review): the max is read here and the rows are appended in a separate
# step below, so two runs executing at the same time could presumably pick the
# same starting value -- confirm whether concurrent writers are possible.
max_id = existing_df.agg(F.max("Id")).first()[0] or 0

# Number the new rows 1..N in a single global window, then shift the numbers
# so they continue after the existing maximum.
window_spec = Window.orderBy(F.monotonically_increasing_id())
row_position = F.row_number().over(window_spec)
insert_df = insert_df.withColumn("Id", F.lit(max_id) + row_position)

# Append the numbered rows to the Delta table.
delta_writer = insert_df.write.format("delta").mode("append")
delta_writer.save(delta_table_path)
This will insert the data into the Delta table, as shown below: