http go gitlab webhooks

Force Gitlab to retry webhooks on failure with Go


I want to watch for every event in a Gitlab project and store them in an external service. For this, I use Gitlab Webhooks. I wrote a small local HTTP server in Go that listens for Gitlab's POSTs and forwards them to an external service. The hooks contain all the information I need, so this architecture seems fine:

Gitlab > HTTPServer > External Service.

My problem is that when the external service is down, I cannot get Gitlab to retry the failed requests. As the documentation says:

It is very surprising that Gitlab does not have a proper way to ask for a webhook retry. I have to explicitly return an invalid HTTP response. Moreover, I cannot find an API endpoint to list all failed webhooks and ask for them to be resent.

Question: How to explicitly return an invalid HTTP response with the standard "net/http" library in order to force Gitlab to retry Webhooks?


Solution

  • As written in the comments, a webhook is a mere notification that an event occurred, and potentially some data is sent, typically as JSON data.

    It is your responsibility to persist the event itself and the data you want/need to process that was sent with it. Below you will find a commented example. Note that this does not include incremental backoffs, but that should be easy to add:

    package main
    
    import (
        "encoding/json"
        "flag"
        "io"
        "log"
        "net/http"
        "os"
        "path/filepath"
    
        "github.com/joncrlsn/dque"
    )
    
    // Runtime settings. init registers the command-line flags that fill them in.

    // bind is the address handed to http.ListenAndServe.
    var bind string

    // queueDir is the directory in which the queue is stored.
    var queueDir string

    // segmentSize is passed to dque.New / dque.Open when the queue is set up.
    var segmentSize int
    
    // webhookContent is the payload that the /webhook handler decodes from the
    // JSON request body and that is kept in the queue until it has been
    // processed.
    //
    // You might want to add request headers and stuff
    type webhookContent struct {
        // Foo is an example string field, filled from the "Foo" key of the body.
        Foo string
        // Bar is an example integer field, filled from the "Bar" key of the body.
        Bar int
    }
    
    func init() {
        flag.StringVar(&bind, "bind", ":8080", "The address to bind to")
        flag.StringVar(&queueDir, "path", "./queue", "path to store the queue in")
        flag.IntVar(&segmentSize, "size", 50, "number of entries for the queue")
    }
    
    // runserver is the "webserver" component. It registers the /webhook handler
    // on the default serve mux and then serves HTTP on the configured bind
    // address. It blocks for as long as the server runs and terminates the
    // program if the server cannot be started or stops with an error.
    //
    // The handler answers with a success status only after the decoded payload
    // has been persisted in q. Every failure is reported to the sender with an
    // error status, so a delivery is never acknowledged when the event was lost.
    func runserver(q *dque.DQue) {
    
        http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
            defer r.Body.Close()
    
            // A new decoder for each call, as we want to have a new LimitReader
            // for each call. This is a simple, albeit a bit crude method to prevent
            // accidental or malicious overload of your server.
            dec := json.NewDecoder(io.LimitReader(r.Body, 4096))
    
            c := &webhookContent{}
            if err := dec.Decode(c); err != nil {
                // The body could not be decoded (malformed, or cut off by the
                // size limit). That is the sender's fault, not a server error.
                log.Printf("reading body: %s", err)
                http.Error(w, "invalid request body", http.StatusBadRequest)
                return
            }
    
            // When the content is successfully decoded, we can persist it into
            // our queue.
            if err := q.Enqueue(c); err != nil {
                // The event was NOT stored. Tell the sender so instead of
                // silently answering 200 OK.
                log.Printf("enqueueing webhook data: %s", err)
                http.Error(w, "internal error", http.StatusInternalServerError)
                return
            }
    
            // Returning without writing anything makes net/http reply 200 OK.
        })
    
        // ListenAndServe only returns on failure; without this check a failed
        // bind would leave the program running with no server at all.
        if err := http.ListenAndServe(bind, nil); err != nil {
            log.Fatalf("serving HTTP on %s: %s", bind, err)
        }
    }
    
    func main() {
        flag.Parse()
    
        var (
            q   *dque.DQue
            err error
        )
    
        if !dirExists(queueDir) {
            if err = os.MkdirAll(queueDir, 0750); err != nil {
                log.Fatalf("creating queue dir: %s", err)
            }
        }
    
        if !dirExists(filepath.Join(queueDir, "webhooks")) {
            q, err = dque.New("webhooks", queueDir, segmentSize, func() interface{} { return &webhookContent{} })
        } else {
            q, err = dque.Open("webhooks", queueDir, segmentSize, func() interface{} { return &webhookContent{} })
        }
    
        if err != nil {
            log.Fatalf("setting up queue: %s", err)
        }
    
        defer q.Close()
    
        go runserver(q)
    
        var (
            // Placeholder during event loop
            i interface{}
            // Payload
            w *webhookContent
            // Did the type assertion succeed
            ok bool
        )
    
        for {
            // We peek only. The semantic of this is that
            // you can already access the next item in the queue
            // without removing it from the queue and "mark" it as read.
            // We use PeekBlock since we want to wait for an item in the
            // queue to be available.
            if i, err = q.PeekBlock(); err != nil {
                // If we can not peek, something is SERIOUSLY wrong.
                log.Fatalf("reading from queue: %s", err)
            }
    
            if w, ok = i.(*webhookContent); !ok {
                // If the type assertion fails, something is seriously wrong, too.
                log.Fatalf("reading from queue: %s", err)
            }
    
            if err = doSomethingUseful(w); err != nil {
                log.Printf("Something went wrong: %s", err)
                log.Println("I strongly suggest entering an incremental backoff!")
                continue
            }
    
            // We did something useful, so we can dequeue the item we just processed from the queue.
            q.Dequeue()
        }
    
    }
    
    // doSomethingUseful is the placeholder for the real processing of a queued
    // webhook payload. It only logs the payload and always returns nil.
    func doSomethingUseful(w *webhookContent) error {
        const format = "Instead of this log message, you can do something useful with: %#v"
        log.Printf(format, w)
        return nil
    }
    
    // dirExists reports whether path exists and is a directory. Any error from
    // os.Stat, including "does not exist", yields false.
    func dirExists(path string) bool {
        info, statErr := os.Stat(path)
        return statErr == nil && info.IsDir()
    }
    

    Now when you do something like:

    $ curl -X POST --data '{"Foo":"Baz","Bar":42}' http://localhost:8080/webhook
    

    you should get a log entry like

    2020/04/18 11:34:23 Instead of this log message, you can do something useful with: &main.webhookContent{Foo:"Baz", Bar:42}