I'm trying to save a JSON payload that I send from an ESP32 to an IoT Hub. The JSON looks like this:
{ "msgCount": 0, "message": "hello world", "id": "test1", "SoundCategory": "rue", "SoundName": "pelleteuse", "dB": "105 dB", "Duration": "40 min", "StartTime": "04-06-2024:17h24", "EndTime": "04-06-2024:18h04", "Location": "[100.00,0.00]" }
I've written a Python Azure Function that is triggered each time the telemetry is received, and until now it worked:
import logging
import azure.functions as func
import base64

app = func.FunctionApp()

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')
    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')
    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')
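For reference, a minimal sketch of turning those decoded bytes back into a Python dict (assuming, as in the code above, that the event's 'body' field holds the base64-encoded JSON message):

import base64
import json

def decode_telemetry(event_payload: dict) -> dict:
    # 'body' carries the base64-encoded device message routed from IoT Hub
    payload_bytes = base64.b64decode(event_payload.get('body', ''))
    return json.loads(payload_bytes)  # e.g. {'msgCount': 0, 'message': 'hello world', ...}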
The thing is, now I want to send the payload to my Data Lake Storage. But whenever I deploy the function to Azure with any new import needed to connect to Data Lake Storage, the function disappears from the Function App even though the deployment reports success. I don't know whether I'm going about this the right way, or why every variant below fails.
Here are the two modules the issue seems to come from:
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential
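For reference, these two imports come from the azure-storage-file-datalake and azure-identity packages on PyPI; the import paths alone don't tell the Functions runtime to install anything.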
Just adding the imports is enough to make my function disappear from the Function App:
import logging
import azure.functions as func
import base64
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential

app = func.FunctionApp()

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')
    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')
    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')
And here are the other variants I've tried, without success:
import logging
import azure.functions as func
import json
import base64
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
from azure.identity import DefaultAzureCredential
from datetime import datetime

app = func.FunctionApp()

storage_account_name = "mystorageaccountname"
directory_name = datetime.now().strftime('%Y%m%d')

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')
    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')
    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        # Initialize the Azure Data Lake Storage client
        storage_client = initialize_storage_account()
        # Create a file system (container) dynamically
        file_system_client = create_file_system(storage_client)
        # Create a directory within the file system
        directory_client = create_directory(file_system_client, directory_name)
        # Upload the payload bytes into the created directory
        upload_file_to_directory(directory_client, file_name='payload_file.bin', data=payload_bytes)
    else:
        logging.warning('No payload found in the event.')

def initialize_storage_account():
    try:
        credential = DefaultAzureCredential()
        service_client = DataLakeServiceClient(account_url=f"https://{storage_account_name}.dfs.core.windows.net", credential=credential)
        return service_client
    except Exception as e:
        logging.error(f"Error initializing storage account: {e}")

def create_file_system(service_client: DataLakeServiceClient) -> FileSystemClient:
    try:
        # Note: this creates a brand-new container on every event
        file_system_name = datetime.now().strftime('%Y%m%d%H%M%S')  # Generate a unique file system name
        file_system_client = service_client.create_file_system(file_system=file_system_name)
        logging.info(f"File system '{file_system_name}' created successfully.")
        return file_system_client
    except Exception as e:
        logging.error(f"Error creating file system: {e}")

# 'self' removed from the two helpers below: these are module-level functions, not methods
def create_directory(file_system_client: FileSystemClient, directory_name: str) -> DataLakeDirectoryClient:
    directory_client = file_system_client.create_directory(directory_name)
    return directory_client

def upload_file_to_directory(directory_client: DataLakeDirectoryClient, file_name: str, data: bytes):
    # Upload the in-memory payload instead of reading a local file,
    # so the signature matches the call in getTelemetry above
    file_client = directory_client.get_file_client(file_name)
    file_client.upload_data(data, overwrite=True)
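A note on DefaultAzureCredential in the variant above: when running in Azure it resolves to the Function App's managed identity, which needs a data-plane role such as Storage Blob Data Contributor on the storage account; when running locally it falls back to your Azure CLI or VS Code sign-in. That is general azure-identity behavior, not something specific to this code.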
Here's the code based on your answer:
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
import logging
import json
import base64

app = func.FunctionApp()

@app.event_grid_trigger(arg_name="azeventgrid")
def getTelemetry(azeventgrid: func.EventGridEvent):
    logging.info('Python EventGrid trigger processed an event')
    event_payload = azeventgrid.get_json()
    payload_base64 = event_payload.get('body', '')
    if payload_base64:
        payload_bytes = base64.b64decode(payload_base64)
        logging.info(payload_bytes)
    else:
        logging.warning('No payload found in the event.')
    container_name = "test1ayoub"
    directory_path = "ayoub1/ayoub2"
    outfile_name = "test.txt"
    data = {
        "name": "ayoub",
        "id": 4
    }
    connection_string = "DefaultEndpointsProtocol=https;AccountName=polusounddatalake;AccountKey=<redacted>;EndpointSuffix=core.windows.net"
    service_client = DataLakeServiceClient.from_connection_string(connection_string)
    file_system_client = service_client.get_file_system_client(container_name)
    destination_path = directory_path + "/" + outfile_name
    file_client = file_system_client.get_file_client(destination_path)
    json_file = json.dumps(data).encode()
    file_client.upload_data(json_file, overwrite=True)
    logging.info("saved to datalake-storage")
Sending data to Data Lake Storage
You can use the code below (I have used an HTTP trigger); you can integrate it into your Event Grid function:
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient
import logging
import json

app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)

@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    rith_con_name = "rithwik"
    rith_dir_path = "rithfolder1/rithfolder2"
    rith_outfile_name = "test.txt"
    rith_data = {
        "name": "rithwik",
        "id": 8
    }
    rith_cs = "DefaultEndpointsProtocol=https;AccountName=rith32;AccountKey=9Ek4f80ttpwPjCg==;EndpointSuffix=core.windows.net"
    rith_sc = DataLakeServiceClient.from_connection_string(rith_cs)
    rith_fsc = rith_sc.get_file_system_client(rith_con_name)
    rith_des_path = rith_dir_path + "/" + rith_outfile_name
    json_rith = json.dumps(rith_data).encode()
    rith_fc = rith_fsc.get_file_client(rith_des_path)
    rith_fc.upload_data(json_rith, overwrite=True)
    return func.HttpResponse("Hello, Rithwik Bojja. This HTTP triggered function executed successfully.")
rith_con_name is the container name, and rith_cs is the connection string.
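To smoke-test the deployed HTTP trigger, something like the following could be used (the app name and function key are placeholders, not values from this answer):

import requests

resp = requests.get(
    "https://<function-app>.azurewebsites.net/api/http_trigger",
    params={"code": "<function-key>"},  # key is required because auth level is FUNCTION
)
print(resp.status_code, resp.text)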
requirements.txt (every package the code imports must be listed here; a package that is imported but not listed makes the deployed function fail to load, which is why it disappears from the Function App even though the deployment succeeds):
azure-functions
azure-storage-file-datalake
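If the Event Grid version with DefaultAzureCredential is used instead, azure-identity has to be listed here as well; it is a separate package from azure-storage-file-datalake.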
Output: (screenshot omitted)