I have the following Azure Functions:
import logging
import azure.functions as func
from azure.storage.queue import QueueServiceClient
# Initialize queue client
# Module-level clients created once at import time from the connection-string
# and queue-name placeholders; queue_client is used by the HTTP-triggered
# function below to enqueue messages.
# NOTE(review): no message_encode_policy is passed here, so messages are
# presumably sent without Base64 encoding — confirm this matches what the
# queue-triggered consumer expects.
queue_service = QueueServiceClient.from_connection_string("{YOUR_CONNECTION_STRING}")
queue_client = queue_service.get_queue_client("{YOUR_QUEUE_NAME}")
@app.route(route="{YOUR_FUNCTION_ROUTE}", auth_level=func.AuthLevel.ANONYMOUS)
def GetUrlsFunction(req: func.HttpRequest) -> func.HttpResponse:
    """Enqueue one example URL and reply with a 200 confirmation.

    The incoming request is not inspected; the URL is a fixed placeholder
    that is sent through the module-level ``queue_client``.
    """
    logging.info('Python HTTP trigger function processed a request.')

    # Placeholder URL that gets pushed onto the queue.
    url_to_enqueue = "{YOUR_URL}"

    # Hand the URL to the storage queue exactly as written above.
    queue_client.send_message(url_to_enqueue)
    logging.info(f"Message added to the queue: {url_to_enqueue}")

    confirmation = "URLs have been added to the queue successfully."
    return func.HttpResponse(confirmation, status_code=200)
@app.queue_trigger(arg_name="azqueue", queue_name="{YOUR_QUEUE_NAME}", connection="AzureWebJobsStorage")
def ProcessMessageFunction(azqueue: func.QueueMessage):
    """Handle one queue message: decode it, check it looks like a URL, process it.

    Every exception raised while handling the message is logged and not
    re-raised, so nothing propagates out of this function body.
    """
    try:
        # Fetch the raw payload once and log it before any decoding.
        raw_body = azqueue.get_body()
        logging.info("Received message: %s", raw_body)

        # The payload is bytes; turn it into text.
        url_text = raw_body.decode('utf-8')
        logging.info('Decoded message: %s', url_text)

        # Only strings that start with "http" are accepted as URLs.
        looks_like_url = url_text.startswith("http")
        if not looks_like_url:
            raise ValueError(f"Invalid URL format: {url_text}")

        # Placeholder for the real URL processing logic.
        outcome = process_single_url("{BASE_URL}", url_text)
        logging.info('Processing result: %s', outcome)
    except requests.exceptions.RequestException as net_err:
        logging.error("Network-related error: %s", str(net_err))
    except ValueError as value_err:
        logging.error("Value error: %s", str(value_err))
    except Exception as other_err:
        logging.error("Unhandled error: %s", str(other_err))
        # Second call adds the traceback of the active exception to the log.
        logging.exception("Exception details:")
When I run this using func start, I get lots of errors like:
Error : Message has reached MaxDequeueCount of 5. Moving message to queue 'pcaob-poison'.
I'm not able to debug the cause of this error as I don't get enough information, even when running func start --verbose
My host.json looks like:
{
"version": "2.0",
"logging": {
"logLevel": {
"Function": "Debug",
"Host.Results": "Error",
"Host.Aggregator": "Trace",
"Queues": "Debug"
},
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
}
}
and my local.settings.json looks like:
{
"IsEncrypted": false,
"Values": {
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
"AzureWebJobsAccountName": "{YOUR_ACCOUNT_NAME}",
"AzureWebJobsStorage": "AccountName={YOUR_ACCOUNT_NAME};AccountKey={YOUR_ACCOUNT_KEY};DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/{YOUR_ACCOUNT_NAME};QueueEndpoint=http://127.0.0.1:10001/{YOUR_ACCOUNT_NAME};TableEndpoint=http://127.0.0.1:10002/{YOUR_ACCOUNT_NAME};",
"PCAOB_BASE_URL": "http://localhost:5000"
}
}
The following relevant azure python packages are used
azure-common==1.1.28
azure-core==1.30.2
azure-functions==1.20.0
azure-identity==1.16.0
azure-storage-blob==12.20.0
azure-storage-queue==12.11.0
And I'm using Python 3.11
I'm just trying to be able to debug the issue. So far, the output that says that the MaxDequeueCount limit has been reached is the only thing that I get and it's not useful.
Any advice is welcome
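The issue is in the HTTP trigger: you send the message to the queue without Base64-encoding it. The queue trigger expects Base64-encoded messages by default, so it fails to decode the message before your function code runs; after the maximum number of dequeue attempts, the message is moved to the poison queue. Below is your code, modified, which worked for me: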
import logging
import base64
import requests
import azure.functions as func
from azure.storage.queue import QueueServiceClient
# Module-level queue client and function app, created once at import time.
# Every segment of a storage connection string must be a key=value pair, so
# the account name is written as "AccountName=rithstore".
# NOTE(review): the account key is hard-coded here for illustration only;
# prefer reading the connection string from app settings instead of source.
ri_qs = QueueServiceClient.from_connection_string(
    "DefaultEndpointsProtocol=https;AccountName=rithstore;AccountKey=XZlFEcKM3w==;EndpointSuffix=core.windows.net"
)
ri_qc = ri_qs.get_queue_client("queuename")

# Function app that the route and queue-trigger decorators below register on.
rith = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
@rith.route(route="http_rith_trigger")
def http_rith_trigger(req: func.HttpRequest) -> func.HttpResponse:
    """Base64-encode a fixed URL, enqueue it, and reply with a 200 confirmation.

    The incoming request is not inspected.
    """
    plain_url = "https://rithwik.com/bojja"

    # Encode the URL before enqueueing: text -> bytes -> Base64 -> text.
    # Presumably the queue trigger expects Base64 payloads by default —
    # confirm against the host's queue messageEncoding setting.
    url_bytes = plain_url.encode('utf-8')
    encoded_url = base64.b64encode(url_bytes).decode('utf-8')

    ri_qc.send_message(encoded_url)
    logging.info(f"Hello message is sent to queue: {encoded_url}")

    return func.HttpResponse(
        "Hello Rithwik Bojja, message is sent to queue",
        status_code=200,
    )
@rith.queue_trigger(arg_name="rith1", queue_name="queuename", connection="AzureWebJobsStorage")
def test_rith_process(rith1: func.QueueMessage):
    """Log the queue message, decode it as UTF-8, and reject non-URL payloads.

    Raises:
        ValueError: if the decoded message does not start with "http".
    """
    # Read the payload once; it is bytes, so it is logged raw and then decoded.
    payload = rith1.get_body()
    logging.info("Hello Rithwik Bojja, the message recieved to queue is : %s", payload)

    decoded = payload.decode('utf-8')
    logging.info('Hello Rithwik Bojja, Message decoded as : %s', decoded)

    # Accept anything that starts with "http"; everything else is an error.
    if decoded.startswith("http"):
        return
    raise ValueError(f"Hello Rithwik Bojja, Invalid URL: {decoded}")
def process_single_url(base_url: str, url: str, timeout: float = 30.0) -> str:
    """Ask the processing service to handle *url* and return its response text.

    Issues ``GET {base_url}/process?url=<url>``.

    Args:
        base_url: Root URL of the processing service, without a trailing slash.
        url: The URL to be processed, passed as the ``url`` query parameter.
        timeout: Seconds to wait for the service before giving up. Without a
            timeout the request can block indefinitely.

    Returns:
        The response body as text, whatever the HTTP status code was.

    Raises:
        requests.exceptions.RequestException: on connection errors or when
            the timeout expires.
    """
    out = requests.get(
        f"{base_url}/process",
        params={"url": url},
        timeout=timeout,
    )
    return out.text
Output: