I have a CEP flow that has a high throughput with 100+ messages per second.
And I am publishing the result of my processing into a JMS publisher with the following configuration:
Output Event Adapter Type* : JMS
JNDI Initial Context Factory Class: org.apache.activemq.jndi.ActiveMQInitialContextFactory
JNDI Provider URL *: tcp://localhost:61616
Connection Factory JNDI Name *: QueueConnectionFactory
Destination Type *: Queue
Destination *: myqueue
also, in order to try if the problem was not having concurrency i added:
Concurrent Publishers: Allow
to the JMSPublisher
and I am getting the following error:
ERROR {org.wso2.carbon.event.output.adapter.jms.JMSEventAdapter} -
Event dropped at Output Adapter 'kpis' for tenant id '-1234',
Job queue is full, Task java.util.concurrent.FutureTask@5651dd6c
rejected from java.util.concurrent.ThreadPoolExecutor@3c8c7b29
[Running, pool size =1, active threads = 1, queued tasks = 10000,
completed tasks = 176986]
java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.FutureTask@5651dd6c rejected from
java.util.concurrent.ThreadPoolExecutor@3c8c7b29[Running, pool size = 1,
active threads = 1, queued tasks = 10000, completed tasks = 176986]
Is there any limitation on the throughput to a JMS activemq?
Also, so far there is no consumer on the queue I am writing all the messages to. Can that have a negative impact on the WSO2 CEP publisher causing that error and degrading performance?
From reading some info online it looks like this might be a direct problem with the pool size!
Is it possible to increase the JMSEventAdapter pool size? if yes, then how?
FULL STACK TRACE:
ERROR {org.wso2.carbon.event.output.ad apter.jms.JMSEventAdapter} - Event dropped at Output Adapter 'kpis' for tenant id '-1234', Job queue is full, Task java.util.concurrent.FutureTask@745cb718 rejected from java.util.concurrent.ThreadPoolExecutor@3a7d9bcf[Running, pool size = 100, active threads = 100, queued tasks = 10000, completed tasks = 5151]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@745cb718 rejected from java.util.concurrent.ThreadPoolExecutor@3a7d9bcf[Running, pool size = 100, active threads = 100, queued tasks = 10000, completed tasks = 5151]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at org.wso2.carbon.event.output.adapter.jms.JMSEventAdapter.publish(JMSEventAdapter.java:142)
at org.wso2.carbon.event.output.adapter.core.internal.OutputAdapterRuntime.publish(OutputAdapterRuntime.java:62)
at org.wso2.carbon.event.output.adapter.core.internal.CarbonOutputEventAdapterService.publish(CarbonOutputEventAdapterService.java:143)
at org.wso2.carbon.event.publisher.core.internal.EventPublisher.process(EventPublisher.java:414)
at org.wso2.carbon.event.publisher.core.internal.EventPublisher.sendEvent(EventPublisher.java:226)
at org.wso2.carbon.event.publisher.core.internal.EventPublisher.onEvent(EventPublisher.java:294)
at org.wso2.carbon.event.stream.core.internal.EventJunction.sendEvents(EventJunction.java:194)
at org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener.receive(SiddhiOutputStreamListener.java:100)
at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:98)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:69)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)
at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)
at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:56)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:150)
at org.wso2.siddhi.core.stream.StreamJunction.sendData(StreamJunction.java:214)
at org.wso2.siddhi.core.stream.StreamJunction.access$200(StreamJunction.java:46)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:343)
at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:49)
at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:59)
at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:51)
at org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher.sendEvent(SiddhiInputEventDispatcher.java:39)
at org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher.consumeEvent(AbstractSiddhiInputEventDispatcher.java:104)
at org.wso2.carbon.event.stream.core.internal.EventJunction.sendEvents(EventJunction.java:183)
at org.wso2.carbon.event.processor.core.internal.listener.SiddhiOutputStreamListener.receive(SiddhiOutputStreamListener.java:100)
at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:98)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:69)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)
at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:102)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)
at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)
at org.wso2.siddhi.core.query.input.stream.join.JoinProcessor.process(JoinProcessor.java:110)
at org.wso2.siddhi.core.query.processor.stream.window.LengthWindowProcessor.process(LengthWindowProcessor.java:86)
at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:57)
at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:101)
at org.wso2.siddhi.core.query.input.stream.join.JoinProcessor.process(JoinProcessor.java:118)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:102)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:126)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:323)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:46)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:78)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:40)
at org.wso2.siddhi.core.query.selector.QuerySelector.processNoGroupBy(QuerySelector.java:123)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:86)
at org.wso2.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:56)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:154)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:80)
at org.wso2.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:150)
at org.wso2.siddhi.core.stream.StreamJunction.sendData(StreamJunction.java:214)
at org.wso2.siddhi.core.stream.StreamJunction.access$200(StreamJunction.java:46)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:343)
at org.wso2.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:49)
at org.wso2.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:59)
at org.wso2.siddhi.core.stream.input.InputHandler.send(InputHandler.java:51)
at org.wso2.carbon.event.processor.core.internal.listener.SiddhiInputEventDispatcher.sendEvent(SiddhiInputEventDispatcher.java:39)
at org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher.consumeEvent(AbstractSiddhiInputEventDispatcher.java:104)
at org.wso2.carbon.event.stream.core.internal.EventJunction.sendEvent(EventJunction.java:146)
at org.wso2.carbon.event.receiver.core.internal.management.InputEventDispatcher.onEvent(InputEventDispatcher.java:27)
at org.wso2.carbon.event.receiver.core.internal.EventReceiver.sendEvent(EventReceiver.java:298)
at org.wso2.carbon.event.receiver.core.internal.EventReceiver.processMappedEvent(EventReceiver.java:222)
at org.wso2.carbon.event.receiver.core.internal.EventReceiver$MappedEventSubscription.onEvent(EventReceiver.java:355)
at org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime.onEvent(InputAdapterRuntime.java:110)
at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSMessageListener.onMessage(JMSMessageListener.java:61)
at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSTaskManager$MessageListenerTask.handleMessage(JMSTaskManager.java:643)
at org.wso2.carbon.event.input.adapter.jms.internal.util.JMSTaskManager$MessageListenerTask.run(JMSTaskManager.java:542)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Due to the high throughput of the execution plans to the publishers and the async mechanism that closes JMS connections, Active MQ JMS connection pool inside the WSO2 CEP engine is unable to keep up with the opening and closure of those connections.
This rapidly exhausts all the available connections. Independently of the maximum # set.
The solution in my case goes through reducing the number of messages sent per unit of time and accumulating results.