python kql azure-data-explorer

azure-kusto-data inserts duplicated and corrupted data when the pattern '2_' appears in a field


Using azure-kusto-data@5.0.1, when running an ingest query against either the URI or the Ingest URI, the 'value' column is ingested as null whenever the previous field contains the characters '2_'.

I can run the query in Azure Data Explorer and it works fine, but using this method it ingests null in the 'value' field.

Azure Data Explorer table schema:

{
    "Name": "kpi_table",
    "OrderedColumns": [
        {
            "Name": "server_timestamp",
            "Type": "System.DateTime",
            "CslType": "datetime"
        },
        {
            "Name": "name",
            "Type": "System.String",
            "CslType": "string"
        },
        {
            "Name": "plant_id",
            "Type": "System.String",
            "CslType": "string"
        },
        {
            "Name": "device_key",
            "Type": "System.String",
            "CslType": "string"
        },
        {
            "Name": "value",
            "Type": "System.Double",
            "CslType": "real"
        }
    ]
}

Data to ingest:
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, 0.0201

Data ingested:
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, 0.0201
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, null

The ingest completes successfully, and one second later (according to ingestion_time()) a second ingest occurs with value set to null.
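For reference, a query along these lines shows both copies of the row and their ingestion times (a sketch; it assumes the connection wrapper shown under "Code:" below):

# Sketch: list both copies of the row together with their ingestion time.
# `client` is assumed to be the KustoConnector instance from the code below.
duplicates_query = """
kpi_table
| where device_key == "#ESSSAB2_T010JPK01KKP03"
| extend ingested_at = ingestion_time()
| project server_timestamp, name, plant_id, device_key, value, ingested_at
| order by ingested_at asc
"""
df_dup = client.execute_kql_query(duplicates_query)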

If, in the field corresponding to device_key, the character '2' is replaced with '3', the data ingests correctly. Could the pattern '2_' be interpreted internally as a special character sequence?

Code:

import logging

import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

logger = logging.getLogger(__name__)


class KustoConnector:  # wrapper class (name illustrative)
    def __init__(self, url, database, client_id, client_secret, tenant_id):
        self.database = database
        kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
            url, client_id, client_secret, tenant_id
        )
        self.conn = KustoClient(kcsb)

    def execute_kql_query(self, query: str):
        try:
            logger.debug("Executing KQL query: %s", query)
            response = self.conn.execute(self.database, query)
            if response.get_exceptions():
                raise Exception(response.get_exceptions())
            return dataframe_from_result_table(response.primary_results[0])
        except Exception as error:
            logger.error("Error executing KQL query: %s", error)
            return pd.DataFrame()

The query is a string containing an .ingest inline command, and although I know this is not recommended for production environments, the error also reproduces with streaming ingest.
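For completeness, the ingest string is of roughly this form (a reconstruction for illustration; the exact command is not reproduced here):

# Hypothetical reconstruction of the .ingest inline command string;
# KustoClient.execute() routes dot-commands to the management endpoint.
ingest_query = (
    ".ingest inline into table kpi_table <|\n"
    "2025-03-13T22:06:00Z,kpi_inverter_production,#ESSSAB2,#ESSSAB2_T010JPK01KKP03,0.0201"
)
client.execute_kql_query(ingest_query)  # client = KustoConnector(...) as above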


Solution

  • The 'value' column is ingested as null as soon as the previous field contains the characters '2_'.

    Duplicate records were being added, and that is what produced the null values when '2_' appeared in the device key. The behavior only occurred when ingesting through the azure-kusto-data library, not when running the query in the Azure Data Explorer UI.

    The code below checks Azure Data Explorer for existing data, ingests only the rows that are new (avoiding duplicates), and then verifies whether the '2_' pattern in device_key causes any ingestion issues.

    import os
    import pandas as pd
    import logging
    import tempfile
    from azure.kusto.data import KustoConnectionStringBuilder, KustoClient
    from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor
    from azure.kusto.data.data_format import DataFormat
    from azure.kusto.data.helpers import dataframe_from_result_table
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    logger = logging.getLogger(__name__)
    CLUSTER_URL = "<Your URL>"
    DATABASE = "<Your database Name>"
    TABLE_NAME = "<Your table Name>"
    CLIENT_ID = "<Your clientID>"
    CLIENT_SECRET = "<Your clientSecret>"
    TENANT_ID = "<Your tenantID>"
    if not CLIENT_ID or not CLIENT_SECRET or not TENANT_ID:
        logger.error("Missing Azure credentials.")
        exit(1)
    try:
        kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
            CLUSTER_URL, CLIENT_ID, CLIENT_SECRET, TENANT_ID
        )
        kusto_client = KustoClient(kcsb)
        ingest_client = QueuedIngestClient(kcsb)
        logger.info("Connected to Azure Data Explorer.")
    except Exception as e:
        logger.error(f"Failed to connect to Kusto: {e}")
        exit(1)
    create_table_query = f"""
    .create table {TABLE_NAME} (
        server_timestamp: datetime,
        name: string,
        plant_id: string,
        device_key: string,
        value: real
    )
    """
    try:
        kusto_client.execute_mgmt(DATABASE, create_table_query) 
        logger.info("Table check completed (created if not exists).")
    except Exception as e:
        logger.error(f"Error creating table: {e}")
        exit(1)
    data = [
        ("2025-03-13T22:06:00Z", "kpi_inverter_production", "#ESSSAB2", "#ESSSAB2_T010JPK01KKP03", 0.0201),
        ("2025-03-13T22:07:00Z", "kpi_inverter_production", "#ESSSAB3", "#ESSSAB3_T010JPK01KKP03", 0.0301),
        ("2025-03-13T22:07:00Z", "kpi_inverter_production", "#ESSSAB3", "#ESSSAB3_T010JPK01KKP03", 0.0401),
    ]
    df = pd.DataFrame(data, columns=["server_timestamp", "name", "plant_id", "device_key", "value"])
    df["server_timestamp"] = pd.to_datetime(df["server_timestamp"], utc=True)  # Ensure correct format
    logger.info("Checking for existing data...")
    existing_data_query = f"""
    {TABLE_NAME}
    | where server_timestamp in ({', '.join(['datetime("' + ts.strftime('%Y-%m-%dT%H:%M:%S') + '")' for ts in df["server_timestamp"]])})
    | project server_timestamp, name, plant_id, device_key, value
    """
    try:
        response = kusto_client.execute(DATABASE, existing_data_query)
        df_existing = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
    except Exception as e:
        logger.error(f"Error querying existing data: {e}")
        df_existing = pd.DataFrame()
    
    if not df_existing.empty:
        df_existing["server_timestamp"] = pd.to_datetime(df_existing["server_timestamp"], utc=True)
    df_to_ingest = df.copy()
    if not df_existing.empty:
        df_to_ingest = df.merge(df_existing, on=["server_timestamp", "name", "plant_id", "device_key", "value"], how="left", indicator=True)
        df_to_ingest = df_to_ingest[df_to_ingest["_merge"] == "left_only"].drop(columns=["_merge"])
    if df_to_ingest.empty:
        logger.info("All data already exists. Skipping ingestion.")
    else:
        logger.info(f"{len(df_to_ingest)} new rows to ingest.")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp_file:
            df_to_ingest.to_csv(tmp_file.name, index=False, header=False, encoding="utf-8", sep=",")
            tmp_filename = tmp_file.name
        ingestion_props = IngestionProperties(
            database=DATABASE,
            table=TABLE_NAME,
            data_format=DataFormat.CSV,
            flush_immediately=True
        )
        try:
            file_desc = FileDescriptor(tmp_filename)
            ingest_client.ingest_from_file(file_desc, ingestion_props)
            logger.info("Data ingestion successful!")
        except Exception as e:
            logger.error(f"Error during ingestion: {e}")
    
        os.remove(tmp_filename)
    logger.info("Validating for ingestion issues related to '2_' pattern...")
    
    validation_query = f"""
    {TABLE_NAME}
    | where device_key contains "2_" and isnull(value)
    | project server_timestamp, name, plant_id, device_key, value
    """
    try:
        response = kusto_client.execute(DATABASE, validation_query)
        df_validation = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
        if not df_validation.empty:
            logger.warning("Warning: Found ingested rows with null values in 'value' field and device_key containing '2_'.")
            logger.warning(df_validation)
        else:
            logger.info("No null values detected in 'value' field for device_key containing '2_'.")
    except Exception as e:
        logger.error(f"Error running validation query: {e}")
    query = f"{TABLE_NAME} | take 10"
    try:
        response = kusto_client.execute(DATABASE, query)
        df_result = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
    
        logger.info("\nIngested Data:")
        logger.info(df_result)
    except Exception as e:
        logger.error(f"Error fetching ingested data: {e}")
    

    Resolved it by verifying that the timestamps were accurate, checking whether the data already existed before adding new rows, and running validation queries after ingestion to identify any problems.

    Outputs (screenshots omitted):

    The 'value' field appears to have no null values where the 'device_key' contains '2_'.
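    If duplicates need to be prevented at the ingestion level rather than by querying first, ingest-by tags are another option. A minimal sketch, assuming the ingest_by_tags / ingest_if_not_exists parameters of IngestionProperties and reusing the names from the code above:

    # Sketch: tag the batch so the service skips it if the same tag was already ingested.
    batch_tag = "kpi_batch_2025-03-13"  # arbitrary identifier chosen for illustration
    dedup_props = IngestionProperties(
        database=DATABASE,
        table=TABLE_NAME,
        data_format=DataFormat.CSV,
        ingest_by_tags=[batch_tag],         # tag extents created by this ingestion
        ingest_if_not_exists=[batch_tag],   # skip if an extent already carries this tag
    )
    ingest_client.ingest_from_file(FileDescriptor(tmp_filename), dedup_props)

    With this approach the service refuses to commit a second copy of the same tagged batch, so the pre-ingestion existence check becomes a safety net rather than the only guard.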