azuregoazure-functionsazure-storage-queues

Duplicate Data Issue when Storing JSON Messages in Azure Storage Queue using Azure Functions in Go


Now I am trying to develop an application using Azure Functions and Go, utilizing Custom Handler. The architecture of my application is like this.

image

The problem seems to lie in storing a JSON message in an Azure Storage Queue. The message stored in queue has a duplicate data in its “data” and “metadata” .

My function 1, which is triggered by a HTTP Response from Slack and enqueues a message to the queue, is like this.

func function1(c *gin.Context) {
    log.Printf(“Start enqueueMessage”)

    // I use a JSON string here instead of an actual HTTP Request for an easy understanding
    stri := "{\"aaa\" : \"aaa\"}"
    base64BodyString := base64.StdEncoding.EncodeToString(stri)

    _ulr, err := url.Parse(fmt.Sprintf(“https://%s.queue.core.windows.net/%s”, accountName, queueName))
    if err != nil {
        log.Fatal(“Error parsing url: “, err)
    }

    credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
    if err != nil {
        log.Fatal(“Error creating shared key credential: “, err)
    }

    queueUrl := azqueue.NewQueueURL(*_ulr, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
    ctx := context.TODO()

    messageUrl := queueUrl.NewMessagesURL()
    _, err = messageUrl.Enqueue(ctx, base64BodyString, 0, 0)
    if err != nil {
        log.Fatal(“Error enqueueing message: “, err)
    }
    log.Printf(“Message enqueued successfully”)
}

Then, my function 2, which is fired by a Queue trigger and processes the message dequeued from the queue, is like this.

func function2(c *gin.Context) {

    log.Printf(“Start function”)

    type QueueMessage struct {
      Data struct {
         MyQueueItem string `json:"myQueueItem"`
      } `json:"data"`
      Metadata map[string]interface{}
    }

    var queueMessage QueueMessage
    err := c.BindJSON(&queueMessage)
    if err != nil {
        log.Printf(“Failed to bind request body: %v”, err)
        return
    }

        log.Printf(“Queue message : %v”, queueMessage)
}

The outputs data is like this.

{
  "message": "{\"token\": \"aaa\"}",
  "metadata": {
    "DequeueCount": 1,
    "ExpirationTime": "2023-08-01T13:25:29+00:00",
    "Id": "e2b5f133-cc27-46d5-92a8-0305251e65e7",
    "InsertionTime": "2023-07-25T13:25:29+00:00",
    "NextVisibleTime": "2023-07-25T13:35:31+00:00",
    "PopReceipt": "AgAAAAMAAAAAAAAAifLM4fy+2QE=",
    "sys": {
      "MethodName": "function2",
      "RandGuid": "9cdd85fc-9522-437a-8b2c-cc4ac4919ad4",
      "UtcNow": "2023-07-25T13:25:31.364779Z"
    },
    "token": "aaa"
  }
}

When I stored a message like “hoge” instead of a JSON message, it didn’t contain duplicate data.

Does anyone have an idea about how to solve this?

I would like to store a message without duplicate data in Azure Storage Queue.


Solution

  • Azure Storage Queue message is caused by the way you're encoding the JSON payload before enqueuing it and base64 encoding the JSON string and then passing it as the message body the message is dequeued and processed, we are binding it to the QueueMessage struct, which expects a specific structure with data and metadata fields.

    func function1(c *gin.Context) {
        log.Printf("Start enqueueMessage")
    
        payload := map[string]string{
            "myQueueItem": "aaa",
        }
    
        payloadBytes, err := json.Marshal(payload)
        if err != nil {
            log.Fatal("Error marshaling JSON: ", err)
        }
    
        base64BodyString := base64.StdEncoding.EncodeToString(payloadBytes)
    
        _url, err := url.Parse(fmt.Sprintf("https://%s.queue.core.windows.net/%s", accountName, queueName))
        if err != nil {
            log.Fatal("Error parsing url: ", err)
        }
    
        credential, err := azqueue.NewSharedKeyCredential(accountName, accountKey)
        if err != nil {
            log.Fatal("Error creating shared key credential: ", err)
        }
    
        queueUrl := azqueue.NewQueueURL(*_url, azqueue.NewPipeline(credential, azqueue.PipelineOptions{}))
        ctx := context.TODO()
    
        messageUrl := queueUrl.NewMessagesURL()
        _, err = messageUrl.Enqueue(ctx, base64BodyString, 0, 0)
        if err != nil {
            log.Fatal("Error enqueueing message: ", err)
        }
    
        log.Printf("Message enqueued successfully")
    }
    
    
    
    func function2(c *gin.Context) {
        log.Printf("Start function")
    
        type QueueMessage struct {
            MyQueueItem string `json:"myQueueItem"`
        }
    
        var queueMessage QueueMessage
        err := c.BindJSON(&queueMessage)
        if err != nil {
            log.Printf("Failed to bind request body: %v", err)
            return
        }
    
        log.Printf("Queue message : %v", queueMessage)
    }