java activemq-classic virtual-topic

Handling wildcards in CompositeQueues - ActiveMQ xml configuration file


I would like to ask for advice on an issue I have with ActiveMQ.

I am using ActiveMQ 5.15. I am trying to modify the XML configuration file to add a virtual destination, using CompositeQueues, that forwards to another queue/topic. According to the ActiveMQ documentation for this component, the schema is the following:

<compositeQueue name="IncomingOrders">
  <forwardTo>
    <topic physicalName="Notifications" />
  </forwardTo>
</compositeQueue>

I have been able to forward messages from existing queues with names such as request.typeA.classC. However, I have several queues that share the prefix request.typeA., so I would like to use wildcards to avoid defining a composite queue for each of them and to make the configuration easier to maintain.

I would need something like this:

<compositeQueue name="request.typeA.>">
  <forwardTo>
    <topic physicalName="Notifications" />
  </forwardTo>
</compositeQueue>

However, this configuration does not work, and I suspect that wildcards are simply not supported there (at least not yet). I have successfully used wildcards in the physicalName property, but not in the name property.

So I wonder whether it is possible to use wildcards in the name property (I have found no evidence of that in the documentation) and, if so, how to do it. If you know for sure that it is not possible with the current ActiveMQ version, I would be grateful if you could confirm it.

I would also appreciate any alternatives or advice for achieving the same goal within the constraints described above. I have also read about Mirrored Queues; however, that setup affects all existing queues (I am only interested in a small subset of them) and might have a considerable impact on performance.

Thank you so much in advance for your time and best regards.


Solution

  • I eventually found a workaround that let me create mirrored queues for only a subset of queues, selected by prefix.

    I wrote my own DestinationInterceptor that mirrors only the queues I am interested in and skips the rest (the default MirroredQueue implementation mirrors every queue created in the broker).

    How I did it: I copied the MirroredQueue.java implementation from the library into a new class called CustomMirroredQueue and added a new attribute to the class called mirroring. I then changed the intercept(final Destination destination) method, which comes from the DestinationInterceptor interface, so that its if-statement also takes this attribute into account (through a new helper method called isPrefixMirrored):

    /*
     * Intercepts every queue/topic that is created in the broker.
     * Only queues are of interest here: *some* of them are mirrored so that we get a copy of the
     * messages sent to them. Topics do not need mirroring, because subscribing to the topic
     * already delivers the same messages.
     */
    public Destination intercept(final Destination destination) {
        if (destination.getActiveMQDestination().isQueue()) {
            if (isPrefixMirrored(destination) && (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues())) {
                try {
                    //we create a mirrored queue for that destination
                    final Destination mirrorDestination = getMirrorDestination(destination);
                    if (mirrorDestination != null) {
                        return new DestinationFilter(destination) {
                            public void send(ProducerBrokerExchange context, Message message) throws Exception {
                                message.setDestination(mirrorDestination.getActiveMQDestination());
                                mirrorDestination.send(context, message);
    
                                if (isCopyMessage()) {
                                    message = message.copy();
                                }
                                message.setDestination(destination.getActiveMQDestination());
                                message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
                                super.send(context, message);
                            }
                        };
                    }
                } catch (Exception e) {
                    LOG.error("Failed to lookup the mirror destination for: {}", destination, e);
                }
            }
        }
        return destination;
    }
    
    /**
     * @return true if the given destination should be mirrored. If the "mirroring" attribute is an
     * empty string, every queue is mirrored by default.
     */
    private boolean isPrefixMirrored(Destination destination) {
        if (mirroring.trim().isEmpty()) {
            return true;
        }
        final String destinationPhysicalName = destination.getActiveMQDestination().getPhysicalName();
        // startsWith gives a true prefix match; empty entries (e.g. from "A,,B") are skipped
        // because every name starts with "" and would otherwise match.
        return Arrays.stream(mirroring.split(","))
                .map(String::trim)
                .filter(prefix -> !prefix.isEmpty())
                .anyMatch(destinationPhysicalName::startsWith);
    }
    

    I built a .jar containing only this custom class and its dependencies (using Gradle) and added it to the lib folder of the ActiveMQ broker installation. I was then able to declare the class as a bean in the ActiveMQ XML configuration file:

    <destinationInterceptors>
        <bean xmlns="http://www.springframework.org/schema/beans" class="package.CustomMirroredQueue" id="CustomMirroredQueue">
            <property name="copyMessage" value="true"/>
            <property name="postfix" value=""/>
            <property name="prefix" value="mirror."/>
            <property name="mirroring" value="PREFIX_1, QUEUE2, QUEUE3"/>
        </bean>
    </destinationInterceptors>
    

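    The class attribute must contain the fully qualified name of the custom class (package.CustomMirroredQueue here), which the broker loads from the .jar in its lib folder. The copyMessage, postfix and prefix properties come from the default MirroredQueue implementation. The mirroring property is a comma-separated list of the specific queues/prefixes to mirror (and only those); since it is set through a Spring property element, the class needs a matching public setter for it.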
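
The matching rule behind the mirroring property can be tried out on its own, without a broker. The following is only a minimal sketch: the class name MirrorPrefixSketch and the two-argument method signature are hypothetical and not part of ActiveMQ or of the class described above. It assumes strict prefix matching with startsWith; swapping in contains would give substring matching instead.

```java
import java.util.Arrays;

// Hypothetical standalone helper for experimenting with the matching rule; not part of ActiveMQ.
final class MirrorPrefixSketch {

    private MirrorPrefixSketch() {
    }

    /**
     * Returns true if physicalName starts with one of the comma-separated
     * entries in mirroring. A null, empty or blank list means "mirror everything".
     */
    static boolean isPrefixMirrored(String mirroring, String physicalName) {
        if (mirroring == null || mirroring.trim().isEmpty()) {
            return true;
        }
        return Arrays.stream(mirroring.split(","))
                .map(String::trim)
                .filter(prefix -> !prefix.isEmpty()) // skip empty entries such as "A,,B"
                .anyMatch(physicalName::startsWith);
    }

    public static void main(String[] args) {
        String mirroring = "request.typeA., QUEUE2";
        System.out.println(isPrefixMirrored(mirroring, "request.typeA.classC")); // true
        System.out.println(isPrefixMirrored(mirroring, "request.typeB.classC")); // false
        System.out.println(isPrefixMirrored("", "any.queue"));                   // true
    }
}
```

Keeping the rule in a small static method like this makes it easy to unit test the list of prefixes before deploying the .jar to the broker.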