pythonpysparkjupyter-notebook

Fabric PySpark Notebook Parsing JSON string with double quote escape chars


I am working in a PySpark notebook that gets its input parameter as a JSON String from a Pipeline and the notebook need to process the string further. Below is the example string that the notebook gets as input and example code that try to process the string and fails because the Details section has text enclosed with " ("Internal Server Error") and I am not sure how do I parse these unescaped " chars inside the message.

I am able to parse \n \t and \r but stuck with \". I can't even directly do replace on the entire file as the element names has to be enclosed with ".

Please suggest how can I parse these types of messages that are having text enclosed with ".

Here is the example JSON string that the notebook received and plan to perse.

#Input String Received from Pipeline
var_str_log_data = "[{\"ApplicationName\": \"APP\",\"WorkspaceId\": \"0addb382-fa0d-4ce1-9c3c-95e32b957955\",\"Environment\": \"DEV\",\"Level\": \"ERROR\",\"Severity\": \"SEV 4\",\"Component\": \"PL_APP_Ingest\",\"Operation\": \"ETL\",\"Run_Id\": \"88276e4f-4e60-47ec-a6d6-dd9e475d2c3a\",\"SessionId\": \"\",\"Message\": \"[DEV] - APP - Pipeline Execution Failed - 20250329\",\"Status\": \"Success\",\"Details\": \"Pipeline Name: PL_APP_Ingest\\nRun Id: 88257e4f-4e60-34ec-a6d6-dd9e475d2c3a\\nError Message: Notebook execution failed at Notebook service with http status code - '200', please check the Run logs on Notebook, additional details - 'Error name - Py4JJavaError, Error value - An error occurred while calling o4628.load.\n: java.util.concurrent.ExecutionException: Operation failed: \"Internal Server Error\", 500, HEAD, http://msit-onelake.dfs.fabric.microsoft.com/MYWORKSPACE/dataverse_name_cds2_workspace_unq60feaf930cbf236eb2519b27f2d6b.Lakehouse/Tables/app_riskparcel/_delta_log?upn=false&action=getStatus&timeout=90\n  at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)\n    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)\n at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)\n  at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)\n  ... 37 more\n' : \\nExecution URL: https://msit.powerbi.com/workloads/data-pipeline/artifacts/workspaces/0addb122-fa0d-4ce1-9c3c-95e25b957955/pipelines/PL_APP_Ingest/88223e4f-4e60-47ec-a6d6-dd9e475d2c3a?experience=power-bi\\nApp Name: APP\",\"CorrelationId\": \"\",\"User\": \"System\"}]"

#Example Code Try to Process the JSON provided a String

var_str_log_data = re.sub(r'(?<!\\)\n', r'\\n', var_str_log_data)
var_str_log_data = re.sub(r'(?<!\\)\t', r'\\t', var_str_log_data)
var_str_log_data = re.sub(r'(?<!\\)\r', r'\\r', var_str_log_data)

print(f"Log Data Format and Content: {type(var_str_log_data)} - {var_str_log_data}.")

try:
    data = json.loads(var_str_log_data)
    print("JSON loaded successfully")
except json.JSONDecodeError as e:
    error_pos = e.pos
    print("Invalid JSON:", e)
    print(f"Character at position {error_pos}: {repr(var_str_log_data[error_pos-1])}")
    start = max(0, error_pos - 20)
    end = min(len(var_str_log_data), error_pos + 20)
    print("Surrounding text:", repr(var_str_log_data[start:end]))

Thanks, Prabhat


