I am using the Python dlt module in one of our ETL pipelines, with Kafka topics processed using Delta Live Tables. Since we couldn't print any log/status messages while the DLT pipelines were running, I tried custom logging with the Python logging library. It doesn't throw any errors, but we still can't see the logs. I would appreciate any pointers on implementing custom logging in DLT pipelines. I tried the following, but no logs are displayed in the pipeline console:
import json
import dlt
import time
import logging
from datetime import datetime
from pyspark.sql.functions import from_json, to_json, col, lit, coalesce, date_format
from pyspark.sql.types import StructType, StructField, StringType, LongType

logger = logging.getLogger("raw_zone")
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger.info('Processing of landing to Raw layer has started {0}'.format(datetime.now()))
I'm aware of the audit logs where logs can be viewed, but I'd like to know whether it's possible to see them in the pipeline console, where there is a section that displays system logs.
My pipeline has been running for a couple of days, and I would like to investigate which cell is causing the time delay. Without custom logging it is very difficult to troubleshoot DLT pipelines.
OK, I would like to share the solution I built myself to store a custom log in an ADLS location; it may be a useful reference to start with. I created a separate notebook, say a utility notebook, in which I configured all the log-related info such as the local path, mount path and ADLS path, the log configuration, etc.
@Utils notebook:
----------------
from datetime import datetime
from pyspark.sql.functions import to_date, upper, lit
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import functools
import logging

# log file name and daily partition folder, based on the current timestamp
currdate = datetime.now().strftime('%Y%m%d_%H%M%S')
file_log = f'my_pipeline_{currdate}.log'
log_partitions = datetime.now().strftime('%Y-%m-%d')
mnt_path = 'my-pipeline-logs'

# vars: the log is first written to a local path on the driver,
# then copied to the mounted ADLS location
local_log_folder = "/tmp"
dbutils.fs.mkdirs(f"{local_log_folder}")
local_log_file_path = f"file:{local_log_folder}/{file_log}"
mnt_log_file_path = f"dbfs:/mnt/{mnt_path}/{log_partitions}/{file_log}"

class Utils:
    # setup logging: drop any existing root handlers, then log to the local file
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    logging.basicConfig(
        filename=f"{local_log_folder}/{file_log}",
        filemode='a',
        format='%(asctime)s | %(levelname)s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S UTC (%z)',
        level=logging.INFO
    )

    @staticmethod
    def writelog(notebook_name, message):
        # append the message to the local log file, then copy the file to the mount,
        # prefixed with the calling notebook's name
        logging.info(f"{notebook_name.upper()}: {message}")
        mnt_log_file_path = f"dbfs:/mnt/{mnt_path}/{log_partitions}/{notebook_name[2:].upper()}_{file_log}"
        dbutils.fs.cp(local_log_file_path, mnt_log_file_path)
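Note that this assumes the ADLS container is already mounted at /mnt/my-pipeline-logs. If it isn't, a minimal mount sketch could look like the snippet below; the storage account, container, secret scope and service principal values are placeholders, not values from the pipeline above.
# Hypothetical example: mount an ADLS Gen2 container so dbfs:/mnt/my-pipeline-logs resolves.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<application-id>",
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<scope-name>", key="<secret-key>"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token"
}
dbutils.fs.mount(
    source="abfss://<container>@<storage-account>.dfs.core.windows.net/",
    mount_point="/mnt/my-pipeline-logs",
    extra_configs=configs
)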
I created a separate Utils notebook since I want it to be reusable across notebooks, but the log should be partitioned and named with the respective notebook name; that is why I am passing the notebook name to the writelog() function.
In the calling notebook, we can use the magic command to include this Utils notebook, i.e. %run ../utilities/utils
To log a message, i.e. write to our custom log, use writelog():
Utils.writelog(notebook_name, "Helloworld !!!")
To get the notebook name, you can use the following code.
notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split('/')[-1]
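Putting it together, a calling notebook contains something like the two cells below (the relative path to the Utils notebook is just an example, and %run has to sit in its own cell):
# Cell 1: pull the logging utilities into the calling notebook
%run ../utilities/utils

# Cell 2: derive the notebook name and write a log entry
notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split('/')[-1]
Utils.writelog(notebook_name, "Processing of landing to Raw layer has started")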
If everything goes well, we should see the log file created in the ADLS location, as in the sample below:
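As a quick check, you can list the day's partition folder on the mount and preview the newest file with dbutils; the log line shown in the comment is only illustrative, following the format configured in the Utils notebook.
# Illustrative check (run after Utils.writelog has been called at least once)
log_dir = f"dbfs:/mnt/{mnt_path}/{log_partitions}/"
files = dbutils.fs.ls(log_dir)
print(dbutils.fs.head(files[-1].path))
# A log line should follow the configured format, e.g.
# 2024-05-01 09:15:42 UTC (+0000) | INFO | MY_NOTEBOOK: Helloworld !!!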