The question says it. I could not find any information on streaming byte arrays to a postgresql database in spring using r2dbc, e.g. for a file upload.
I can store the bytes, by reading all bytes of a file like this:
@PostMapping("/upload")
suspend fun upload(
@RequestPart("file") filePartMono: Mono<FilePart>
): User {
val filePart = filePartMono.awaitFirstOrNull() ?: throw UploadException("Missing file part")
var inputStream = filePart.content().awaitFirst().asInputStream()
val byteStream = ByteArrayOutputStream()
filePart.content()
.flatMap { dataBuffer -> Flux.just(dataBuffer.asByteBuffer().array()) }
.collectList()
.awaitFirst()
.forEach { bytes -> byteStream.write(bytes) }
val bytes = byteStream.toByteArray()
fileRepository.save(File(bytes));
}
But I would like to stream filePart.content()
to the database. I am also interested in then streaming a bytea
from postgres through a controller to the client.
In the readme of https://github.com/pgjdbc/r2dbc-postgresql, a Postgres bytea
(it is the BLOB type in Postgres) can be mapped to byte[]
, ByteBuffer
and R2dbc Blob
automatically.
I tried to map bytea
to the above 3 types in a Spring Boot 3/Spring Data R2dbc project.
The schema file:
CREATE TABLE IF NOT EXISTS posts (
-- id SERIAL PRIMARY KEY,
id UUID DEFAULT uuid_generate_v4(),
title VARCHAR(255),
content VARCHAR(255),
metadata JSON default '{}',
-- In this sample, use Varchar to store enum(name), Spring Data R2dbc can convert Java Enum to pg VARCHAR, and reverse.
status VARCHAR(255) default 'DRAFT',
created_at TIMESTAMP , --NOT NULL DEFAULT LOCALTIMESTAMP,
updated_at TIMESTAMP,
attachment bytea,
cover_image bytea,
cover_image_thumbnail bytea,
version INTEGER,
PRIMARY KEY (id)
);
And the mapped the Entity class.
@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table(value = "posts")
class Post {
@Id
@Column("id")
private UUID id;
@Column("title")
private String title;
@Column("content")
private String content;
@Column("metadata")
private Json metadata;
@Column("status")
private Status status;
@Column("attachment")
private ByteBuffer attachment;
@Column("cover_image")
private byte[] coverImage;
@Column("cover_image_thumbnail")
private Blob coverImageThumbnail;
@Column("created_at")
@CreatedDate
private LocalDateTime createdAt;
@Column("updated_at")
@LastModifiedDate
private LocalDateTime updatedAt;
@Column("version")
@Version
private Long version;
enum Status {
DRAFT, PENDING_MODERATION, PUBLISHED;
}
}
Unfortunately only the byte[]
type field worked well. See Spring Data relational issue #1408.
If you want to use other types(ByteBuffer an R2dbc Blob) at the moment, it requires extra custom converters.
@Configuration
public class DataR2dbcConfig {
// see: https://github.com/spring-projects/spring-data-relational/issues/1408
@Bean
public R2dbcCustomConversions r2dbcCustomConversions(ConnectionFactory connectionFactory) {
return R2dbcCustomConversions.of(
DialectResolver.getDialect(connectionFactory),
List.of(
new ByteArrayToByteBufferConverter(),
new ByteBufferToByteArrayConverter(),
new ByteArrayToBlobConverter(),
new BlobToByteArrayConverter()
)
);
}
}
@ReadingConverter
class ByteArrayToByteBufferConverter implements Converter<byte[], ByteBuffer> {
@Override
public ByteBuffer convert(byte[] source) {
return ByteBuffer.wrap(source);
}
}
@WritingConverter
class ByteBufferToByteArrayConverter implements Converter<ByteBuffer, byte[]> {
@Override
public byte[] convert(ByteBuffer source) {
return source.array();
}
}
@ReadingConverter
class ByteArrayToBlobConverter implements Converter<byte[], Blob> {
@Override
public Blob convert(byte[] source) {
return Blob.from(Mono.just(ByteBuffer.wrap(source)));
}
}
@WritingConverter
class BlobToByteArrayConverter implements Converter<Blob, byte[]> {
@Override
public byte[] convert(Blob source) {
return Mono.from(source.stream()).block().array();
}
}
Import the config in PostRepositoryTest, that all of these types are working now.
In the example project, there is a PostController, which includes a uploading/downloading example endpoint using ByteBuffer
.
@PutMapping("{id}/attachment")
public Mono<ResponseEntity<?>> upload(@PathVariable UUID id,
@RequestPart Mono<FilePart> fileParts) {
return Mono
.zip(objects -> {
var post = (Post)objects[0];
var filePart = (DataBuffer)objects[1];
post.setAttachment(filePart.toByteBuffer());
return post;
},
this.posts.findById(id),
fileParts.flatMap(filePart -> DataBufferUtils.join(filePart.content()))
)
.flatMap(this.posts::save)
.map(saved -> ResponseEntity.noContent().build());
}
@GetMapping("{id}/attachment")
public Mono<Void> read(@PathVariable UUID id, ServerWebExchange exchange) {
return this.posts.findById(id)
.log()
.map(post -> Mono.just(new DefaultDataBufferFactory().wrap(post.getAttachment())))
.flatMap(r -> exchange.getResponse().writeWith(r));
}
Update, added integration tests for the file uploading and downloading.
Update, spring-data-relational issue #1408 is resolved, we will not need the above converters when you update to use the latest versions.