gopublish-subscribegoogle-cloud-pubsubexponential-backoff

How does the exponential backoff configured in Google Pub/Sub's RetryPolicy work?


The cloud.google.com/go/pubsub library recently released (in v1.5.0, cf. https://github.com/googleapis/google-cloud-go/releases/tag/pubsub%2Fv1.5.0) support for a new RetryPolicy server-side feature. The documentation (https://godoc.org/cloud.google.com/go/pubsub#RetryPolicy) for this currently reads

enter image description here

I've read the Wikipedia article, and although it describes exponential backoff in discrete time, I don't see how the article relates to the MinimumBackoff and MaximumBackoff parameters specifically. For guidance on this, I referred to the documentation for github.com/cenkalti/backoff, https://pkg.go.dev/github.com/cenkalti/backoff/v4?tab=doc#ExponentialBackOff. That library defines an ExponentialBackoff as

type ExponentialBackOff struct {
    InitialInterval     time.Duration
    RandomizationFactor float64
    Multiplier          float64
    MaxInterval         time.Duration
    // After MaxElapsedTime the ExponentialBackOff returns Stop.
    // It never stops if MaxElapsedTime == 0.
    MaxElapsedTime time.Duration
    Stop           time.Duration
    Clock          Clock
    // contains filtered or unexported fields
}

where each randomized interval is calculated as

randomized interval =
    RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])

where RetryInterval is the current retry interval which, as I understand it, starts at a value of InitialInterval and is capped by MaxInterval.

Do I understand correctly that the MinimumBackoff and MaximumBackoff correspond to the InitialInterval and MaxInterval in github.com/cenkalti/backoff? That is, the MinimumBackoff is the initial wait period, and the MaximumBackoff is the largest amount of time allowed between retries?

To test my theories, I wrote the following simplified program:

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

var (
    projectID                      string
    minimumBackoff, maximumBackoff time.Duration
)

const (
    topicName             = "test-topic"
    subName               = "test-subscription"
    defaultMinimumBackoff = 10 * time.Second
    defaultMaximumBackoff = 10 * time.Minute
)

func main() {
    flag.StringVar(&projectID, "projectID", "my-project", "Google Project ID")
    flag.DurationVar(&minimumBackoff, "minimumBackoff", 5*time.Second, "minimum backoff")
    flag.DurationVar(&maximumBackoff, "maximumBackoff", 60*time.Second, "maximum backoff")
    flag.Parse()
    log.Printf("Running with minumum backoff %v and maximum backoff %v...", minimumBackoff, maximumBackoff)

    retryPolicy := &pubsub.RetryPolicy{MinimumBackoff: minimumBackoff, MaximumBackoff: maximumBackoff}

    client, err := pubsub.NewClient(context.Background(), projectID)
    if err != nil {
        log.Fatalf("NewClient: %v", err)
    }

    topic, err := client.CreateTopic(context.Background(), topicName)
    if err != nil {
        log.Fatalf("CreateTopic: %v", err)
    }
    log.Printf("Created topic %q", topicName)
    defer func() {
        topic.Stop()
        if err := topic.Delete(context.Background()); err != nil {
            log.Fatalf("Delete topic: %v", err)
        }
        log.Printf("Deleted topic %s", topicName)
    }()

    sub, err := client.CreateSubscription(context.Background(), subName, pubsub.SubscriptionConfig{
        Topic:       topic,
        RetryPolicy: retryPolicy,
    })
    if err != nil {
        log.Fatalf("CreateSubscription: %v", err)
    }
    log.Printf("Created subscription %q", subName)
    defer func() {
        if err := sub.Delete(context.Background()); err != nil {
            log.Fatalf("Delete subscription: %v", err)
        }
        log.Printf("Deleted subscription %q", subName)
    }()

    go func() {
        sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
            log.Printf("Nacking message: %s", msg.Data)
            msg.Nack()
        })
    }()

    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Hello, world!")})
    log.Println("Published message")
    time.Sleep(60 * time.Second)
}

If I run it with the flag-default MinimumBackoff and MaximumBackoff of 5s and 60s, respectively, I get the following output:

> go run main.go
2020/07/29 18:49:32 Running with minumum backoff 5s and maximum backoff 1m0s...
2020/07/29 18:49:33 Created topic "test-topic"
2020/07/29 18:49:34 Created subscription "test-subscription"
2020/07/29 18:49:34 Published message
2020/07/29 18:49:36 Nacking message: Hello, world!
2020/07/29 18:49:45 Nacking message: Hello, world!
2020/07/29 18:49:56 Nacking message: Hello, world!
2020/07/29 18:50:06 Nacking message: Hello, world!
2020/07/29 18:50:17 Nacking message: Hello, world!
2020/07/29 18:50:30 Nacking message: Hello, world!
2020/07/29 18:50:35 Deleted subscription "test-subscription"
2020/07/29 18:50:35 Deleted topic test-topic

whereas if I run it with MinimumBackoff and MaximumBackoff of 1s and 2s, respectively, I get

> go run main.go --minimumBackoff=1s --maximumBackoff=2s
2020/07/29 18:50:42 Running with minumum backoff 1s and maximum backoff 2s...
2020/07/29 18:51:11 Created topic "test-topic"
2020/07/29 18:51:12 Created subscription "test-subscription"
2020/07/29 18:51:12 Published message
2020/07/29 18:51:15 Nacking message: Hello, world!
2020/07/29 18:51:18 Nacking message: Hello, world!
2020/07/29 18:51:21 Nacking message: Hello, world!
2020/07/29 18:51:25 Nacking message: Hello, world!
2020/07/29 18:51:28 Nacking message: Hello, world!
2020/07/29 18:51:31 Nacking message: Hello, world!
2020/07/29 18:51:35 Nacking message: Hello, world!
2020/07/29 18:51:38 Nacking message: Hello, world!
2020/07/29 18:51:40 Nacking message: Hello, world!
2020/07/29 18:51:44 Nacking message: Hello, world!
2020/07/29 18:51:47 Nacking message: Hello, world!
2020/07/29 18:51:50 Nacking message: Hello, world!
2020/07/29 18:51:52 Nacking message: Hello, world!
2020/07/29 18:51:54 Nacking message: Hello, world!
2020/07/29 18:51:57 Nacking message: Hello, world!
2020/07/29 18:52:00 Nacking message: Hello, world!
2020/07/29 18:52:03 Nacking message: Hello, world!
2020/07/29 18:52:06 Nacking message: Hello, world!
2020/07/29 18:52:09 Nacking message: Hello, world!
2020/07/29 18:52:12 Nacking message: Hello, world!
2020/07/29 18:52:13 Deleted subscription "test-subscription"
2020/07/29 18:52:13 Deleted topic test-topic

It seems like in the latter example, the time between nacks is pretty consistently ~3s, which presumably represents a "best effort" to do it in the MaximumBackoff of 2s? What is still not clear to me is whether there is any randomization, whether there is a multiplier (from the first example, it doesn't seem like the time between retries is getting twice as long every time), and whether there is an equivalent of the MaxElapsedTime beyond which there are no more retries?


Solution

  • Retry policy fields for minimum backoff and maximum backoff are similar to InitialInterval and MaxInterval in your example above. Cloud Pub/Sub uses a similar formula as you mentioned to compute the exponential delay. This includes randomization as well.

    Beyond MaxInterval, every subsequent retry would have an added delay of MaxInterval. If you want to stop the retries after a certain number of attempts, we recommend using Dead Letter Queues.