Tags: postgresql, go, blob, pgx

Inserting a large object in PostgreSQL using jackc/pgx returns "out of memory (SQLSTATE 54000)"


I am using the jackc/pgx library to insert large objects into PostgreSQL. It works fine when the large objects are small. However, in one case the large object was almost 1.8 GB in size, and the write operation failed with an "out of memory (SQLSTATE 54000)" error.

Here is the code snippet showing how I am inserting blobs:

import (
    "fmt"
    "io/ioutil"

    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

// Read all bytes of the file to be imported as a large object
b, err := ioutil.ReadFile(pathToLargeObjectFile)

// Large object operations must run inside a transaction
txWrite, err := dbPool.Begin(ctx)
loWrite := txWrite.LargeObjects()

fmt.Printf("Creating new blob with ID: %d", ID)
id, err := loWrite.Create(ctx, ID)

// Open the blob with the returned ID
obj, err := loWrite.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)

// Write the whole byte slice in a single call
n, err := obj.Write(b)
fmt.Printf("Written %d bytes to blob %d\n", n, id)

I get an error on this line

n, err := obj.Write(b)

How do I prevent the error and successfully import the large object?

I read the post Inserting Large Object into Postgresql returns 53200 Out of Memory error, which writes the bytes in chunks.

Is something similar possible with jackc/pgx?


Solution

  • The solution is to read the file in chunks and write each chunk to the blob.

    The important point to note is that obj keeps its write pointer at the end of the previous write, so successive obj.Write(b) calls append to the blob rather than overwrite it.

    Here is how I resolved it:

    id := <id of the blob>
    importFile := <string path to import file>
    bufferSize := <chunk size in bytes>

    // Open blob with ID
    obj, err := loWrite.Open(ctx, id, pgx.LargeObjectModeRead|pgx.LargeObjectModeWrite)
    if err != nil {
        fmt.Printf("Error opening blob. Error: %s", err.Error())
        return err
    }

    // Open the import file and wrap it in a buffered reader
    blobBackupFile, err := os.Open(importFile)
    if err != nil {
        fmt.Printf("Error opening file %s. Error: %s", importFile, err.Error())
        return err
    }
    defer blobBackupFile.Close()
    reader := bufio.NewReader(blobBackupFile)

    // The file size is used to detect when the last chunk has been written
    fileInfo, err := blobBackupFile.Stat()
    if err != nil {
        fmt.Printf("Error getting info for file %s. Error: %s", importFile, err.Error())
        return err
    }

    chunk := 0
    totalBytesRead := 0

    // Start reading the file in chunks and writing each chunk to the blob
    for {
        buf := make([]byte, bufferSize)    // initialize the buffer for this chunk
        bytesRead, err := reader.Read(buf) // load the next chunk into the buffer
        buf = buf[:bytesRead]              // slice it to the number of bytes actually read
        totalBytesRead += bytesRead

        if bytesRead == 0 {
            if err == io.EOF {
                fmt.Printf("Reached end of file %s", importFile)
                break
            }
            if err != nil {
                fmt.Printf("Error reading chunk %d from file %s. Error: %s", chunk, importFile, err.Error())
                return err
            }
            continue // nothing was read and no error; try again
        }

        loc, err := obj.Tell()
        if err != nil {
            fmt.Printf("Error getting the current pointer location %s", err.Error())
        }
        fmt.Printf("BlobID: %d. Pointer at %d. ", id, loc)
        fmt.Printf("Writing chunk %d of %d bytes at offset %d of the blob %d", chunk, bytesRead, loc, id)

        bytesWritten, err := obj.Write(buf)
        if err != nil {
            fmt.Printf("Error writing bytes to blob %s", err.Error())
            return err
        }
        fmt.Printf("Written %d bytes to blob %d.", bytesWritten, id)

        endLoc, err := obj.Tell()
        if err != nil {
            fmt.Printf("Error getting the pointer location after writing %s", err.Error())
        }
        fmt.Printf("Written chunk %d of %d bytes from offset %d to offset %d of the blob %d", chunk, bytesWritten, loc, endLoc, id)

        if int64(totalBytesRead) == fileInfo.Size() {
            fmt.Printf("Chunk %d was the last chunk, written from offset %d to offset %d of the blob %d", chunk, loc, endLoc, id)
            break
        }

        // next chunk
        chunk++
    }
    

    Close obj after writing the complete blob, and commit the transaction so the imported large object is persisted.
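
    Here is a minimal sketch of that cleanup step, assuming obj and txWrite are the large object and transaction from the snippets above (obj.Close and txWrite.Commit are the standard pgx v4 calls):

    // Close the large object descriptor once all chunks are written
    if err := obj.Close(); err != nil {
        fmt.Printf("Error closing blob %d. Error: %s", id, err.Error())
        return err
    }

    // Commit the transaction; the large object is only durable after commit
    if err := txWrite.Commit(ctx); err != nil {
        fmt.Printf("Error committing transaction. Error: %s", err.Error())
        return err
    }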