azurekubernetesazureservicebuskedakeda-scaledjob

Keda servicebus: All the messages getting processed by some pods of keda scaledJob


I am using Azure Kubernetes Service(AKS) platform and used KEDA "ScaledJob" for long running job. In this, Azure Service Bus Queue trigger has been used to auto trigger Jobs. Now while I am adding messages in Azure Service Bus, KEDA will auto trigger job and create node/pod as per configuration. but in this case, all the messages are getting picked up and processed by some pods. it is expected that each scaled up pod processes single message and terminates.

following is my yml file

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: {{ .Chart.Name }}
spec:
  jobTargetRef:
    backoffLimit: 4
    parallelism: 1
    completions: 1
    activeDeadlineSeconds: 300
    template:
      spec:
        imagePullSecrets:
          - name: {{ .Values.image.imagePullSecrets }}
        terminationGracePeriodSeconds: 30
        dnsPolicy: ClusterFirst
        volumes:
          - name: azure
            azureFile:
              shareName: sharenameone
              secretName: secret-sharenameone
              readOnly: true
          - name: one-storage
            emptyDir: {}
          - name: "logging-volume-file"
            persistentVolumeClaim:
              claimName: "azure-file-logging"        
        initContainers:
          - name: test-java-init
            image: {{ .Values.global.imageRegistryURI }}/{{ .Values.image.javaInitImage.name}}:{{ .Values.image.javaInitImage.tag }}
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            securityContext:
              readOnlyRootFilesystem: true
            resources:
              requests:
                cpu: 100m
                memory: 300Mi
              limits:
                cpu: 200m
                memory: 400Mi
            volumeMounts:
              - name: azure
                mountPath: /mnt/azure
              - name: one-storage
                mountPath: /certs
        containers:
          - name: {{ .Chart.Name }}
            image: {{ .Values.global.imageRegistryURI }}/tests/{{ .Chart.Name }}:{{ .Values.version }}
            imagePullPolicy: {{ .Values.image.pullPolicy }}
            env:
              {{- include "chart.envVars" . | nindent 14 }}
              - name: JAVA_OPTS
                value: >-
                    {{ .Values.application.javaOpts }}          
              - name: application_name
                value: "test_application"
              - name: queueName
                value: "test-queue-name"
              - name: servicebusconnstrenv
                valueFrom: 
                  secretKeyRef:
                    name: secrets-service-bus
                    key: service_bus_conn_str
            volumeMounts:
              - name: cert-storage
                mountPath: /certs
              - name: "logging-volume-azure-file"
                mountPath: "/mnt/logging"
            resources:
              {{- toYaml .Values.resources | nindent 14 }}                   
  pollingInterval: 30
  maxReplicaCount: 5
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 20
  triggers:
  - type: azure-servicebus
    metadata:
      queueName: "test-queue-name"
      connectionFromEnv: servicebusconnstrenv
      messageCount: "1"

and this is my azure function listener

 @FunctionName("TestServiceBusTrigger")
  public void TestServiceBusTriggerHandler(
      @ServiceBusQueueTrigger(
              name = "msg",
              queueName = "%TEST_QUEUE_NAME%",
              connection = "ServiceBusConnectionString")
          final String inputMessage,
      final ExecutionContext context) {

    final java.util.logging.Logger contextLogger = context.getLogger();
    System.setProperty("javax.net.ssl.trustStore", "/certs/cacerts");
   
    try {
      // all the processing goes here 
    } catch (Exception e) {
     //Exception handling
    }
  }

what configurations needs to be added, so that each scaled up pod processes single message and terminates?


Solution

  • This is not how Azure Functions were designed, or rather even how KEDA should be used in general. It would be more ideal if the already running container processes as many messages as it can once provisioned.

    That being said, if your scenario still requires this, you could write a simple script that directly uses the Azure Service Bus SDK to fetch just one message, process it, and terminate.