I have a Spring bean that uses dropwizard-metrics annotations on its methods to collect some metrics (see the #parse method):
@Component
public class Consumer extends AbstractConsumer {

    @Autowired
    private EventsService eventsService;

    private ExecutorService executor;

    @PostConstruct
    void init() throws Exception {
        executor = Executors.newFixedThreadPool(1);
        ConsumerEventsHandler consumer = new ConsumerEventsHandler(Arrays.asList("topic1"), this);
        executor.submit(consumer);
    }

    @Timed(name = CONSUMER_BATCH_PARSING_TIME, absolute = true)
    public void parse(ConsumerRecord<String, String> record) {
        String val = record.value();
        eventsService.parseToDB(val);
    }
}
As you can see, this bean submits a new task to the executor service. The task is not a bean, but it needs a reference to the Consumer bean to reuse its logic (see the consumer.parse(record); call):
public class ConsumerEventsHandler implements Runnable {

    private Consumer consumer;
    private List<String> topics;
    protected final KafkaConsumer<String, String> kafkaConsumer;

    public ConsumerEventsHandler(List<String> topics, Consumer consumer) {
        this.topics = topics;
        this.consumer = consumer;
        // Kafka consumer configuration is omitted here
        this.kafkaConsumer = new KafkaConsumer<>();
    }

    @Override
    public void run() {
        try {
            // subscribe, poll and close go to the Kafka client, not to the Consumer bean
            kafkaConsumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    consumer.parse(record);
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}
Here is the metrics config (I use the 'ryantenney/metrics-spring' lib):
@Configuration
@EnableMetrics(proxyTargetClass = true)
@ComponentScan(basePackages = "com.inq")
public class SsvpApiMetricsConfig extends MetricsConfigurerAdapter {

    @Resource
    private MetricRegistry registry;

    @PostConstruct
    public void init() {
        configureReporters(registry);
    }

    @Override
    public void configureReporters(MetricRegistry metricRegistry) {
        registerReporter(JmxReporter.forRegistry(metricRegistry)
                .inDomain("com.inq.metrics")
                .build()).start();
    }
}
As a result, I see that the Consumer bean is proxied, but the ConsumerEventsHandler holds a reference to the plain bean: when the handler was created inside Consumer's @PostConstruct method through 'new ConsumerEventsHandler(..)', the Consumer bean wasn't proxied yet. I assume that @PostConstruct is called before the proxying happens.
The only workaround I see is to get the Consumer bean reference through ApplicationContext#getBean every time inside the #run method instead of storing the reference in a field of ConsumerEventsHandler. Are there any other solutions?
Just create a static field in your non-Spring class:
public class ConsumerEventsHandler implements Runnable {

    private static Consumer consumer;
    private List<String> topics;
    protected final KafkaConsumer<String, String> kafkaConsumer;

    public ConsumerEventsHandler(List<String> topics) {
        this.topics = topics;
        this.kafkaConsumer = new KafkaConsumer<>();
    }

    // Called from the Spring side once the proxied bean is available
    public static void setConsumer(Consumer consumer) {
        ConsumerEventsHandler.consumer = consumer;
    }

    // run() stays as in the question and calls consumer.parse(record)
}
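Then on the Spring side you can have:
@PostConstruct
public void init() {
    configureReporters(registry);
    // consumer must be the Spring-injected (proxied) Consumer bean,
    // for example an @Autowired field of this class, not "this" from inside Consumer
    ConsumerEventsHandler.setConsumer(consumer);
}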
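To see why it matters which reference ends up in the handler, here is a minimal sketch in plain Java. It does not use Spring or metrics-spring: a JDK dynamic proxy stands in for the proxy that @EnableMetrics creates, and a counter stands in for the @Timed advice. All names (ProxyReferenceDemo, Parser, PlainParser, proxyFor, timedCalls) are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

public class ProxyReferenceDemo {

    /** Hypothetical stand-in for the public contract of the Consumer bean. */
    interface Parser {
        void parse(String value);
    }

    /** Counts intercepted calls; stands in for the @Timed advice. */
    static final AtomicInteger timedCalls = new AtomicInteger();

    /** Plain target object, comparable to the un-proxied Consumer instance. */
    static class PlainParser implements Parser {
        Parser capturedInInit;

        // Comparable to @PostConstruct: it runs on the raw object,
        // so "this" can never be the proxy.
        void init() {
            capturedInInit = this;
        }

        @Override
        public void parse(String value) {
            // business logic would go here
        }
    }

    /** Wraps the target so that every call made through the proxy is counted. */
    static Parser proxyFor(Parser target) {
        InvocationHandler handler = (proxy, method, args) -> {
            timedCalls.incrementAndGet();
            return method.invoke(target, args);
        };
        return (Parser) Proxy.newProxyInstance(
                Parser.class.getClassLoader(), new Class<?>[] {Parser.class}, handler);
    }

    public static void main(String[] args) {
        PlainParser target = new PlainParser();
        target.init();                      // reference captured before proxying
        Parser proxied = proxyFor(target);  // proxy created afterwards

        target.capturedInInit.parse("a");   // bypasses the interceptor
        System.out.println("after raw call: " + timedCalls.get());      // prints 0

        proxied.parse("b");                 // goes through the interceptor
        System.out.println("after proxied call: " + timedCalls.get());  // prints 1
    }
}
```

The call through the reference captured in init() is never counted, which mirrors the missing timer in the question. The static setter only helps if the value passed to it is the reference Spring injects into another bean, because that one is the proxy.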