azure, azure-functions, azure-data-lake

Send data to Data Lake storage


I'm trying to save a JSON message that I send from an ESP32 to an IoT Hub. The payload looks like this:

{ "msgCount": 0, "message": "hello world", "id": "test1", "SoundCategory": "rue", "SoundName": "pelleteuse", "dB": "105 dB", "Duration": "40 min", "StartTime": "04-06-2024:17h24", "EndTime": "04-06-2024:18h04", "Location": "[100.00,0.00]" }

I wrote a Python Azure Function that is triggered each time the telemetry is received; until now it has worked:

import logging
import azure.functions as func
import base64


app = func.FunctionApp()

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')
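
For reference, the decoded bytes are just the UTF-8 JSON shown above, so they can be parsed back into a dict; a minimal sketch, assuming the body is valid JSON:

import json

telemetry = json.loads(payload_bytes)  # json.loads accepts bytes directly
logging.info("Sound %s at %s", telemetry.get("SoundName"), telemetry.get("dB"))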

The thing is, now I want to send the payload to my Data Lake storage, but whenever I deploy the function to Azure with any new import for connecting to the Data Lake, the function disappears from the Function App even though the deployment reports success. I don't know whether I'm doing this the right way, or why every variant I try fails.

Here are the two imports the issue seems to come from:

from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential

Just adding these imports is enough to make my function disappear from the Function App:

import logging
import azure.functions as func
import base64
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential

app = func.FunctionApp()


@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')
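
One thing worth checking with the Python v2 programming model: if an import fails while the host indexes the app (for example because the package was never installed), every function silently vanishes from the Function App even though the deployment itself succeeds. The packages behind these imports have to be listed in requirements.txt so the remote build installs them; a minimal requirements.txt for the imports above would presumably be:

azure-functions
azure-storage-file-datalake
azure-identity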

Here are the other variants I've tried, without success:

import logging
import azure.functions as func
import json 
import base64
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential
from datetime import datetime

app = func.FunctionApp()

storage_account_name = "mystorageaccountname"
directory_name = datetime.now().strftime('%Y%m%d')

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        
        # Initialize the Azure Data Lake Storage client
        storage_client = initialize_storage_account()

        # Create a file system dynamically
        file_system_client = create_file_system(storage_client)

        # Create a directory within the file system
        directory_client = create_directory(file_system_client, directory_name)

        # Upload the payload bytes to the created directory
        upload_file_to_directory(directory_client, file_name='payload_file.bin', data=payload_bytes)

    else:
        logging.warning('No payload found in the event.')

def initialize_storage_account():
    try:
        credential = DefaultAzureCredential()
        service_client = DataLakeServiceClient(account_url=f"https://{storage_account_name}.dfs.core.windows.net", credential=credential)
        return service_client
    except Exception as e:
        logging.error(f"Error initializing storage account: {e}")

def create_file_system(service_client: DataLakeServiceClient) -> FileSystemClient:
    try:
        file_system_name = datetime.now().strftime('%Y%m%d%H%M%S')  # Generate a unique file system name
        file_system_client = service_client.create_file_system(file_system=file_system_name)
        logging.info(f"File system '{file_system_name}' created successfully.")
        return file_system_client
    except Exception as e:
        logging.error(f"Error creating file system: {e}")

def create_directory(file_system_client: FileSystemClient, directory_name: str) -> DataLakeDirectoryClient:
    directory_client = file_system_client.create_directory(directory_name)
    return directory_client

def upload_file_to_directory(directory_client: DataLakeDirectoryClient, file_name: str, data: bytes):
    # Upload the in-memory payload directly; there is no local file to read.
    file_client = directory_client.get_file_client(file_name)
    file_client.upload_data(data, overwrite=True)
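
A design note on the attempt above: create_file_system generates a new timestamped file system (container) on every event, so each message would end up in its own container. Reusing one fixed container and varying only the file path is probably closer to the intent; a sketch, assuming a container named telemetry already exists:

def upload_telemetry(service_client: DataLakeServiceClient, data: bytes):
    # Assumption: the "telemetry" file system (container) already exists.
    file_system_client = service_client.get_file_system_client("telemetry")
    # e.g. 20240604/172400.json -- one file per event under a dated directory
    file_name = datetime.now().strftime('%H%M%S') + '.json'
    file_client = file_system_client.get_file_client(directory_name + "/" + file_name)
    file_client.upload_data(data, overwrite=True)

Also note that DefaultAzureCredential only authenticates successfully at runtime if the Function App's managed identity has a data-plane role such as Storage Blob Data Contributor on the storage account.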

Here's the code based on your answer:

import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
import logging 
import json 
import base64

app = func.FunctionApp()

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')

    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')

    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')

    
    container_name = "test1ayoub"
    directory_path = "ayoub1/ayoub2"
    outfile_name = "test.txt"
    data = {
        "name": "ayoub",
        "id": 4
    }
    connection_string = "DefaultEndpointsProtocol=https;AccountName=polusounddatalake;AccountKey=<account-key>;EndpointSuffix=core.windows.net"
    service_client = DataLakeServiceClient.from_connection_string(connection_string)
    file_system_client = service_client.get_file_system_client(container_name)
    destination_path = directory_path+"/"+outfile_name
    file_client = file_system_client.get_file_client(destination_path)
    json_file = json.dumps(data).encode()
    file_client.upload_data(json_file, overwrite=True)
    logging.info("saved to datalake-storage")
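
One follow-up on the snippet above: the connection string (and account key) should not be hard-coded in source. It can be read from an application setting instead, since App Settings are exposed to the function as environment variables; a sketch, assuming a setting named DATALAKE_CONNECTION_STRING has been added to the Function App:

import os

connection_string = os.environ["DATALAKE_CONNECTION_STRING"]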



Solution


    You can use the code below (I used an HTTP trigger); you can integrate it into your Event Grid function:

    import azure.functions as func
    from azure.storage.filedatalake import DataLakeServiceClient
    import logging
    import json
    
    app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
    
    @app.route(route="http_trigger")
    def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
        logging.info('Python HTTP trigger function processed a request.')
        rith_con_name = "rithwik"
        rith_dir_path = "rithfolder1/rithfolder2"
        rith_outfile_name = "test.txt"
        rith_data = {
            "name": "rithwik",
            "id": 8
        }
        rith_cs = "DefaultEndpointsProtocol=https;AccountName=rith32;AccountKey=9Ek4f80ttpwPjCg==;EndpointSuffix=core.windows.net"
        rith_sc = DataLakeServiceClient.from_connection_string(rith_cs)
        rith_fsc = rith_sc.get_file_system_client(rith_con_name)
        rith_des_path = rith_dir_path + "/" + rith_outfile_name
        json_rith = json.dumps(rith_data).encode()
        rith_fc = rith_fsc.get_file_client(rith_des_path)
        rith_fc.upload_data(json_rith, overwrite=True)
        return func.HttpResponse(f"Hello, Rithwik Bojja. This HTTP triggered function executed successfully.")
    

    rith_con_name is the container name

    rith_cs is the connection string
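
    Once deployed, an HTTP-triggered function like this can be invoked at a URL of the form https://<function-app-name>.azurewebsites.net/api/http_trigger?code=<function-key>, with the placeholders taken from your own Function App and its keys.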

    requirements.txt:

    azure-functions
    azure-storage-file-datalake
    

    Output:

    (output screenshot omitted)