I am trying to set up a simple CSV ingestion into a Delta table (this is for the initial load). I have it split into 3 notebooks, and the first two steps ran successfully:
Mount the Blob storage
# mount the Blob storage in Databricks
dbutils.fs.mount(
    source="wasbs://CONTAINORXX@databrickstoragetest.blob.core.windows.net/",
    mount_point="/mnt/MOUNTXX",
    extra_configs={"fs.azure.account.key.databrickstoragetest.blob.core.windows.net": "KEYXX"}
)
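(As a side note, `dbutils.fs.mount` raises an error if the mount point already exists, so a guard like the following sketch can make this notebook safe to re-run; the container, key, and mount point names are the placeholders from above.)

# skip the mount if /mnt/MOUNTXX is already mounted, so re-running the notebook doesn't fail
if not any(m.mountPoint == "/mnt/MOUNTXX" for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="wasbs://CONTAINORXX@databrickstoragetest.blob.core.windows.net/",
        mount_point="/mnt/MOUNTXX",
        extra_configs={"fs.azure.account.key.databrickstoragetest.blob.core.windows.net": "KEYXX"}
    )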
Create the schema and Delta table
# create the Delta table
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# create the schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("country", StringType(), True)
])

# create the new, empty table
empty_df = spark.createDataFrame([], schema)
empty_df.write.format("delta").saveAsTable("persontest")
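Because `saveAsTable` decides where the table's data files and its `_delta_log` end up, it can be worth checking that location right after creating the table; if it resolves to a path under the mount root, the wildcard read in the next step will collide with it. A quick check using Delta's standard `DESCRIBE DETAIL`:

# show where the persontest table stores its data and _delta_log
spark.sql("DESCRIBE DETAIL persontest").select("location").show(truncate=False)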
Load the CSV and archive it
# define file paths
input_path = "/mnt/MOUNTXX/*.csv"
archive_path = "/mnt/MOUNTXX/archive"

# read the CSV files
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(input_path)

# load the data into the table
df.write.format("delta").mode("append").saveAsTable("persontest")

# archive the processed files
processed_files = dbutils.fs.ls(input_path)
for file in processed_files:
    if file.path.endswith(".csv"):
        dbutils.fs.mv(file.path, archive_path + "/" + file.name)
I am getting the following error when the third step runs:
[DELTA_INVALID_FORMAT] Incompatible format detected.
A transaction log for Delta was found at `/_delta_log`,
but you are trying to read from `/mnt/MOUNTXX/*.csv` using format("csv").
You must use 'format("delta")' when reading and writing to a delta table.
SQLSTATE: 22000
File <command-2260341893915879>, line 6
      3 archive_path = "/mnt/MOUNTXX/archive"
      5 #read csv
----> 6 df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(input_path)
      8 #load data into table
      9 df.write.format("delta").mode("append").saveAsTable("persontest")

File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:312, in DataFrameReader.load(self, path, format, schema, **options)
    310 self.options(**options)
    311 if isinstance(path, str):
--> 312     return self._df(self._jreader.load(path))
    313 elif path is not None:
    314     if type(path) != list:

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:261, in capture_sql_exception.<locals>.deco(*a, **kw)
    257 converted = convert_exception(e.java_exception)
    258 if not isinstance(converted, UnknownException):
    259     # Hide where the exception came from that shows a non-Pythonic
    260     # JVM exception message.
--> 261     raise converted from None
    262 else:
    263     raise
The input path

input_path = "/mnt/MOUNTXX/*.csv"

overlaps with the Delta transaction log at `/_delta_log`, which is why you are getting this error. You need to move either the Delta log or the CSV files to another directory. The code below moves all of the CSV files into an input folder and then reads from there.
source_path = "/mnt/MOUNTXX/"
input_path = "/mnt/MOUNTXX/input/"

# create the input folder and move the CSV files into it
dbutils.fs.mkdirs(input_path)
files_to_move = dbutils.fs.ls(source_path)
for file in files_to_move:
    if file.path.endswith(".csv"):
        dbutils.fs.mv(file.path, f"{input_path}{file.name}")
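To confirm the move worked before reading, you can list the new folder:

# sanity check: only the moved CSV files should show up here
display(dbutils.fs.ls(input_path))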
After moving them, you can run your code as before (note that `archive_path` has to be defined in this notebook too):

# read the CSV files from the new input folder
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(input_path)

# load the data into the table
df.write.format("delta").mode("append").saveAsTable("persontest")

# archive the processed files
archive_path = "/mnt/MOUNTXX/archive"
processed_files = dbutils.fs.ls(input_path)
for file in processed_files:
    if file.path.endswith(".csv"):
        dbutils.fs.mv(file.path, archive_path + "/" + file.name)
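As an optional hardening step beyond the fix above, you could read with the table's explicit schema instead of `inferSchema`, so a malformed file cannot append mismatched types to the Delta table. A sketch reusing the schema definition from the second notebook (redefined here, since notebooks don't share variables):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# same schema as the persontest table, redefined in this notebook
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("country", StringType(), True)
])

# read with the known schema so the column types always line up with the table
df = (spark.read.format("csv")
      .option("header", "true")
      .schema(schema)
      .load(input_path))
df.write.format("delta").mode("append").saveAsTable("persontest")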