I have created a DLT metadata-driven notebook which reads data from configuration tables and processes the data from ADLS Gen2 into DLT tables. I am able to parameterize all the other options such as target, source, sequence_by, etc., but I am running into an issue when parameterizing the keys option. When the logic executes and the parameter value referring to 'pkey' is passed as a string, Spark splits it into individual characters and tries to pass each character as a column name, and I receive:

[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `[` cannot be resolved. Did you mean one of the following? [`Name`, `pkey`, `Value`, `dq_check`, `Description`]. SQLSTATE: 42703;
'Project ['[, '", 'p, 'k, 'e, 'y, '", ']]
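For context, this per-character behaviour is what you would expect if a plain Python string reaches an argument that gets iterated as a list of column names, because iterating a str yields its individual characters. A minimal, DLT-free sketch of that assumption:

# A str is an iterable of characters, so code expecting a list of column
# names will see one "column" per character of the configured value.
stored_value = '["pkey"]'     # what the config table returns (a plain string)
expected_value = ["pkey"]     # what apply_changes needs (a list of column names)

print(list(stored_value))     # ['[', '"', 'p', 'k', 'e', 'y', '"', ']']
print(list(expected_value))   # ['pkey']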
---SQL script for config tables
DROP TABLE IF EXISTS XyzBank.MetaData.SourceMetaData;
CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceMetaData
(
SourceMetaDataId BIGINT GENERATED ALWAYS AS IDENTITY
,SourceFileName STRING
,SourceFilePath STRING
,SourceFileFormat STRING
,SourceActive STRING
,SourceDelimeter STRING
,SourceHeader STRING
,SourcePartitionColumn STRING
,SourceDataQuality STRING
,SourceTransformation STRING
,SourceTransformFunc STRING
,SourceFileOptions STRING
,SourceTableName STRING
,SourceSchemaName STRING
,ScdExceptColumnList STRING
,ScdType INT
,SequenceBy STRING
,Keys STRING
,SourceGroupId INT
,CreatedOn TIMESTAMP
,CreatedBy STRING
,ModifiedOn TIMESTAMP
,ModifiedBy STRING
);
INSERT INTO XyzBank.MetaData.SourceMetaData (SourceFilePath,SourceFileFormat,SourceActive,SourceDelimeter,SourceHeader,SourceDataQuality,SourceFileOptions,SourceTableName,ScdType,SequenceBy,Keys,SourceGroupId,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
SELECT 'abfss://abc@xyz.dfs.core.windows.net/DLT/', 'csv', 'True', ',', 'True', '{"validate Description":"(Description is NOT NULL)","validate Name":"(Name is NOT NULL)"}', '{"cloudFiles.format":"csv","header":True}', 'Product', 2,'file_process_date','["pkey"]',1, current_timestamp(), 'ABC', current_timestamp(), 'ABC';
DROP TABLE IF EXISTS XyzBank.MetaData.SourceSchemaConfig;
CREATE TABLE IF NOT EXISTS XyzBank.MetaData.SourceSchemaConfig
(
SourceSchemaConfigId BIGINT GENERATED ALWAYS AS IDENTITY
,SourceMetaDataId INT
,ColumnName STRING
,ColumnDataType STRING
,ColumnOrder INT
,IsNullable STRING
,IsSensitive STRING
,CreatedOn TIMESTAMP
,CreatedBy STRING
,ModifiedOn TIMESTAMP
,ModifiedBy STRING
);
INSERT INTO XyzBank.MetaData.SourceSchemaConfig (SourceMetaDataId,ColumnName,ColumnDataType,ColumnOrder,IsNullable,IsSensitive,CreatedOn,CreatedBy,ModifiedOn,ModifiedBy)
SELECT 1, 'pkey', 'IntegerType', 1, 'False', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
SELECT 1, 'Description', 'StringType', 2, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
SELECT 1, 'Name', 'StringType', 3, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC' UNION
SELECT 1, 'Value', 'StringType', 4, 'True', 'No', current_timestamp(), 'ABC', current_timestamp(), 'ABC'
------end of SQL script
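As a quick sanity check (optional), reading the row back shows that Keys is stored as a plain string rather than an array; the snippet below just inspects the value as the notebook will later see it:

# The Keys column comes back as a str such as '["pkey"]', not as a list.
check = spark.sql("SELECT Keys FROM XyzBank.MetaData.SourceMetaData WHERE SourceGroupId = 1").first()
print(check['Keys'], type(check['Keys']))   # -> ["pkey"] <class 'str'>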
#pyspark script
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql import Row
import json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType, DateType, DoubleType
df = spark.sql("SELECT * FROM xyzbank.metadata.sourcemetadata WHERE SourceGroupId = 1")
for row in df.collect():
    #print(row['SourceMetaDataId'])
    schema_query = f"SELECT * FROM xyzbank.metadata.sourceschemaconfig WHERE SourceMetaDataId = {row['SourceMetaDataId']}"
    #print(schema_query)
    df_schema = spark.sql(schema_query)
    #display(df_schema)
    data_type_mapping = {
        "StringType": StringType(),
        "IntegerType": IntegerType(),
        "TimeType": TimestampType(),
        "Datetime": DateType(),
        "DoubleType": DoubleType(),
        "DateType": DateType()
    }

    # Collect distinct values of "ColumnDataType", "ColumnName" and "ColumnOrder"
    distinct_datatypes = (
        df_schema.select("ColumnDataType", "ColumnName", "ColumnOrder").distinct().collect()
    )
    # Sort distinct_datatypes based on "ColumnOrder"
    distinct_datatypes = sorted(distinct_datatypes, key=lambda x: x.ColumnOrder)
    # Create schema fields
    schema_fields = [
        StructField(row.ColumnName, data_type_mapping[row.ColumnDataType], True)
        for row in distinct_datatypes
    ]
    # Create and return the schema
    schema = StructType(schema_fields)
    display(row)
    #dlt_ingestion_metdata_function(row=row, schema=schema)

    table_name = row['SourceTableName']
    checks = row['SourceDataQuality']
    checks = json.loads(checks)
    keys = row['Keys']
    display(keys)
    #keys = ["pkey"]
    print(keys)
    sequence_by = row['SequenceBy']
    display(sequence_by)
    file_path = row['SourceFilePath']
    cloud_file_options = eval(row['SourceFileOptions'])
    dq_rules = "({0})".format("AND".join(checks.values()))

    @dlt.table(
        name = "brz_load_"+table_name
    )
    def bronze_load():
        df3 = spark.readStream.format("cloudFiles").options(**cloud_file_options).schema(schema).load(file_path)
        df3 = df3.withColumn("file_process_date", F.current_timestamp())
        return df3

    @dlt.table(
        name = "stag_silver_load_"+table_name
    )
    @dlt.expect_all(checks)
    def stag_silver_table():
        df3 = dlt.readStream("brz_load_"+table_name)
        df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=true")
        return df3

    dlt.create_streaming_table(
        name = "silver_load_"+table_name
    )

    dlt.apply_changes(
        target = "silver_load_"+table_name,
        source = "stag_silver_load_"+table_name,
        keys=keys,
        stored_as_scd_type=2,
        sequence_by=sequence_by
    )

    @dlt.table(
        name = "quarantine_silver_load_"+table_name
    )
    @dlt.expect_all(checks)
    def quarantine_silver_table():
        df3 = dlt.readStream("brz_load_"+table_name)
        df3 = df3.withColumn("dq_check", F.expr(dq_rules)).filter("dq_check=false")
        return df3
#end of pyspark script
The value you are inserting into the Keys column of XyzBank.MetaData.SourceMetaData is the string '["pkey"]', so convert it to an array (a Python list) of strings before passing it to apply_changes.

Use the code below to convert it:

import json
keys = json.loads(row['Keys'])

If the input is always a JSON-style string such as '["pkey"]', the code above gives you the list of strings directly. If the format of the input string can vary, convert it accordingly so that apply_changes always receives a list of strings containing the key column(s).
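If the stored value might not always be a JSON array (for example a bare 'pkey' or a comma-separated 'pkey,sub_key'), a small normalising helper along these lines would cover both shapes; parse_keys is only an illustrative name, not part of the DLT API:

import json

def parse_keys(raw):
    # Normalise the configured Keys value to a list of column names.
    # Handles a JSON array string ('["pkey"]'), a single column ('pkey'),
    # or a comma-separated list ('pkey,sub_key').
    if raw is None:
        return []
    raw = raw.strip()
    try:
        parsed = json.loads(raw)
        if isinstance(parsed, list):
            return [str(k) for k in parsed]
    except ValueError:
        pass
    return [k.strip() for k in raw.split(",") if k.strip()]

keys = parse_keys(row['Keys'])   # e.g. ['pkey'], ready for dlt.apply_changes(..., keys=keys, ...)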