Tags: java, chronicle-queue, chronicle-wire

Can ChronicleQueue tailers for two different queues be interleaved?


I have two separate ChronicleQueues that were created by independent threads monitoring web socket streams in a Java application. When I read each queue independently in a separate single-threaded program, I can traverse each entire queue as expected, using the following minimal code:

final ExcerptTailer queue1Tailer = queue1.createTailer();
final ExcerptTailer queue2Tailer = queue2.createTailer();

while (true)
{
   try( final DocumentContext context = queue1Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter1++;
      queue1Data = context.wire()
                           .bytes()
                           .readObject(Queue1Data.class);

      queue1Writer.write(String.format("%d\t%d\t%d%n", counter1, queue1Data.getEventTime(), queue1Data.getEventContent()));
   }
}

while (true)
{
   try( final DocumentContext context = queue2Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter2++;
      queue2Data = context.wire()
                           .bytes()
                           .readObject(Queue2Data.class);

      queue2Writer.write(String.format("%d\t%d\t%d%n", counter2, queue2Data.getEventTime(), queue2Data.getEventContent()));
   }
}

In the above, I am able to read all the Queue1Data objects, then all the Queue2Data objects, and access their values as expected. However, when I try to interleave reading the queues (read an object from queue 1, then, based on a property of that Queue1Data object (a time stamp), read Queue2Data objects until the first one whose time stamp is after that of the active Queue1Data object (the limit variable below) is found, then do something with it), after only one object from the queue2Tailer is read an exception is thrown: DecoratedBufferUnderflowException: readCheckOffset0 failed. The simplified code that fails is below (I have tried putting the outer while(true) loop both inside and outside the queue2Tailer try block):

final ExcerptTailer queue1Tailer = queue1Queue.createTailer("label1");

try( final DocumentContext queue1Context = queue1Tailer.readingDocument() )
{
   final ExcerptTailer queue2Tailer = queue2Queue.createTailer("label2");
    
   while (true)
   {
      try( final DocumentContext queue2Context = queue2Tailer.readingDocument() )
      {
         if ( isNull(queue2Context.wire()) )
         {
            terminate = true;
            break;
         }
         queue2Data = queue2Context.wire()
                                   .bytes()
                                   .readObject(Queue2Data.class);
         while(true)
         {
            queue1Data = queue1Context.wire()
                                      .bytes()
                                      .readObject(Queue1Data.class);  // first read succeeds

            if (queue1Data.getFieldValue() > limit)   // if this fails the inner loop continues
            {                                         // but the second read fails
               // cache a value
               break;
            }
         }

         // continue working with queue2Data object and cached values
      }   // end try block for queue2 tailer

   } // end outer while loop
}   // end outer try block for queue1 tailer

I have tried it as above, and also with both tailers created at the beginning of the function which does the processing (a private function executed when a button is clicked in a relatively simple Java application). Basically I took the loop which worked independently and put it inside another loop in the function, expecting no problems. I think I am missing something crucial in how tailers are positioned and used to read objects, but I cannot figure out what it is, since the same basic code works when reading the queues independently. The use of isNull(context.wire()) to determine when there are no more objects in a queue I got from one of the examples, though I am not sure this is the proper way to detect the end of a queue when processing it sequentially.

Any suggestions would be appreciated.


Solution

  • You're not writing it correctly in the first instance. There are two ways of achieving what you are trying to achieve: the hardcore way (that is, doing everything explicitly, at a lower level), and using the MethodReader/MethodWriter magic provided by Chronicle.

    Hardcore way

    Writing

    // write first event type
    try (DocumentContext dc = queueAppender.writingDocument()) {
        dc.wire().writeEventName("first").text("Hello first");
    }
    // write second event type
    try (DocumentContext dc = queueAppender.writingDocument()) {
        dc.wire().writeEventName("second").text("Hello second");
    }
    

    This will write different types of messages into the same queue, and you will be able to easily distinguish those when reading.
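
    For context, the snippets in this answer assume a single combined queue with a queueAppender and a tailer already created. A minimal setup (the queue path is just an example) might look like this:

    // net.openhft.chronicle.queue classes; the path name is an example
    try (ChronicleQueue queue = ChronicleQueue.singleBuilder("combined-queue").build()) {
        ExcerptAppender queueAppender = queue.acquireAppender();
        ExcerptTailer tailer = queue.createTailer();
        // ... write and read as shown in the snippets above and below
    }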

    Reading

    StringBuilder reusable = new StringBuilder();
    while (true) {
       try (DocumentContext dc = tailer.readingDocument()) {
           if (!dc.isPresent()) {
               continue;
           }
           dc.wire().readEventName(reusable);
           if ("first".contentEquals(reusable)) {
               // handle first
           } else if ("second".contentEquals(reusable)) {
               // handle second
           }
           // optionally handle other events
       }
    }
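
    Applied to your case, here is a sketch (assuming Queue1Data and Queue2Data are Marshallable, and that you write the data objects themselves rather than text) of writing and interleaved reading from the single combined queue:

    // write the actual data object under an event name
    try (DocumentContext dc = queueAppender.writingDocument()) {
        dc.wire().writeEventName("first").object(queue1Data);
    }

    // read back, dispatching on the event name
    StringBuilder eventName = new StringBuilder();
    while (true) {
        try (DocumentContext dc = tailer.readingDocument()) {
            if (!dc.isPresent())
                break; // or pause/continue if more data is expected later

            // ValueIn is net.openhft.chronicle.wire.ValueIn
            ValueIn in = dc.wire().readEventName(eventName);
            if ("first".contentEquals(eventName)) {
                Queue1Data queue1Data = in.object(Queue1Data.class);
                // compare the time stamp against your limit, cache values, ...
            } else if ("second".contentEquals(eventName)) {
                Queue2Data queue2Data = in.object(Queue2Data.class);
                // ...
            }
        }
    }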
    

    The Chronicle Way (aka Peter's magic)

    This works with any marshallable types, as well as any primitive types, CharSequence subclasses (e.g. String), and Bytes. For more details, have a read of the MethodReader/MethodWriter documentation.

    Suppose you have some data classes:

    public class FirstDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
        // data fields...
    }
    
    public class SecondDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
        // data fields...
    }
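
    As a sketch (the field names are borrowed from the question and purely illustrative), extending SelfDescribingMarshallable gives you the field serialization for free:

    public class FirstDataType extends SelfDescribingMarshallable {
        private long eventTime;     // illustrative field
        private long eventContent;  // illustrative field

        public long getEventTime()    { return eventTime; }
        public long getEventContent() { return eventContent; }
    }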
    

    Then, to write those data classes to the queue, you just need to define the interface, like this:

    interface EventHandler {
        void first(FirstDataType first);
        void second(SecondDataType second);
    }
    

    Writing

    Then, writing data is as simple as:

    final EventHandler writer = appender.methodWriterBuilder(EventHandler.class).get();
    // assuming firstDatum and secondDatum are created earlier
    writer.first(firstDatum);
    writer.second(secondDatum);
    

    What this does is the same as in the hardcore section: it writes the event name (which is taken from the method name of the method writer, i.e. "first" or "second" respectively), and then the actual data object.
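
    If you want to see exactly what was written, you can dump the queue's contents (using the queue handle from the setup sketch above); this prints the wire representation of every excerpt, with the "first"/"second" event names followed by the data:

    System.out.println(queue.dump());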

    Reading

    Now, to read those events from the queue, you need to provide an implementation of the above interface that will handle the corresponding event types, e.g.:

    // you implement this to read data from the queue
    private class MyEventHandler implements EventHandler {
        public void first(FirstDataType first) {
            // handle first type of events
        }
        public void second(SecondDataType second) {
            // handle second type of events
        }
    }
    

    And then you read as follows:

    EventHandler handler = new MyEventHandler();
    MethodReader reader = tailer.methodReader(handler);
    while (true) {
        reader.readOne(); // readOne returns a boolean value which can be used to determine if there's no more data, and pause if appropriate
    }
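
    Since readOne() returns false when there is nothing to read, the endless loop above will spin. A sketch of one way to handle that (running is just a hypothetical stop flag):

    while (running) {
        if (!reader.readOne()) {
            // nothing to read right now - yield instead of busy-spinning,
            // or break if you only want to drain what is already there
            Thread.yield();
        }
    }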
    
    

    Misc

    You don't have to use the same interface for reading and writing. In case you want to read only events of the second type, you can define another interface:

    interface OnlySecond {
        void second(SecondDataType second);
    }
    

    Now, if you create a handler implementing this interface and pass it to the tailer#methodReader() call, readOne() will only process events of the second type while skipping all others.
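
    For example (a sketch):

    MethodReader secondOnlyReader = tailer.methodReader(new OnlySecond() {
        @Override
        public void second(SecondDataType second) {
            // only "second" events arrive here; everything else is skipped
        }
    });
    while (secondOnlyReader.readOne()) {
        // drains everything currently in the queue, handling only "second" events
    }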

    This also works for MethodWriters: if you have several processes writing different types of data and one process consuming all that data, it is not uncommon to define multiple interfaces for writing and a single interface extending all the others for reading, e.g.:

    interface FirstOut {
        void first(String first);
    }
    interface SecondOut {
        void second(long second);
    }
    interface ThirdOut {
        void third(ThirdDataType third);
    }
    interface AllIn extends FirstOut, SecondOut, ThirdOut {
    }
    

    (I deliberately used different data types for the method parameters to show how various types can be used.)
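
    To tie this together, here is a rough sketch of how the producers and the single consumer might use these interfaces (the appender/tailer names are assumed from the earlier snippets):

    // each producing process only needs its own narrow interface
    FirstOut firstWriter = appender.methodWriterBuilder(FirstOut.class).get();
    firstWriter.first("hello");

    // the consuming process implements AllIn and therefore sees every event type
    MethodReader reader = tailer.methodReader(new AllIn() {
        @Override public void first(String first)        { /* handle first */ }
        @Override public void second(long second)        { /* handle second */ }
        @Override public void third(ThirdDataType third) { /* handle third */ }
    });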