
"For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true." in Spring Integration AWS


UPDATE: There is a bug in spring-integration-aws 2.3.4.

I am integrating SFTP (SftpStreamingMessageSource) as the source with S3 as the destination. My Spring Integration configuration looks similar to this:

    @Bean
    public S3MessageHandler.UploadMetadataProvider uploadMetadataProvider() {
        return (metadata, message) -> {
            if (message.getPayload() instanceof DigestInputStream) {
                metadata.setContentType(MediaType.APPLICATION_JSON_VALUE);
                // cannot read the stream here to compute the MD5 manually
                // metadata.setContentMD5("BLABLA==");
                // wrong approach (the digest is only complete after the stream has been fully read):
                // metadata.setContentMD5(BinaryUtils.toBase64(((DigestInputStream) message.getPayload()).getMessageDigest().digest()));
            }
        };
    }
    @Bean
    @InboundChannelAdapter(channel = "ftpStream")
    public MessageSource<InputStream> ftpSource(SftpRemoteFileTemplate template) {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template);
        messageSource.setRemoteDirectory("foo");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        messageSource.setLoggingEnabled(true);
        messageSource.setCountsEnabled(true);
        return messageSource;
    }
...
    @Bean
    @ServiceActivator(inputChannel = "ftpStream")
    public MessageHandler s3MessageHandler(AmazonS3 amazonS3, S3MessageHandler.UploadMetadataProvider uploadMetadataProvider) {
        S3MessageHandler messageHandler = new S3MessageHandler(amazonS3, "bucketName");
        messageHandler.setLoggingEnabled(true);
        messageHandler.setCountsEnabled(true);
        messageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
        messageHandler.setUploadMetadataProvider(uploadMetadataProvider);
        messageHandler.setKeyExpression(new ValueExpression<>("key"));
        return messageHandler;
    }

After startup, I get the following error: "For an upload InputStream with no MD5 digest metadata, the markSupported() method must evaluate to true."

This is because ftpSource produces an InputStream payload that does not support mark/reset. I also tried converting the InputStream to a BufferedInputStream in a @Transformer, for example:

    return new BufferedInputStream((InputStream) message.getPayload());

but that did not help either: now I get "java.io.IOException: Stream closed", because S3MessageHandler:338 calls Md5Utils.md5AsBase64(inputStream), which closes the stream before it can be reset for the upload.
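Both failures can be reproduced with the JDK alone. In this sketch, `md5AsBase64AndClose` is a hypothetical stand-in for the behavior described above (it reads the whole stream to compute the MD5 and then closes it); it is not the AWS SDK's `Md5Utils`. `hashThenReset` imitates the handler's InputStream branch: a stream without mark support trips the `markSupported()` check, while a `BufferedInputStream` passes the check but cannot be reset once it has been closed.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class StreamClosedDemo {

    // Hypothetical stand-in for an MD5 helper that reads the whole stream and then closes it.
    public static String md5AsBase64AndClose(InputStream in) throws IOException {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                md5.update(buffer, 0, n);
            }
            return Base64.getEncoder().encodeToString(md5.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        } finally {
            in.close(); // this close() is what breaks the later reset()
        }
    }

    // Imitates the handler's InputStream branch: require mark support, hash, then reset.
    public static String hashThenReset(InputStream in) throws IOException {
        if (!in.markSupported()) {
            throw new IllegalStateException("markSupported() must evaluate to true");
        }
        String md5 = md5AsBase64AndClose(in);
        in.reset(); // a closed BufferedInputStream throws IOException("Stream closed") here
        return md5;
    }

    // A minimal stream without mark/reset support (InputStream.markSupported() returns false).
    public static InputStream nonMarkable(byte[] data) {
        ByteArrayInputStream source = new ByteArrayInputStream(data);
        return new InputStream() {
            @Override
            public int read() {
                return source.read();
            }
        };
    }

    public static void main(String[] args) {
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            hashThenReset(nonMarkable(data));
        } catch (IllegalStateException | IOException e) {
            System.out.println("plain stream: " + e.getMessage());
        }
        try {
            hashThenReset(new BufferedInputStream(new ByteArrayInputStream(data)));
        } catch (IllegalStateException | IOException e) {
            System.out.println("buffered stream: " + e.getMessage());
        }
    }
}
```

Wrapping the payload in a `BufferedInputStream` therefore only moves the failure from the assertion to the `reset()` call.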

How can I generate the MD5 for all messages in Spring Integration AWS without pain?

I am using spring-integration-aws-2.3.4.RELEASE


Solution

  • The S3MessageHandler does this:

    if (payload instanceof InputStream) {
        InputStream inputStream = (InputStream) payload;
        if (metadata.getContentMD5() == null) {
            Assert.state(inputStream.markSupported(),
                    "For an upload InputStream with no MD5 digest metadata, "
                            + "the markSupported() method must evaluate to true.");
            String contentMd5 = Md5Utils.md5AsBase64(inputStream);
            metadata.setContentMD5(contentMd5);
            inputStream.reset();
        }
        putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
    }

    Here, Md5Utils.md5AsBase64() closes the InputStream when it is done, which is bad for us: the subsequent inputStream.reset() fails.

    This is an omission on our side. Please raise a GitHub issue and we will fix it ASAP, or feel free to contribute a fix.
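One possible shape for such a fix is to mark the stream, hash it through a wrapper whose `close()` does nothing, and then reset it. This is only a sketch under assumptions, not the change that was actually made in spring-integration-aws: `CloseShieldInputStream`, `md5AsBase64AndClose` and `md5ThenRewind` are hypothetical names, and `md5AsBase64AndClose` merely imitates an MD5 helper that closes its input.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class MarkResetMd5 {

    // Hypothetical wrapper that ignores close(), so the caller keeps ownership of the stream.
    public static final class CloseShieldInputStream extends FilterInputStream {
        public CloseShieldInputStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() {
            // intentionally does not close the wrapped stream
        }
    }

    // Hypothetical helper imitating an MD5 utility that consumes and closes its input.
    public static String md5AsBase64AndClose(InputStream in) throws IOException {
        try (InputStream stream = in) {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] buffer = new byte[8192];
            int n;
            while ((n = stream.read(buffer)) != -1) {
                md5.update(buffer, 0, n);
            }
            return Base64.getEncoder().encodeToString(md5.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Marks the stream, hashes it through the close shield, then rewinds it for the upload.
    public static String md5ThenRewind(InputStream in, int readLimit) throws IOException {
        if (!in.markSupported()) {
            throw new IllegalStateException("markSupported() must evaluate to true");
        }
        in.mark(readLimit); // must cover the whole content, or reset() may fail
        String md5 = md5AsBase64AndClose(new CloseShieldInputStream(in));
        in.reset();
        return md5;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new BufferedInputStream(
                new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8)));
        String md5 = md5ThenRewind(in, Integer.MAX_VALUE);
        byte[] body = in.readAllBytes(); // Java 9+; the stream is back at the start
        System.out.println(md5 + " / " + new String(body, StandardCharsets.UTF_8));
    }
}
```

Note that `mark(Integer.MAX_VALUE)` on a `BufferedInputStream` makes it keep everything it reads in memory, so for large files this is no cheaper than converting the payload to a `byte[]`.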

    As a workaround, I suggest adding a transformer in front of this S3MessageHandler with code like:

    return org.springframework.util.StreamUtils.copyToByteArray(inputStream);
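Wired into the question's configuration, the workaround could look roughly like the following. It is a sketch assuming annotation-based Spring Integration configuration: the class name, the bean name and the channel name `s3Upload` are hypothetical, and the S3MessageHandler's `@ServiceActivator` would have to consume from `s3Upload` instead of `ftpStream`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.util.StreamUtils;

@Configuration
public class SftpToS3TransformerConfig {

    // Copies the SFTP InputStream payload into a byte[] before it reaches the S3MessageHandler.
    // "s3Upload" is a hypothetical channel name.
    @Bean
    @Transformer(inputChannel = "ftpStream", outputChannel = "s3Upload")
    public GenericTransformer<InputStream, byte[]> streamToBytes() {
        return stream -> {
            try (InputStream in = stream) {
                return StreamUtils.copyToByteArray(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    }
}
```

The try-with-resources closes the payload stream once it has been copied, but not the remote session, which SftpStreamingMessageSource carries in the closeableResource header. According to the Spring Integration reference documentation, the framework's own StreamTransformer, which also turns an InputStream payload into a byte[], closes that session automatically, so it may be the simpler choice. Either way, every file is held fully in memory before the upload.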
    

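    This way, the S3MessageHandler already receives a byte[] payload and takes a different processing branch. There the bytes are wrapped in a ByteArrayInputStream, whose close() has no effect and whose mark defaults to position 0, so the reset() after the MD5 computation succeeds: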

    else if (payload instanceof byte[]) {
        byte[] payloadBytes = (byte[]) payload;
        InputStream inputStream = new ByteArrayInputStream(payloadBytes);
        if (metadata.getContentMD5() == null) {
            String contentMd5 = Md5Utils.md5AsBase64(inputStream);
            metadata.setContentMD5(contentMd5);
            inputStream.reset();
        }
        if (metadata.getContentLength() == 0) {
            metadata.setContentLength(payloadBytes.length);
        }
        putObjectRequest = new PutObjectRequest(bucketName, key, inputStream, metadata);
    }
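Why this branch tolerates an MD5 helper that closes its input can be checked with the JDK alone. In this sketch, `md5AsBase64AndClose` is a hypothetical stand-in for such a helper, and `PreparedUpload` and `prepare` are hypothetical names; the class only mirrors the branch above and does not call the AWS SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class ByteArrayBranchDemo {

    // Hypothetical holder for what the handler would put on the PutObjectRequest.
    public static final class PreparedUpload {
        public final String contentMd5;
        public final long contentLength;
        public final InputStream body;

        PreparedUpload(String contentMd5, long contentLength, InputStream body) {
            this.contentMd5 = contentMd5;
            this.contentLength = contentLength;
            this.body = body;
        }
    }

    // Hypothetical helper imitating an MD5 utility that consumes and closes its input.
    public static String md5AsBase64AndClose(InputStream in) throws IOException {
        try (InputStream stream = in) {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] buffer = new byte[8192];
            int n;
            while ((n = stream.read(buffer)) != -1) {
                md5.update(buffer, 0, n);
            }
            return Base64.getEncoder().encodeToString(md5.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // Mirrors the byte[] branch: wrap the bytes, hash them, rewind, record the length.
    public static PreparedUpload prepare(byte[] payload) throws IOException {
        InputStream body = new ByteArrayInputStream(payload);
        String md5 = md5AsBase64AndClose(body); // close() has no effect on ByteArrayInputStream
        body.reset();                           // back to position 0, the default mark
        return new PreparedUpload(md5, payload.length, body);
    }

    public static void main(String[] args) throws IOException {
        PreparedUpload upload = prepare("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(upload.contentMd5 + " " + upload.contentLength + " "
                + new String(upload.body.readAllBytes(), StandardCharsets.UTF_8));
    }
}
```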