python-3.x, databricks, delta-live-tables

Custom logging in Databricks delta live tables, dlt


I am using DLT (Delta Live Tables) with Python in one of our ETL pipelines, where Kafka topics are processed by the Delta Live Tables. Since we can't print any log/status messages while a DLT pipeline is running, I tried custom logging using the logging library. It doesn't throw any errors, but we can't see the logs either. I would appreciate any pointers on implementing custom logging in DLT pipelines. I tried the following, but no logs are displayed in the pipeline console.

import json
import dlt
import time
from pyspark.sql.functions import from_json, to_json, col, lit, coalesce
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import date_format
import logging
from datetime import datetime

logger = logging.getLogger("raw_zone")
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger.info('Processing of landing to Raw layer has started {0}'.format(datetime.now()))

I am aware of the audit logs where we can see log entries, but I would like to know whether it is possible to view custom log messages in the pipeline console, where there is a section that displays system logs.

My pipeline has been running for a couple of days, and I would like to find out which cell is causing the delay. Without custom logging it is very difficult to troubleshoot DLT pipelines.


Solution

  • Ok, I would like to share the solution I built myself to have a custom log stored in an ADLS location, so it may be a useful reference to start with. I created a separate notebook, say a utility notebook, in which I configured everything log-related such as the local path, mount path and ADLS path, the log configuration, etc.

    @Utils notebook:
    ----------------
    from datetime import datetime
    from pyspark.sql.functions import to_date, upper, lit
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    import functools
    import logging
    
    # Timestamped log file name, daily partition folder and mount point for the logs
    currdate = datetime.now().strftime('%Y%m%d_%H%M%S')
    file_log = f'my_pipeline_{currdate}.log'
    log_partitions = datetime.now().strftime('%Y-%m-%d')
    mnt_path = 'my-pipeline-logs'
    
    # Local staging folder on the driver; the log file is written here first
    # and then copied to the mounted ADLS path by writelog()
    local_log_folder = "/tmp"
    dbutils.fs.mkdirs(f"{local_log_folder}")
    
    local_log_file_path = f"file:{local_log_folder}/{file_log}"
    mnt_log_file_path = f"dbfs:/mnt/{mnt_path}/{log_partitions}/{file_log}"
    
    class Utils:
        # Remove any handlers that may already be attached to the root logger;
        # otherwise logging.basicConfig() below would be ignored.
        for handler in logging.root.handlers[:]:
            logging.root.removeHandler(handler)
    
        # Write all log records to a local file on the driver; writelog() then
        # copies that file to the mounted ADLS location after every message.
        logging.basicConfig(
            filename=f"{local_log_folder}/{file_log}",
            filemode='a',
            format='%(asctime)s | %(levelname)s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S UTC (%z)',
            level=logging.INFO
        )
    
        @staticmethod
        def writelog(notebook_name, message):
            logging.info(f"{notebook_name.upper()}: {message}")
            # Partition the log by date and prefix the file with the notebook name
            # (notebook_name[2:] drops the first two characters of the notebook name)
            mnt_log_file_path = f"dbfs:/mnt/{mnt_path}/{log_partitions}/{notebook_name[2:].upper()}_{file_log}"
            dbutils.fs.cp(local_log_file_path, mnt_log_file_path)
    

    I created this as a separate Utils notebook because I want it to be reused across notebooks, but the log should be partitioned and named after the respective notebook. That is why the notebook name is passed to the writelog() function.
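
    For illustration, assuming a hypothetical notebook named 01_raw_ingest and a run on 2025-01-15, the copied log ends up under a path built like this (the notebook name and timestamp are made up):

    notebook_name = "01_raw_ingest"                  # hypothetical notebook name
    file_log = "my_pipeline_20250115_093000.log"     # as produced by the Utils notebook
    log_partitions = "2025-01-15"
    mnt_path = "my-pipeline-logs"
    
    print(f"dbfs:/mnt/{mnt_path}/{log_partitions}/{notebook_name[2:].upper()}_{file_log}")
    # dbfs:/mnt/my-pipeline-logs/2025-01-15/_RAW_INGEST_my_pipeline_20250115_093000.log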

    In the calling notebook, we can use the magic command to run this Utils notebook, e.g. %run ../utilities/utils.

    To log a message, i.e. write to our custom log, use writelog():

    Utils.writelog(notebook_name, "Helloworld !!!") 
    

    To get the notebook name, you can use the following code.

    notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split('/')[-1]
    

    If everything goes well, we should see the log created in the ADLS location, as in the sample screenshot: [screenshot of the generated log file in ADLS]
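
    For completeness, here is a minimal sketch of what the calling notebook could look like end to end (the %run path and the log messages are placeholders; adapt them to your pipeline):

    # Cell 1: the %run magic must be in a cell of its own
    %run ../utilities/utils
    
    # Cell 2: derive the current notebook's name and write custom log entries
    notebook_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get().split('/')[-1]
    
    Utils.writelog(notebook_name, "Landing-to-raw processing started")
    # ... dlt table definitions / transformations go here ...
    Utils.writelog(notebook_name, "Raw layer table definitions completed")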