gopgx

Why pgx returns connection busy on commit?


I have a function to insert values to the table in bulk. tx.Commit() returns conn busy. As I got from reading code is that conn.Begin() actually makes it busy.

So the question is how to do this correctly? Should I use transactions together with batch queries at all? Or transaction is created under the hood?

// InsertItems adds items to the table
func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) error {

    conn, err := r.pool.Acquire(ctx)
    if err != nil {
        return fmt.Errorf("acquire connection: %w", err)
    }
    defer conn.Release()

    tx, err := conn.Begin(ctx)
    if err != nil {
        return fmt.Errorf("starting pgx transaction: %w", err)
    }
    defer func() { _ = tx.Rollback(ctx) }()

    batch := pgx.Batch{}

    for _, v := range values {

        query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)

        batch.Queue(query, v.ID, v.Date, v.Amount)
    }

    batchRes := tx.SendBatch(ctx, &batch)
    defer func() {
        if err := batchRes.Close(); err != nil {
            logger.Errorf("closing batch result: %v", err)
        }
    }()

    cmdTag, err := batchRes.Exec()
    if err != nil {
        return fmt.Errorf("batch res exec: %w", err)
    }

    logger.Debugf("inserted rows: %d", cmdTag.RowsAffected())

    if err := tx.Commit(ctx); err != nil {
        return fmt.Errorf("commiting pgx transaction: %w", err)
    }

    return nil
}

Solution

  • You are calling commit before calling close on the batch result. You need to first close the batch result before you can use the underlying connection again.

    To enforce the correct order of the deferred operations you could do something like the following:

    // InsertItems adds items to the table
    func (r *Repository) InsertItems(ctx context.Context, values []service.Transaction) (err error) {
        conn, err := r.pool.Acquire(ctx)
        if err != nil {
            return fmt.Errorf("acquire connection: %w", err)
        }
        defer conn.Release()
    
        batch := new(pgx.Batch)
        for _, v := range values {
            query := fmt.Sprintf(`INSERT INTO %v (id, date, amount) VALUES ($1, $2, $3)`, r.tableName)
            _ = batch.Queue(query, v.ID, v.Date, v.Amount)
        }
    
        tx, err := conn.Begin(ctx)
        if err != nil {
            return fmt.Errorf("starting pgx transaction: %w", err)
        }   
        result := tx.SendBatch(ctx, batch)
        defer func() {
            if e := result.Close(); e != nil {
                logger.Errorf("closing batch result: %v", e)
                err = e
            }
            
            if err != nil {
                _ = tx.Rollback(ctx)
            } else {
                if e := tx.Commit(ctx); e != nil {
                    err = e
                }
            }
        }()
        tag, err := result.Exec()
        if err != nil {
            return fmt.Errorf("batch res exec: %w", err)
        }
        
        logger.Debugf("inserted rows: %d", tag.RowsAffected())
        return nil
    }