I am currently trying to use KEDA to start a job after receiving an event from Azure Event Hub. When I deploy my ScaledJob, kubectl describe shows:
Warning KEDAScalerFailed 4m29s (x17 over 9m57s) scale-handler unable to get eventhub metadata: no storage connection string given
Here is the ScaledJob I'm using; it is based on the sample found on KEDA's website:
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: pi-az-consumer
  namespace: default
spec:
  jobTargetRef:
    template:
      spec:
        containers:
          - name: pi
            image: perl:5.34.0
            command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
        restartPolicy: Never
    backoffLimit: 4
  pollingInterval: 10            # Optional. Default: 30 seconds
  maxReplicaCount: 30            # Optional. Default: 100
  successfulJobsHistoryLimit: 3  # Optional. Default: 100. How many completed jobs should be kept.
  failedJobsHistoryLimit: 2      # Optional. Default: 100. How many failed jobs should be kept.
  scalingStrategy:
    strategy: "custom"           # Optional. Default: default. Which Scaling Strategy to use.
    customScalingQueueLengthDeduction: 1      # Optional. A parameter to optimize custom ScalingStrategy.
    customScalingRunningJobPercentage: "0.5"  # Optional. A parameter to optimize custom ScalingStrategy.
  triggers:
    - type: azure-eventhub
      metadata:
        connectionFromEnv: "Endpoint=sb://{...}.servicebus.windows.net/;SharedAccessKeyName={...};SharedAccessKey={...};EntityPath={...}"
        storageConnectionFromEnv: "DefaultEndpointsProtocol=https;AccountName={...};AccountKey={...};EndpointSuffix=core.windows.net"
        consumerGroup: $Default
        unprocessedEventThreshold: '64'
        activationUnprocessedEventThreshold: '10'
        blobContainer: '{...}'
From what I've found online, these are the connection strings I should be using. Is the problem with my connections, or is it elsewhere? I was expecting a connection or an authentication error, so I do not understand why I am told that I did not give the connection string at all.
Thank you if you have any idea where the problem can come from; I will keep this question updated if I find the answer.
The error no storage connection string given is caused by how the connection strings are referenced in your ScaledJob: the connectionFromEnv and storageConnectionFromEnv fields expect the name of an environment variable defined on the job's container, not the connection string itself. Since your trigger metadata contains the raw connection strings, KEDA looks up an environment variable with that (nonexistent) name, finds nothing, and reports that no storage connection string was given.
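A rough Python sketch of the lookup KEDA performs for the *FromEnv fields (the variable name STORAGE_CONNECTION_STRING and the connection string below are made up for illustration; any name works as long as the trigger metadata and the container env agree):

```python
import os

# KEDA resolves *FromEnv fields by treating the metadata value as the NAME
# of an environment variable on the scale target, not as the value itself.
os.environ["STORAGE_CONNECTION_STRING"] = (
    "DefaultEndpointsProtocol=https;AccountName=example;AccountKey=secret"
)

def resolve_from_env(field_value):
    # Look the metadata value up in the container's environment.
    return os.environ.get(field_value)

# Passing the variable name resolves the connection string:
print(resolve_from_env("STORAGE_CONNECTION_STRING") is not None)  # True
# Passing the raw connection string (as in the failing ScaledJob) resolves
# nothing, hence "no storage connection string given":
print(resolve_from_env("DefaultEndpointsProtocol=https;AccountName=...") is None)  # True
```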
You can follow the steps below to get your event data. Things you will need:
An Event Hub namespace
az eventhubs namespace create --name $EVENTHUB_NAMESPACE --resource-group $RESOURCE_GROUP --location eastus
An Event Hub
az eventhubs eventhub create --name $EVENTHUB_NAME --namespace-name $EVENTHUB_NAMESPACE --resource-group $RESOURCE_GROUP
A Storage Account
az storage account create --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location eastus --sku Standard_LRS
A Blob Container
az storage container create --name $CONTAINER_NAME --account-name $STORAGE_ACCOUNT_NAME
Once created, retrieve the connection strings for the Event Hub and the Storage Account.
EVENTHUB_CONNECTION_STRING=$(az eventhubs namespace authorization-rule keys list --resource-group $RESOURCE_GROUP --namespace-name $EVENTHUB_NAMESPACE --name RootManageSharedAccessKey --query primaryConnectionString --output tsv)
STORAGE_CONNECTION_STRING=$(az storage account show-connection-string --name $STORAGE_ACCOUNT_NAME --query connectionString --output tsv)
echo $EVENTHUB_CONNECTION_STRING
echo $STORAGE_CONNECTION_STRING
Store the connection strings in a Kubernetes secret.
kubectl create secret generic eventhub-secrets \
--from-literal=eventhub-connection-string=$EVENTHUB_CONNECTION_STRING \
--from-literal=storage-connection-string=$STORAGE_CONNECTION_STRING \
--namespace default
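Note that Kubernetes stores secret values base64-encoded, so kubectl get secret -o yaml will not show them in plain text. A quick sketch of that round-trip, using a made-up connection string:

```python
import base64

# Kubernetes secrets hold values base64-encoded; this mimics the round-trip.
conn = "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=demo"
encoded = base64.b64encode(conn.encode()).decode()  # what kubectl shows in .data
decoded = base64.b64decode(encoded).decode()        # what the container receives
print(decoded == conn)  # True
```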
Install KEDA, for example with Helm:
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda --namespace keda --create-namespace
Create the ScaledJob with KEDA
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: pi-az-consumer
  namespace: default
spec:
  jobTargetRef:
    template:
      spec:
        containers:
          - name: pi
            image: perl:5.34.0
            command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
            env:
              - name: EVENTHUB_CONNECTION_STRING
                valueFrom:
                  secretKeyRef:
                    name: eventhub-secrets
                    key: eventhub-connection-string
              - name: STORAGE_CONNECTION_STRING
                valueFrom:
                  secretKeyRef:
                    name: eventhub-secrets
                    key: storage-connection-string
        restartPolicy: Never
    backoffLimit: 4
  pollingInterval: 10            # Optional. Default: 30 seconds
  maxReplicaCount: 30            # Optional. Default: 100
  successfulJobsHistoryLimit: 3  # Optional. Default: 100. How many completed jobs should be kept.
  failedJobsHistoryLimit: 2      # Optional. Default: 100. How many failed jobs should be kept.
  scalingStrategy:
    strategy: "custom"           # Optional. Default: default. Which Scaling Strategy to use.
    customScalingQueueLengthDeduction: 1      # Optional. A parameter to optimize custom ScalingStrategy.
    customScalingRunningJobPercentage: "0.5"  # Optional. A parameter to optimize custom ScalingStrategy.
  triggers:
    - type: azure-eventhub
      metadata:
        eventHubName: "arkoeventhub"  # Add this line
        connectionFromEnv: "EVENTHUB_CONNECTION_STRING"
        storageConnectionFromEnv: "STORAGE_CONNECTION_STRING"
        consumerGroup: "$Default"
        unprocessedEventThreshold: '64'
        activationUnprocessedEventThreshold: '10'
        blobContainer: 'arkocontainer'
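As an aside, eventHubName should only be strictly necessary when the connection string has no EntityPath segment; KEDA can otherwise take the hub name from there. A hedged sketch of that parsing, with a made-up connection string:

```python
# Parse the EntityPath segment out of an Event Hub connection string.
def entity_path(conn_str):
    parts = dict(p.split("=", 1) for p in conn_str.rstrip(";").split(";") if "=" in p)
    return parts.get("EntityPath")

conn = ("Endpoint=sb://example.servicebus.windows.net/;"
        "SharedAccessKeyName=RootManageSharedAccessKey;"
        "SharedAccessKey=abc123;EntityPath=arkoeventhub")
print(entity_path(conn))  # arkoeventhub
```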
Apply it with kubectl apply -f scaledjob.yaml, and now you can send test events to the Event Hub.
I am using a Python script as an example to trigger events:
from azure.eventhub import EventHubProducerClient, EventData

# Replace with your Event Hub connection string and Event Hub name
connection_str = 'Endpoint=sb://arkoeventhubns.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=RQd12345iDO7JOEpfL3khbNGlpZ0QhnP+abcd=;EntityPath=arkoeventhub'
eventhub_name = 'arkoeventhub'

producer = EventHubProducerClient.from_connection_string(conn_str=connection_str, eventhub_name=eventhub_name)
with producer:  # close the client and its connection when done
    event_data_batch = producer.create_batch()
    for i in range(10):
        event_data_batch.add(EventData(f"Test message {i+1}"))
    producer.send_batch(event_data_batch)
print("Messages sent successfully.")
Reference: KEDA Azure Event Hub scaler documentation