BRIEF EXPLANATION:
I am developing a micro-service (MS-1) that should at some point reach information from another micro-service (MS-2) through a REST operation.
The MS-1 have a public end-point and also listen to kafka messages from a third micro-service (MS-3). Further, the endpoint will be deleted and MS-1 will listen to Kafka messages from MS-2 only.
PROBLEM:
When MS-1 is called through the endpoint, everything works fine and it does the needed REST operation correctly - MS-1 CAN CALL AND OBTAIN THE CORRECT RETURN FROM MS-2.
But, when the same request reach the MS-1 through Kafka messages, I mean, when MS-1 listen to a message sent from MS-3, it raises the following exception when MS-1 needs to call MS-2 through REST request - MS-1 CANNOT CALL MS-2.
EXCEPTION:
java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
at org.springframework.web.context.request.RequestContextHolder.currentRequestAttributes(RequestContextHolder.java:131) ~[spring-web-6.0.10.jar:6.0.10] at org.springframework.web.context.support.WebApplicationContextUtils.currentRequestAttributes(WebApplicationContextUtils.java:313) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:370) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.web.context.support.WebApplicationContextUtils$SessionObjectFactory.getObject(WebApplicationContextUtils.java:365) ~[spring-web-6.0.10.jar:6.0.10]
at org.springframework.beans.factory.support.AutowireUtils$ObjectFactoryDelegatingInvocationHandler.invoke(AutowireUtils.java:283) ~[spring-beans-6.0.10.jar:6.0.10]
at jdk.proxy2/jdk.proxy2.$Proxy190.getAttribute(Unknown Source) ~[na:na]
at [PROTECTED PATH].getHeaders(RestProxyService.java:65) ~[classes/:na]
at [PROTECTED PATH].restExchange(RestProxyService.java:431) ~[classes/:na]
at [PROTECTED PATH].callRemoteApi(RestProxyService.java:346) ~[classes/:na]
at [PROTECTED PATH].callRestApi(RestProxyService.java:426) ~[classes/:na]
at [PROTECTED PATH].getDetail(MyProxy.java:57) ~[classes/:na]
at [PROTECTED PATH].entriesToReport(EntityWithDataService.java:75) ~[classes/:na] at [PROTECTED PATH].getReportObjects(SupportedReportCodes.java:69) ~[classes/:na]
at [PROTECTED PATH].generate(ReportService.java:72) ~[classes/:na]
at [PROTECTED PATH].receive(KafkaConsumer.java:25) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-6.0.10.jar:6.0.10]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-6.0.10.jar:6.0.10]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:375) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2924) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2904) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$58(KafkaMessageListenerContainer.java:2822) ~[spring-kafka-3.0.8.jar:3.0.8]
at io.micrometer.observation.Observation.lambda$observe$4(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603) ~[micrometer-observation-1.11.1.jar:1.11.1]
at io.micrometer.observation.Observation.observe(Observation.java:544) ~[micrometer-observation-1.11.1.jar:1.11.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2820) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2672) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2558) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2200) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1555) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1519) ~[spring-kafka-3.0.8.jar:3.0.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1394) ~[spring-kafka-3.0.8.jar:3.0.8]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
I have changed some configurations to solve it. For now, my application.properties and class are like that:
CONSUMER (listener) -> MS-1
application.properties
kafka.security.enabled=false
spring.kafka.bootstrap-servers=[PROTECTED]
#spring.kafka.producer.topic=report
spring.kafka.consumer.group-id=reports
spring.kafka.consumer.topic=report
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.type.mapping=log:[PROTECTED PATH].ReportMessageObject
class KafkaConsumer
@Component
@Slf4j
public class KafkaConsumer {
@Autowired
private MyService myService;
@KafkaListener(topics = "${spring.kafka.consumer.topic}", errorHandler = "kafkaEventErrorHandler", properties = "{spring.json.value.default.type=[PROTECTED PATH].ReportMessageObject}")
public void receive(@Payload @Valid ConsumerRecord<String, ReportMessageObject> payload) {
try {
log.debug("received payload={}", payload.toString());
myService.generate(payload.value().getCompanyId(), payload.value().getBody());
} catch (Exception e) {
log.error("Exception occurred while consuming ExternalInterface message {}", e.getMessage(), e);
}
}
}
class KafkaConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String kafkaAddress;
@Value(value = "${spring.kafka.consumer.group-id: null}")
private String kafkaGroup;
@Value("${kafka.configuration.security.enabled:false}")
private boolean kafkaSecurityEnabled;
@Value("${spring.kafka.properties.security.protocol:null}")
private String kafkaSecurityProtocol;
@Value("${spring.kafka.properties.sasl.mechanism:null}")
private String kafkaSecurityMechanism;
@Value("${spring.kafka.properties.sasl.jaas.config:null}")
private String kafkaSecurityJaasConfig;
@Bean
public ConsumerFactory<String, ReportMessageObject> responseConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(getKafkaConsumerConfigProperties(kafkaGroup), new StringDeserializer(), new JsonDeserializer<>(ReportMessageObject.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> responseKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ReportMessageObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(responseConsumerFactory());
return factory;
}
private Map<String, Object> getKafkaConsumerConfigProperties(String groupId) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
if(groupId != null && !groupId.isEmpty()) {
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
if (kafkaSecurityEnabled) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
}
return configProps;
}
}
PRODUCER -> MS-3
application.properties
kafka.security.enabled=false
spring.kafka.bootstrap-servers=[PROTECTED PATH]
spring.kafka.producer.topic=report
class KafkaProducer
@Slf4j
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, ReportMessageObject> kafkaTemplate;
@Autowired
private SmartKafkaHeader smartKafkaHeader;
private String property1;
private String property2;
private String property3;
private String property4;
public void addHeaders(String property1, String property2, String property3, String property4) {
this.property1= property1;
this.property2= property2;
this.property3= property3;
this.property4= property4;
}
public void send(String topic, ReportMessageObject reportMessageObject) {
ProducerRecord<String, ReportMessageObject> message = new ProducerRecord<>(topic, reportMessageObject);
smartKafkaHeader.addHeadersToRecord(message, property1, property2, property3, property4);
kafkaTemplate.send(message);
kafkaTemplate.flush();
log.info("sending payload='{}' to topic='{}'", reportMessageObject, topic);
}
}
class KafkaProducerConfig
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String kafkaAddress;
@Value(value = "${spring.kafka.producer.group-id: null}")
private String kafkaGroup;
@Value("${kafka.configuration.security.enabled:false}")
private boolean kafkaSecurityEnabled;
@Value("${spring.kafka.properties.security.protocol:null}")
private String kafkaSecurityProtocol;
@Value("${spring.kafka.properties.sasl.mechanism:null}")
private String kafkaSecurityMechanism;
@Value("${spring.kafka.properties.sasl.jaas.config:null}")
private String kafkaSecurityJaasConfig;
@Bean
public ProducerFactory<String, ReportMessageObject> producerFactorySmartMessaging() {
return new DefaultKafkaProducerFactory<>(getKafkaProducerConfigProperties(kafkaGroup));
}
@Bean
public KafkaTemplate<String, ReportMessageObject> kafkaMessaging() {
return new KafkaTemplate<>(producerFactorySmartMessaging());
}
private Map<String, Object> getKafkaProducerConfigProperties(String groupId){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddress);
if(groupId != null && !groupId.isEmpty()) {
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(JsonSerializer.TYPE_MAPPINGS, "log:[PROTECTED PATH].ReportMessageObject");
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
if (kafkaSecurityEnabled) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaSecurityProtocol);
configProps.put(SaslConfigs.SASL_MECHANISM, kafkaSecurityMechanism);
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaSecurityJaasConfig);
}
return configProps;
}
}
The error is quite clear:
java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.
There is no web request context when the message is coming from Kafka, that is only a concept for a web application.
You can't use the same code in both places.
You would need to propagate any such information via record headers to make it available to a downstream consumer (and access it from there rather than trying to access the web context).