spring-data-r2dbc

How to stream filepart to database (r2dbc postgres)?


The question says it. I could not find any information on streaming byte arrays to a postgresql database in spring using r2dbc, e.g. for a file upload.

I can store the bytes, by reading all bytes of a file like this:

@PostMapping("/upload")
suspend fun upload(
  @RequestPart("file") filePartMono: Mono<FilePart>
): User {
  val filePart = filePartMono.awaitFirstOrNull() ?: throw UploadException("Missing file part")

  var inputStream = filePart.content().awaitFirst().asInputStream()

  val byteStream = ByteArrayOutputStream()
  filePart.content()
    .flatMap { dataBuffer -> Flux.just(dataBuffer.asByteBuffer().array()) }
    .collectList()
    .awaitFirst()
    .forEach { bytes -> byteStream.write(bytes) }

  val bytes = byteStream.toByteArray()

  fileRepository.save(File(bytes));
}

But I would like to stream filePart.content() to the database. I am also interested in then streaming a bytea from postgres through a controller to the client.


Solution

  • In the readme of https://github.com/pgjdbc/r2dbc-postgresql, a Postgres bytea (it is the BLOB type in Postgres) can be mapped to byte[], ByteBuffer and R2dbc Blob automatically.

    I tried to map bytea to the above 3 types in a Spring Boot 3/Spring Data R2dbc project.

    The schema file:

    CREATE TABLE IF NOT EXISTS posts (
        -- id SERIAL PRIMARY KEY,
        id UUID DEFAULT uuid_generate_v4(),
        title VARCHAR(255),
        content VARCHAR(255),
        metadata JSON default '{}',
        -- In this sample, use Varchar to store enum(name), Spring Data R2dbc can convert Java Enum to pg VARCHAR, and reverse.
        status VARCHAR(255) default 'DRAFT',
        created_at TIMESTAMP , --NOT NULL DEFAULT LOCALTIMESTAMP,
        updated_at TIMESTAMP,
        attachment bytea,
        cover_image bytea,
        cover_image_thumbnail bytea,
        version INTEGER,
        PRIMARY KEY (id)
    );
    

    And the mapped the Entity class.

    @Data
    @ToString
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    @Table(value = "posts")
    class Post {
    
        @Id
        @Column("id")
        private UUID id;
    
        @Column("title")
        private String title;
    
        @Column("content")
        private String content;
    
        @Column("metadata")
        private Json metadata;
    
        @Column("status")
        private Status status;
    
        @Column("attachment")
        private ByteBuffer attachment;
    
        @Column("cover_image")
        private byte[] coverImage;
    
        @Column("cover_image_thumbnail")
        private Blob coverImageThumbnail;
    
        @Column("created_at")
        @CreatedDate
        private LocalDateTime createdAt;
    
        @Column("updated_at")
        @LastModifiedDate
        private LocalDateTime updatedAt;
    
        @Column("version")
        @Version
        private Long version;
    
        enum Status {
            DRAFT, PENDING_MODERATION, PUBLISHED;
        }
    
    }
    

    Unfortunately only the byte[] type field worked well. See Spring Data relational issue #1408.

    If you want to use other types(ByteBuffer an R2dbc Blob) at the moment, it requires extra custom converters.

    @Configuration
    public class DataR2dbcConfig {
    
        // see: https://github.com/spring-projects/spring-data-relational/issues/1408
        @Bean
        public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
            return R2dbcCustomConversions.of(
                    DialectResolver.getDialect(connectionFactory),
                    List.of(
                            new ByteArrayToByteBufferConverter(),
                            new ByteBufferToByteArrayConverter(),
                            new ByteArrayToBlobConverter(),
                            new BlobToByteArrayConverter()
                    )
            );
        }
    }
    
    
    @ReadingConverter
    class ByteArrayToByteBufferConverter implements Converter<byte[], ByteBuffer> {
    
        @Override
        public ByteBuffer convert(byte[] source) {
            return ByteBuffer.wrap(source);
        }
    }
    
    @WritingConverter
    class ByteBufferToByteArrayConverter implements Converter<ByteBuffer, byte[]> {
    
        @Override
        public byte[] convert(ByteBuffer source) {
            return source.array();
        }
    }
    
    @ReadingConverter
    class ByteArrayToBlobConverter implements Converter<byte[], Blob> {
    
        @Override
        public Blob convert(byte[] source) {
            return Blob.from(Mono.just(ByteBuffer.wrap(source)));
        }
    }
    
    @WritingConverter
    class BlobToByteArrayConverter implements Converter<Blob, byte[]> {
    
        @Override
        public byte[] convert(Blob source) {
            return Mono.from(source.stream()).block().array();
        }
    }
    
    

    Import the config in PostRepositoryTest, that all of these types are working now.

    In the example project, there is a PostController, which includes a uploading/downloading example endpoint using ByteBuffer.

        @PutMapping("{id}/attachment")
        public Mono<ResponseEntity<?>> upload(@PathVariable UUID id,
                                           @RequestPart Mono<FilePart> fileParts) {
    
            return Mono
                    .zip(objects -> {
                                var post = (Post)objects[0];
                                var filePart = (DataBuffer)objects[1];
                                post.setAttachment(filePart.toByteBuffer());
                                return post;
                            },
                            this.posts.findById(id),
                            fileParts.flatMap(filePart -> DataBufferUtils.join(filePart.content()))
                    )
                    .flatMap(this.posts::save)
                    .map(saved -> ResponseEntity.noContent().build());
    
        }
    
    
        @GetMapping("{id}/attachment")
        public Mono<Void> read(@PathVariable UUID id, ServerWebExchange exchange) {
            return this.posts.findById(id)
                    .log()
                    .map(post -> Mono.just(new DefaultDataBufferFactory().wrap(post.getAttachment())))
                    .flatMap(r -> exchange.getResponse().writeWith(r));
        }
    

    Update, added integration tests for the file uploading and downloading.

    Update, spring-data-relational issue #1408 is resolved, we will not need the above converters when you update to use the latest versions.