Solution

  • It seems the only problem is with " and \n (eventually \r, \t) inside Details

    Rest is OK. String like \"ApplicationName\" is correct because it is inside " "

    So I use Details": " and "," (as end of details) to find substring which makes problem.
    I hope Details will never have "," inside string.

    ( I uses variable text only because it was simpler to write it again and again in code :) )

    text = var_str_log_data
    
    start = text.find('Details": "') + len('Details": "')
    end   = text.find('","', start)
    
    #print(f'start: {start:5} |', text[start:start+5])
    #print(f'end  : {  end:5} |', text[end-5:end])
    
    middle = text[start:end]
    

    and later I change only in middle.

    I found that using .replace() is simpler for me than encode()/decode() and regex

    middle = middle.replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
    

    And later I replace middle in original string

    text = text[:start] + middle + text[end:]
    

    Full working code which I used for tests.

    import json
    
    var_str_log_data = "[{\"ApplicationName\": \"APP\",\"WorkspaceId\": \"0addb382-fa0d-4ce1-9c3c-95e32b957955\",\"Environment\": \"DEV\",\"Level\": \"ERROR\",\"Severity\": \"SEV 4\",\"Component\": \"PL_APP_Ingest\",\"Operation\": \"ETL\",\"Run_Id\": \"88276e4f-4e60-47ec-a6d6-dd9e475d2c3a\",\"SessionId\": \"\",\"Message\": \"[DEV] - APP - Pipeline Execution Failed - 20250329\",\"Status\": \"Success\",\"Details\": \"Pipeline Name: PL_APP_Ingest\\nRun Id: 88257e4f-4e60-34ec-a6d6-dd9e475d2c3a\\nError Message: Notebook execution failed at Notebook service with http status code - '200', please check the Run logs on Notebook, additional details - 'Error name - Py4JJavaError, Error value - An error occurred while calling o4628.load.\n: java.util.concurrent.ExecutionException: Operation failed: \"Internal Server Error\", 500, HEAD, http://msit-onelake.dfs.fabric.microsoft.com/MYWORKSPACE/dataverse_name_cds2_workspace_unq60feaf930cbf236eb2519b27f2d6b.Lakehouse/Tables/app_riskparcel/_delta_log?upn=false&action=getStatus&timeout=90\n  at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)\n    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)\n at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)\n  at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)\n  ... 37 more\n' : \\nExecution URL: https://msit.powerbi.com/workloads/data-pipeline/artifacts/workspaces/0addb122-fa0d-4ce1-9c3c-95e25b957955/pipelines/PL_APP_Ingest/88223e4f-4e60-47ec-a6d6-dd9e475d2c3a?experience=power-bi\\nApp Name: APP\",\"CorrelationId\": \"\",\"User\": \"System\"}]"
    
    text = var_str_log_data
    
    start = text.find('Details": "') + len('Details": "')
    end   = text.find('","', start)
    
    print(f'start: {start:5} |', text[start:start+5])
    print(f'end  : {  end:5} |', text[end-5:end])
    middle = text[start:end]
    print('--- middle ---')
    print(middle)
    
    middle = middle.replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
    print('--- middle ---')
    print(middle)
    
    text = text[:start] + middle + text[end:]
    #print(text)
    
    data = json.loads(text)
    
    print('--- ApplicationName ---')
    print(data[0]['ApplicationName'])
    
    print('--- Details ---')
    print(data[0]['Details'])
    

    EDIT:

    For example from comment I had to change code - move .replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t') to text and add .replace("'", '"').replace('", "', '","') to replace ' with " and remove space in ", " to correctly find end of Details.

    And it works for both examples.

    import json
    
    # var_str_log_data = "[{\"ApplicationName\": \"APP\",\"WorkspaceId\": \"0addb382-fa0d-4ce1-9c3c-95e32b957955\",\"Environment\": \"DEV\",\"Level\": \"ERROR\",\"Severity\": \"SEV 4\",\"Component\": \"PL_APP_Ingest\",\"Operation\": \"ETL\",\"Run_Id\": \"88276e4f-4e60-47ec-a6d6-dd9e475d2c3a\",\"SessionId\": \"\",\"Message\": \"[DEV] - APP - Pipeline Execution Failed - 20250329\",\"Status\": \"Success\",\"Details\": \"Pipeline Name: PL_APP_Ingest\\nRun Id: 88257e4f-4e60-34ec-a6d6-dd9e475d2c3a\\nError Message: Notebook execution failed at Notebook service with http status code - '200', please check the Run logs on Notebook, additional details - 'Error name - Py4JJavaError, Error value - An error occurred while calling o4628.load.\n: java.util.concurrent.ExecutionException: Operation failed: \"Internal Server Error\", 500, HEAD, http://msit-onelake.dfs.fabric.microsoft.com/MYWORKSPACE/dataverse_name_cds2_workspace_unq60feaf930cbf236eb2519b27f2d6b.Lakehouse/Tables/app_riskparcel/_delta_log?upn=false&action=getStatus&timeout=90\n  at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)\n    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)\n at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)\n  at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)\n  ... 37 more\n' : \\nExecution URL: https://msit.powerbi.com/workloads/data-pipeline/artifacts/workspaces/0addb122-fa0d-4ce1-9c3c-95e25b957955/pipelines/PL_APP_Ingest/88223e4f-4e60-47ec-a6d6-dd9e475d2c3a?experience=power-bi\\nApp Name: APP\",\"CorrelationId\": \"\",\"User\": \"System\"}]"
    
    var_str_log_data = "[{'ApplicationName': 'APP', 'WorkspaceId': 'afc5f4c7-7e26-4654-803c-d82f74108b79', 'Environment': 'DEV', 'Level': 'ERROR', 'Severity': 'SEV 4', 'Component': 'PL_Demo_Pipeline', 'Operation': 'ETL', 'Run_Id': 'f3d4ea03-ecdc-4003-8229-6ed7a88721a9', 'SessionId': '', 'Message': '[DEV] - APP - Pipeline Execution Failed - 20250331 - 182428 (Ignore) \nStatus: Success.', 'Status': 'Success', 'Details': 'Log generated from App while manually triggering. Ignore this entry. \nThis is new line message added to the log with a TAB \tinserted here.', 'CorrelationId': '', 'User': 'data_engineer_1'}]"
    
    text = var_str_log_data.replace("'", '"').replace('", "', '","').replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')
    
    print(text)
    
    start = text.find('Details": "') + len('Details": "')
    end   = text.find('","', start)
    
    print(f'start: {start:5} |', text[start:start+5])
    print(f'end  : {  end:5} |', text[end-5:end])
    middle = text[start:end]
    print('--- middle ---')
    print(middle)
    
    middle = middle.replace('"', '\\"')
    print('--- middle ---')
    print(middle)
    
    text = text[:start] + middle + text[end:]
    print('--- text ---')
    print(text)
    
    data = json.loads(text)
    
    print('--- ApplicationName ---')
    print(data[0]['ApplicationName'])
    
    print('--- Details ---')
    print(data[0]['Details'])