I'm trying to consume a large message from an embedded ActiveMQ Artemis server with a Python client that is connecting using STOMP. The client creates a temp-queue, and then sends a message to a 'request' address. The server is supposed to generate a reply, and send it back to the temp-queue. In this case, the reply is about 2.8M JSON.
When the server tries to reply, I get this message:
info: 11:49:10 372 [Thread-1 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message AMQ601500: User anonymous@invm:0 sent a message NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]], context: RoutingContextImpl(Address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6, routingType=null, PreviousAddress=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 previousRoute:null, reusable=true, version=-2147483639)
..................................................
***** durable queues /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
***** non durable for /temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6:
- queueID=1139 address:/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6 name:2ca84365-155a-11f0-b485-56de892bcc7e filter:null
..................................................
, transaction: null
info: 11:49:10 372 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.audit.message AMQ601501: User anonymous@127.0.0.1:64850 is consuming a message from 2ca84365-155a-11f0-b485-56de892bcc7e: Reference[1145]:NON-RELIABLE:NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
warning: 11:49:10 384 [Thread-3 (ActiveMQ-server-ActiveMQServerImpl::name=localhost)] org.apache.activemq.artemis.core.protocol.stomp AMQ332071: Unable to send message to client: NullStorageLargeServerMessage[messageID=1145, durable=false, address=/temp-queue/a7f07847-0382-417b-a131-7f626f99a3c6,properties=TypedProperties[_AMQ_LARGE_SIZE=5681703]]
java.lang.IndexOutOfBoundsException null
at io.netty.buffer.EmptyByteBuf.checkIndex(EmptyByteBuf.java:1042)
at io.netty.buffer.EmptyByteBuf.writerIndex(EmptyByteBuf.java:150)
at io.netty.buffer.WrappedByteBuf.writerIndex(WrappedByteBuf.java:132)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.writerIndex(ChannelBufferWrapper.java:635)
at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.populateFrameBodyFromLargeMessage(VersionedStompFrameHandler.java:394)
at org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler.createMessageFrame(VersionedStompFrameHandler.java:341)
at org.apache.activemq.artemis.core.protocol.stomp.v12.StompFrameHandlerV12.createMessageFrame(StompFrameHandlerV12.java:57)
at org.apache.activemq.artemis.core.protocol.stomp.StompConnection.createStompMessage(StompConnection.java:649)
at org.apache.activemq.artemis.core.protocol.stomp.StompSession.sendMessage(StompSession.java:170)
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:1205)
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.proceedDeliver(ServerConsumerImpl.java:522)
at org.apache.activemq.artemis.core.server.impl.QueueImpl.proceedDeliver(QueueImpl.java:3812)
at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:3058)
at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:4157)
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:59)
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:32)
at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:68)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:120)
My (probably incorrect) understanding is that ActiveMQ Artemis is supposed to break this up into smaller chunks, but it is just giving me the exception instead.
I must be missing something (possibly a configuration setting). Can someone please tell me what it is?
I'm using Artemis 2.40.0. I'm serializing my object to JSON and then sending it as a TEXT message.
Here is a small program that exhibits this behaviour:
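import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;

public class Example {
    public static void main(String[] args) throws Exception {
        // Start the embedded broker from the XML configuration shown below.
        EmbeddedActiveMQ broker = new EmbeddedActiveMQ();
        broker.setConfigResourcePath("file:example.xml");
        broker.start();

        // Connect to the broker over the in-VM acceptor using the Core client.
        ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
        ClientSessionFactory clientSessionFactory = locator.createSessionFactory();
        ClientSession session = clientSessionFactory.createSession();
        session.start();

        ClientConsumer consumer = session.createConsumer("json.requests");
        consumer.setMessageHandler(new MessageHandler() {
            @Override
            public void onMessage(ClientMessage message) {
                // Read the JSON request and extract the reply-to queue suffix.
                byte[] bytes = new byte[message.getBodySize()];
                message.getBodyBuffer().readBytes(bytes);
                JsonElement jsonElement = JsonParser.parseString(new String(bytes));
                String replyTo = jsonElement.getAsJsonObject().get("reply-to").getAsString();
                String s = generateLargeMessage();
                try {
                    // Send the large reply to the STOMP client's temp-queue.
                    ClientProducer producer = session.createProducer("/temp-queue/" + replyTo);
                    ClientMessage m = session.createMessage(TEXT_TYPE, false);
                    m.getBodyBuffer().writeNullableSimpleString(SimpleString.of(s));
                    producer.send(m);
                    session.commit();
                } catch (ActiveMQException e) {
                    System.err.println("Couldn't send message: " + e.getMessage());
                }
            }
        });
    }

    // Builds a string of at least 2,000,000 characters.
    private static String generateLargeMessage() {
        StringBuilder sb = new StringBuilder();
        while (sb.length() < 2000000) {
            sb.append("abcdefghijklmnopqrstuvwxyz\n");
        }
        return sb.toString();
    }
}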
Here is my xml:
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:activemq" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
    <core xmlns="urn:activemq:core">
        <persistence-enabled>false</persistence-enabled>
        <security-enabled>false</security-enabled>
        <acceptors>
            <acceptor name="in-vm">vm://0</acceptor>
            <acceptor name="in-tcp">tcp://localhost:61616</acceptor>
        </acceptors>
        <addresses>
            <address name="json.requests">
                <anycast>
                    <queue name="json.requests"/>
                </anycast>
            </address>
        </addresses>
    </core>
</configuration>
Here is my test python:
#!venv/bin/python3
import time
import stomp
import json
import uuid
myuuid = uuid.uuid4()
print(f"myuuid = {myuuid}")
conn = stomp.Connection12([('127.0.0.1', 61616)], heartbeats=(5000,5000))
conn.set_listener('', stomp.PrintingListener())
conn.connect(None, None, wait=True)
print("subscribing...")
conn.subscribe(destination=f"/temp-queue/{myuuid}", id=1, ack='auto')
msg = json.dumps({'type':'DaoRequest', 'request': 'users', 'reply-to': f"{myuuid}" })
print(f'sending {msg} to json.requests')
resp = conn.send(body=msg, destination="json.requests")
time.sleep(60)
The STOMP protocol doesn't support chunked message bodies like the Core protocol does, so your understanding that the broker will break the message into chunks in this context is incorrect.
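To illustrate what "no chunking" means on the wire, here is a minimal sketch that builds a single STOMP 1.2 MESSAGE frame by hand. It is not Artemis code; the class name, the method name and the header values are made up for the example, and header escaping is left out. The point is only that the whole body sits between the blank line and the NUL terminator, with `content-length` giving its size in bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: a hand-rolled STOMP 1.2 frame encoder, not the broker's implementation.
final class StompFrameSketch {

    // Encodes one MESSAGE frame: command, headers, blank line, body, NUL terminator.
    // Header values are written as-is (no STOMP header escaping), which is fine
    // for the simple values used here.
    static byte[] messageFrame(String destination, String messageId, String subscription, byte[] body) {
        String head = "MESSAGE\n"
                + "destination:" + destination + "\n"
                + "message-id:" + messageId + "\n"
                + "subscription:" + subscription + "\n"
                + "content-length:" + body.length + "\n"
                + "\n";
        byte[] headBytes = head.getBytes(StandardCharsets.UTF_8);

        ByteArrayOutputStream out = new ByteArrayOutputStream(headBytes.length + body.length + 1);
        out.write(headBytes, 0, headBytes.length);
        out.write(body, 0, body.length); // the entire body, in one piece
        out.write(0);                    // NUL byte ends the frame
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // A 2,000,000-byte body, roughly the size of the reply in the question.
        byte[] body = new byte[2_000_000];
        Arrays.fill(body, (byte) 'a');

        byte[] frame = messageFrame("/temp-queue/example", "1", "1", body);
        System.out.println("body bytes:  " + body.length);
        System.out.println("frame bytes: " + frame.length);
    }
}
```

Because the frame has to be assembled as a whole, whoever sends it needs the complete body available at that moment.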
However, it is possible to consume "large" messages using the STOMP protocol. The broker will simply read the entire message body from disk into memory before sending it across the wire to the consumer. There are several tests in the test-suite that exercise this functionality and all these tests passed on 2.40.0. That said, you're doing something different than what's being done in the tests. You've disabled persistence in your broker.xml, e.g.:
<persistence-enabled>false</persistence-enabled>
If you enable persistence then your code works, e.g.:
<persistence-enabled>true</persistence-enabled>
Another way to work around the issue is to increase the minLargeMessageSize for the client sending the response so that the broker doesn't treat these messages as "large", e.g.:
ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");
locator.setMinLargeMessageSize(6291456);
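As a rough sanity check on that number, the sketch below estimates the encoded body size and compares it with a threshold. It rests on assumptions that are mine rather than something confirmed above: that the default `minLargeMessageSize` is 100 KiB, that `writeNullableSimpleString` writes 1 flag byte, a 4-byte length and 2 bytes per character, and that a message counts as "large" once its body exceeds the threshold (the exact boundary check may differ). The class and method names are hypothetical. Under those assumptions, a 2,840,849-character reply encodes to 5,681,703 bytes, which would be consistent with the `_AMQ_LARGE_SIZE=5681703` in your log and sits below 6291456 (6 MiB).

```java
// Back-of-the-envelope sizing only; the constants reflect assumptions about
// Artemis behaviour and are not taken from its source.
final class LargeMessageSizing {

    // Assumed default client-side threshold: 100 KiB.
    static final long ASSUMED_DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100L * 1024L;

    // Assumed layout of a nullable SimpleString in the body buffer:
    // 1 not-null flag byte + 4 length bytes + 2 bytes per character.
    static long encodedSize(long charCount) {
        return 1L + 4L + 2L * charCount;
    }

    // Assumed rule: the message is treated as "large" when the body exceeds the threshold.
    static boolean isLarge(long encodedBytes, long minLargeMessageSize) {
        return encodedBytes > minLargeMessageSize;
    }

    public static void main(String[] args) {
        long replyChars = 2_840_849L; // "about 2.8M" of JSON, chosen to match the log
        long bytes = encodedSize(replyChars);

        System.out.println("encoded bytes: " + bytes);
        System.out.println("large with assumed default threshold: "
                + isLarge(bytes, ASSUMED_DEFAULT_MIN_LARGE_MESSAGE_SIZE));
        System.out.println("large with threshold 6291456: " + isLarge(bytes, 6_291_456L));
    }
}
```

If your real replies can grow, pick a threshold with some headroom over the largest encoded body you expect, keeping in mind that non-large messages are held fully in memory.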
I'm still investigating exactly why this doesn't work with persistence disabled.