postgresqlgopq

How to stream binary data into a PostgreSQL BYTEA column using the Golang lib/pq API?


I'd like to insert some binary data into a BYTEA column,

How would I go about streaming the contents of somefile.tar.gz into a table with a BYTEA column?

Is it possible to stream to/from postgres from/to golang?


Solution

  • If one would be willing to switch to github.com/jackc/pgx, Large Objects (PostgreSQL docs) can be streamed. The pgx.LargeObject type implements:

    io.Writer
    io.Reader
    io.Seeker
    io.Closer
    

    Large Objects are stored in a system table, there is no Large Object type which can be used in a column of a table. Large Objects are referenced by their object identifier. So a separate table needs to be maintained with file metadata and oid mapping.

    Example program:

    package main
    
    import (
        "context"
        "io"
        "log"
        "os"
        "time"
    
        "github.com/jackc/pgx/v4"
    )
    
    const (
        // files table maps Large Object oid to file names
        createFileTable = `CREATE TABLE files (
            id oid primary key,
            name varchar,
            unique(name)
        );`
    )
    
    func main() {
        ctx, cancel := context.WithTimeout(context.TODO(), time.Minute)
        defer cancel()
    
        conn, err := pgx.Connect(ctx, "user=postgres host=/run/postgresql dbname=postgres")
        if err != nil {
            panic(err)
        }
        defer conn.Close(ctx)
    
        if _, err = conn.Exec(ctx, createFileTable); err != nil {
            panic(err)
        }
    
        written, err := storeFile(ctx, conn, "somefile.bin")
        log.Printf("storeFile written: %d", written)
        if err != nil {
            panic(err)
        }
    
        read, err := loadFile(ctx, conn, "somefile.bin")
        log.Printf("loadFile read: %d", read)
        if err != nil {
            panic(err)
        }
    }
    
    // storeFile as Large Object in the database.
    // The resulting object identifier is stored along with the file name in the files table.
    // The amount of written bytes and an erorr is returned, if one occured.
    func storeFile(ctx context.Context, conn *pgx.Conn, name string) (written int64, err error) {
        file, err := os.Open(name)
        if err != nil {
            return 0, err
        }
        defer file.Close()
    
        // LargeObjects can only operate on an active TX
        tx, err := conn.Begin(ctx)
        if err != nil {
            return 0, err
        }
        defer tx.Rollback(ctx)
    
        lobs := tx.LargeObjects()
    
        // Create a new Large Object.
        // We pass 0, so the DB can pick an available oid for us.
        oid, err := lobs.Create(ctx, 0)
        if err != nil {
            return 0, err
        }
    
        // record the oid and filename in the files table
        _, err = tx.Exec(ctx, "INSERT INTO files (id, name) VALUES ($1, $2)", oid, name)
        if err != nil {
            return 0, err
        }
    
        // Open the new Object for writing.
        obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeWrite)
        if err != nil {
            return 0, err
        }
    
        // Copy the file stream to the Large Object stream
        written, err = io.Copy(obj, file)
        if err != nil {
            return written, err
        }
    
        err = tx.Commit(ctx)
        return written, err
    }
    
    // loadFile loads the file identified by name as Large Object
    // and writes the contents to a local file by the same name.
    // The amount of bytes read or an error is returned.
    func loadFile(ctx context.Context, conn *pgx.Conn, name string) (read int64, err error) {
        tx, err := conn.Begin(ctx)
        if err != nil {
            return 0, err
        }
        defer tx.Rollback(ctx)
    
        var oid uint32
        err = conn.QueryRow(ctx, "SELECT id FROM files WHERE name = $1", name).Scan(&oid)
        if err != nil {
            return 0, err
        }
    
        file, err := os.Create(name)
        if err != nil {
            return 0, err
        }
    
        lobs := tx.LargeObjects()
        obj, err := lobs.Open(ctx, oid, pgx.LargeObjectModeRead)
        if err != nil {
            return 0, err
        }
    
        return io.Copy(file, obj)
    }