chroniclechronicle-queue

First excerpt of every next roll cycle file is being read as part of previous cycle


This is related to the previous question I have posted. I think that while it is related, it might be different enough to warrant its own question.

The code used is:

public static void main(String[] args){
        ChronicleQueue QUEUE = SingleChronicleQueueBuilder.single("./chronicle/roll")
                                .rollCycle(RollCycles.MINUTELY).build();
        ExcerptTailer TAILER = QUEUE.createTailer();

        ArrayList<Long> seqNums = new ArrayList<>();

        //this reads all roll cycles starting from first and carries on to next rollcycle.
        //busy spinner that spins non-stop trying to read from queue
        int currentCycle = TAILER.cycle();
        System.out.println(TAILER.cycle());
        while(true){
            //if it moves over to new cycle, start over the sequencing (fresh start for next day)
            int cycleCheck = TAILER.cycle();
            long indexCheck = TAILER.index();
            System.out.println(cycleCheck);
            System.out.println("idx: "+indexCheck);
            if (currentCycle != cycleCheck){
                LOGGER.warn("Changing to new roll cycle, from: "+currentCycle+" to: "+cycleCheck+". Clearing list of size "+seqNums.size());
                seqNums.clear();  // this may cause a memory issue see: https://stackoverflow.com/a/6961397/16034206
                currentCycle = cycleCheck;
                TAILER.moveToCycle(currentCycle);
                cycleCheck = TAILER.cycle();
                indexCheck = TAILER.index();
                System.out.println("cycle: "+cycleCheck);
                System.out.println("idx: "+indexCheck);
            }
            //TODO:2nd option, on starting the chronicle runner, always move to end, and wait for next day's cycle to start
            if (TAILER.readDocument(w -> w.read("packet").marshallable(
                    m -> {
                        long seqNum = m.read("seqNum").readLong();
                        int size = seqNums.size();
                        if (size > 0){
                            int idx;
                            if ((idx = seqNums.indexOf(seqNum)) >= 0){
                                LOGGER.warn("Duplicate seqNum: "+seqNum+" at idx: "+idx);
                            }else{
                                long previous = seqNums.get(size-1);
                                long gap = seqNum - previous;
                                if (Math.abs(gap) > 1L){
                                    LOGGER.error("sequence gap at seqNum: "+previous+" and "+seqNum+"! Gap of "+gap);
                                }
                            }
                        }
                        seqNums.add(seqNum);
                        System.out.println(m.read("moldUdpHeader").text());
                    }
            ))){ ; }else { TAILER.close(); break; }
            //breaks out from spinner if nothing to be read.
            //a named tailer could be used to pick up from where is left off.
        }
    }

At this point, I have 2 roll cycle files, one ends in a sequence Number of 1001, then the next file starts with seqNum of 0. Using the while loop, it would read both files, but there is an if statement to check that the cycle has changed or not and reset accordingly.

The output is as follows:

output

The output when .moveToCycle() is commented:

enter image description here

As you can see, the first index of the next file is read as part of previous file, but when I use the TAILER.moveToCycle(currentCycle) it moves to start of the next file again, but it has a different index this time. If you comment this line of code out, it will not re-read the entry with seqNum of 0.


Solution

  • Alright, I tested the following and it works just fine. How it works is that it reads the value (I am assuming the internal workings would only shift the index and cycle after it reads an incoming value), then tests for cycle change (from testing before reading to testing after reading). This is probably how one should iterate over multiple roll cycle files, while keeping track of when it roll overs.

    Also, note that previously it prints cycle and index before printing the object, now it prints object before printing cycle and index, so its likely that you may misread it and assume it doesn't work if you try to test the following code.

        public static void main(String[] args){
            ChronicleQueue QUEUE = SingleChronicleQueueBuilder.single("./chronicle/roll")
                                    .rollCycle(RollCycles.FIVE_MINUTELY).build();
            ExcerptTailer TAILER = QUEUE.createTailer();
            ArrayList<Long> seqNums = new ArrayList<>();
    
            //this reads all roll cycles starting from first and carries on to next roll cycle.
            //busy spinner that spins non-stop trying to read from queue
            int currentCycle = TAILER.cycle();
            System.out.println(TAILER.cycle());
            AtomicLong seqNum = new AtomicLong();
            while(true){
                if (TAILER.readDocument(w -> w.read("packet").marshallable(
                        m -> {
                            long val = m.read("seqNum").readLong();
                            seqNum.set(val);
                            System.out.println(m.read("moldUdpHeader").text());
                        }
                ))){
                    //if it moves over to new cycle, start over the sequencing (fresh start for next day)
                    int cycleCheck = TAILER.cycle();
                    long indexCheck = TAILER.index();
                    System.out.println("cycle: "+cycleCheck);
                    System.out.println("idx: "+indexCheck);
                    if (currentCycle != cycleCheck){
                        LOGGER.warn("Changing to new roll cycle, from: "+currentCycle+" to: "+cycleCheck+". Clearing list of size "+seqNums.size());
                        seqNums.clear();  // this may cause a memory issue see: https://stackoverflow.com/a/6961397/16034206
                        currentCycle = cycleCheck;
                    }
    
                    int size = seqNums.size();
                    long val = seqNum.get();
                    if (size > 0){
                        int idx;
                        if ((idx = seqNums.indexOf(seqNum)) >= 0){
                            LOGGER.warn("Duplicate seqNum: "+seqNum+" at idx: "+idx);
                        }else{
                            long previous = seqNums.get(size-1);
                            long gap = val - previous;
                            if (Math.abs(gap) > 1L){
                                LOGGER.error("sequence gap at seqNum: "+previous+" and "+seqNum+"! Gap of "+gap);
                            }
                        }
                    }
                    seqNums.add(val);
                } else { TAILER.close(); break; }
                //breaks out from spinner if nothing to be read.
                //a named tailer could be used to pick up from where is left off.
            }
        }