I created a pollableChannel which is listening a S3 Bucket getting files and launching a job.
My classe is like this:
@Bean
public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
return new S3SessionFactory(pAmazonS3);
}
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory s3SessionFactory) {
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(s3SessionFactory);
synchronizer.setPreserveTimestamp(true);
synchronizer.setDeleteRemoteFiles(false);
synchronizer.setRemoteDirectory(awsS3Properties.getCercBucket());
return synchronizer;
}
@Bean
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
S3InboundFileSynchronizer s3InboundFileSynchronizer) {
S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(
s3InboundFileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new FileSystemResource(integrationProperties.getTempDirectoryName()).getFile());
return messageSource;
}
@Bean("${receivable.integration.inChannel}")
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow integrationFlow(
S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
return IntegrationFlows
.from(s3InboundFileSynchronizingMessageSource,
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
.transform(fileMessageToJobRequest()).handle(jobLaunchingGateway())
.get();
}
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(receivablePositionJob);
return fileMessageToJobRequest;
}
@Bean
@ServiceActivator(inputChannel = "${receivable.integration.inChannel}", poller = @Poller(fixedRate = "1000"))
public JobLaunchingGateway jobLaunchingGateway() {
SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
simpleJobLauncher.setJobRepository(jobRepository);
simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);
jobLaunchingGateway.setOutputChannel(s3FilesChannel());
return jobLaunchingGateway;
}
And my FileMessageToJobRequest is like this:
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName, message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
I want to add a custom MessageHeader in the Message or my second option is intercept the context before the message is published due to I need to set my tenant in ThreadLocal.
How could I do that?
Thanks in advance.
UPDATE with enrichHeaders:
@Bean
public IntegrationFlow integrationFlow(
S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource) {
return IntegrationFlows
.from(s3InboundFileSynchronizingMessageSource,
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1)))
.transform(fileMessageToJobRequest())
.enrichHeaders(Map.of("teste", "testandio"))
.handle(jobLaunchingGateway())
.get();
}
First of all you must remove that @ServiceActivator(inputChannel = "${receivable.integration.inChannel}"
since it points to the same s3FilesChannel
, which is an outputChannel
of that JobLaunchingGateway
, too. So, you are making a loop with such a configuration. Not sure how it works for you at all...
To add a header before sending to that JobLaunchingGateway
, you just need to add enrichHeaders()
before your .handle(jobLaunchingGateway())
in that integrationFlow
definition.