esper

How to clean up the Iterator returned by EPStatement#iterator() when the underlying statement is "INSERT INTO"


I'm investigating an issue with Esper 5.5.0. The code base I'm working on uses an "INSERT INTO" statement and pulls data out of that statement with EPStatement#iterator(). The call returns a non-empty Iterator, which already looks odd to me.

The issue is that the Iterator keeps accumulating data and never gets cleaned up. I'm looking for a way to clear the data in the Iterator, but I don't know how. Its remove() method throws an exception, and deleting data from the derived window has no effect on the EPStatement object that corresponds to the "INSERT INTO" statement. How can I clean up the data in the Iterator for the "INSERT INTO" statement? (EDIT: not the one that corresponds to the derived window, but the one for the "INSERT INTO" statement itself.)

Unfortunately, I can't even create a simple reproducer. The code does something like the following, but the Iterator is always empty when I try to replicate the behavior in new code. I would also like to know what is missing to replicate it.

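import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import org.junit.jupiter.api.Test;

import java.util.Iterator;

public class MyTest {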
    @Test
    void eplStatementReturningNonEmptyIterator() {
        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider();
        EPRuntime rt = engine.getEPRuntime();
        EPAdministrator adm = engine.getEPAdministrator();
        adm.getConfiguration().addEventType(PersonEvent.class);
        adm.createEPL("create window PersonEventWindow.win:keepall() as PersonEvent");
        EPStatement epl = adm.createEPL("insert into PersonEventWindow select irstream * from PersonEvent");

        rt.sendEvent(new PersonEvent("foo", 1));
        rt.sendEvent(new PersonEvent("bar", 2));

        // passes, but this question is not about this one
        assert count(rt.executeQuery("select * from PersonEventWindow").iterator()) == 2;

        // This is the one the question is about: I want to clean up the Iterator that epl.iterator() returns
        assert count(epl.iterator()) == 2;

        // (the assertion above actually fails; I cannot even replicate the issue)
    }

    private static int count(Iterator<?> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static class PersonEvent {
        private String name;
        private int age;
        public PersonEvent(String name, int age) {
            this.name = name;
            this.age = age;
        }
        public String getName() {
            return name;
        }
        public int getAge() {
            return age;
        }
    }
}

Solution

  • It turned out that the Iterator for an "insert into ..." statement returns elements when the statement selects from a window. To clean up the Iterator, delete the data from the window that the "insert into" statement selects from.

    I believe the following code verifies this explanation:

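    import com.espertech.esper.client.EPAdministrator;
    import com.espertech.esper.client.EPRuntime;
    import com.espertech.esper.client.EPServiceProvider;
    import com.espertech.esper.client.EPServiceProviderManager;
    import com.espertech.esper.client.EPStatement;
    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.DisplayName;
    import org.junit.jupiter.api.Test;

    import java.util.Iterator;

    // assumption: assertThat comes from AssertJ
    import static org.assertj.core.api.Assertions.assertThat;

    public class MyTest3 {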
        EPServiceProvider engine;
        EPAdministrator epa;
        EPRuntime epr;
    
        @BeforeEach
        void setUp() {
            engine = EPServiceProviderManager.getDefaultProvider();
            epa = engine.getEPAdministrator();
            epr = engine.getEPRuntime();
        }
    
        @Test
        @DisplayName("The Iterator gets cleaned up by delete from MyWindow")
        void cleanUpIterator() {
            epa.getConfiguration().addEventType(MyEvent.class);
            epa.createEPL("create window MyWindow.std:unique(id) as MyEvent");
            epa.createEPL("insert into MyWindow select id from MyEvent");
            epr.sendEvent(new MyEvent(1));
            epr.sendEvent(new MyEvent(2));
            EPStatement insertIntoAnotherWindow = epa.createEPL("insert into AnotherWindow select id from MyWindow");
            assertThat(count(insertIntoAnotherWindow.iterator())).isEqualTo(2); // surprisingly, this returns the events
    
            epr.executeQuery("delete from MyWindow");
    
            assertThat(count(insertIntoAnotherWindow.iterator())).isEqualTo(0); // now it's empty
        }
    
        public static class MyEvent {
            private final int id;
    
            public MyEvent(int id) {
                this.id = id;
            }
    
            public int getId() {
                return id;
            }
        }
    
        @AfterEach
        void tearDown() {
            engine.destroy();
        }
    
        private static int count(Iterator<?> it) {
            int count = 0;
            while (it.hasNext()) {
                it.next();
                count++;
            }
            return count;
        }
    }
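
The behavior described above (remove() throws, yet deleting from the source window empties the Iterator) is what you would expect if the Iterator were a read-only view over the window's current contents. The following plain-Java sketch only illustrates that mental model; it is not Esper code and makes no claim about how Esper is implemented. The names Window and SelectingStatement are hypothetical stand-ins invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class ReadOnlyViewSketch {

    // Hypothetical stand-in for a named window that holds rows.
    static final class Window {
        private final List<Integer> rows = new ArrayList<>();

        void insert(int id) { rows.add(id); }

        // Plays the role of "delete from MyWindow".
        void deleteAll() { rows.clear(); }

        List<Integer> rows() { return rows; }
    }

    // Hypothetical stand-in for a statement that selects from the window.
    static final class SelectingStatement {
        private final Window source;

        SelectingStatement(Window source) { this.source = source; }

        // Each call returns a fresh read-only iterator over the window's current rows.
        Iterator<Integer> iterator() {
            return Collections.unmodifiableList(source.rows()).iterator();
        }
    }

    static int count(Iterator<?> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Window window = new Window();
        window.insert(1);
        window.insert(2);
        SelectingStatement statement = new SelectingStatement(window);

        System.out.println(count(statement.iterator())); // prints 2

        Iterator<Integer> it = statement.iterator();
        it.next();
        try {
            it.remove(); // the view is read-only, so this is rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("remove() is not supported"); // this line is printed
        }

        window.deleteAll(); // clean up at the source instead
        System.out.println(count(statement.iterator())); // prints 0
    }
}
```

In this model the statement holds no data of its own, so the only place to clean up is the source it reads from. That matches what the test above observes for the "insert into" statement that selects from MyWindow.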