
Attaching a subscriber/listener within an EPL module to a statement with context partitions


I have the following EPL module which successfully deploys:

module context;

import events.*;
import configDemo.*;
import annotations.*;
import main.*;
import subscribers.*;
import listeners.*;

@Name('schemaCreator')
create schema InitEvent(firstStock String, secondStock String, bias double);

@Name('createSchemaEvent')
create schema TickEvent as TickEvent; 

@Name('contextCreator')
create context TwoStocksContext
initiated by InitEvent as initEvent;


@Name('compareStocks') 
@Description('Compare the difference between two different stocks and make a decision')
@Subscriber('subscribers.MySubscriber')
context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,     
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and         
B.stockCode =  context.initEvent.secondStock
);

I have a problem with the listeners/subscribers. As far as I can tell from checking and debugging, the classes themselves are fine, the annotations work, and the listeners/subscribers are attached to the statement on deployment, yet none of them receives any updates from the events.

This is my subscriber; I simply want it to print that an update has been received:

package subscribers;
import java.util.Map;

public class MySubscriber {

    public void update(Map row) {
        System.out.println("got it");
    }
}

I previously had the same module without any context partitions, and the subscribers worked without a problem. After I added the context, they stopped receiving updates.

So far I have tried:

  1. Checking whether the statement has a subscriber/listener attached (it does)
  2. Checking their names
  3. Removing the annotations and setting them manually in Java code after deployment (same result: they attach and I can retrieve their names, but they still receive no updates)
  4. Debugging the subscriber class. Execution either never reaches the breakpoint, or I get a "missing line number attribute" error (can't place a breakpoint there), which I tried to fix to no avail

Any idea what could cause this, or what the best way is to attach a subscriber to a statement that has context partitions?

This is a continuation of a previous problem, which was solved here: Creating instances of Esper's epl

EDIT: Here are the events as I send them from my code, and in the format used by the EPL online tool:

I first get the pair to be followed from the user:

    System.out.println("First stock:"); 
    String first = scanner.nextLine();
    System.out.println("Second stock:"); 
    String second = scanner.nextLine();
    System.out.println("Difference:"); 
    double diff = scanner.nextDouble();
    InitEvent init = new InitEvent(first, second, diff);

After that I have an engine thread that continuously sends events, but before the loop starts, the InitEvent is sent as follows:

@Override
public void run() {

    runtime.sendEvent(initEvent);   

    while (contSimulation) {

        TickEvent tick1 = new TickEvent(Math.random() * 100, "YAH");
        runtime.sendEvent(tick1);

        TickEvent tick2 = new TickEvent(Math.random() * 100, "GOO");
        runtime.sendEvent(tick2);

        TickEvent tick3 = new TickEvent(Math.random() * 100, "IBM");
        runtime.sendEvent(tick3);

        TickEvent tick4 = new TickEvent(Math.random() * 100, "MIC");
        runtime.sendEvent(tick4);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        latch.countDown();

    }

} 

I haven't used the online tool before but I think I got it working. This is the module text:

module context; 

create schema InitEvent(firstStock String, secondStock String, bias double);
create schema TickEvent(currentPrice double, stockCode String);

create context TwoStocksContext
initiated by InitEvent as initEvent;

context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice, 
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and 
B.stockCode =  context.initEvent.secondStock
);

And the sequence of events:

InitEvent={firstStock='YAH', secondStock = 'GOO', bias=5}
TickEvent={currentPrice=55.6, stockCode='YAH'}
TickEvent={currentPrice=50.4, stockCode='GOO'}
TickEvent={currentPrice=30.8, stockCode='MIC'}
TickEvent={currentPrice=24.9, stockCode='APP'}

TickEvent={currentPrice=51.6, stockCode='YAH'}
TickEvent={currentPrice=45.8, stockCode='GOO'}
TickEvent={currentPrice=32.8, stockCode='MIC'}
TickEvent={currentPrice=28.9, stockCode='APP'}

The result I get using them:

At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=55.6, b_currentPrice=50.4, a_stockCode='YAH', 
b_stockCode='GOO'}
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=51.6, b_currentPrice=45.8, a_stockCode='YAH', 
b_stockCode='GOO'}

If I change the second set of events so that the YAH/GOO difference is less than 5, I only get output for the first pair, which makes sense. This is, I think, what the statement is supposed to do.
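The price condition from the `define` clause can be checked by hand for the YAH/GOO pairs listed above. The sketch below is plain Java, not Esper: the names `BiasCheck` and `meetsBias` are made up for illustration, the third pair is invented, and only the condition `A.currentPrice - B.currentPrice >= bias` is evaluated, not the full `match_recognize` pattern.

```java
// Hypothetical helper, not part of Esper: evaluates only the price condition
// from the define clause (A.currentPrice - B.currentPrice >= bias).
final class BiasCheck {

    private BiasCheck() {
    }

    static boolean meetsBias(double aPrice, double bPrice, double bias) {
        return aPrice - bPrice >= bias;
    }

    public static void main(String[] args) {
        double bias = 5.0; // the bias value from the InitEvent in the event sequence

        System.out.println(meetsBias(55.6, 50.4, bias)); // first YAH/GOO pair
        System.out.println(meetsBias(51.6, 45.8, bias)); // second YAH/GOO pair
        System.out.println(meetsBias(51.6, 47.0, bias)); // invented pair with a smaller gap
    }
}
```

The first two pairs differ by roughly 5.2 and 5.8, so both satisfy the condition, which agrees with the two output rows from the online tool. The invented pair differs by about 4.6 and would not match.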

In case it is needed, these two methods read and process the annotations of the EPL module (I didn't write them myself; they are taken from the coinTrader Context class, which can be found here: https://github.com/timolson/cointrader/blob/master/src/main/java/org/cryptocoinpartners/module/Context.java ):

private static Object getSubscriber(String className) throws Exception {

    Class<?> cl = Class.forName(className);
    return cl.newInstance();
}

private static void processAnnotations(EPStatement statement) throws Exception {

    Annotation[] annotations = statement.getAnnotations();
    for (Annotation annotation : annotations) {
        if (annotation instanceof Subscriber) {

            Subscriber subscriber = (Subscriber) annotation;
            Object obj = getSubscriber(subscriber.className());
            System.out.println(subscriber.className());
            statement.setSubscriber(obj);

        } else if (annotation instanceof Listeners) {

            Listeners listeners = (Listeners) annotation;
            for (String className : listeners.classNames()) {
                Class<?> cl = Class.forName(className);
                Object obj = cl.newInstance();
                if (obj instanceof StatementAwareUpdateListener) {
                    statement.addListener((StatementAwareUpdateListener) obj);
                } else {
                    statement.addListener((UpdateListener) obj);
                }
            }


        }
    }
}
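A side note on `getSubscriber`: `Class.newInstance()` has been deprecated since Java 9. A minimal sketch of the same lookup going through the constructor object instead could look like this. The names `SubscriberLoader` and `DemoSubscriber` are placeholders of mine, and this change is unrelated to why the updates were missing.

```java
import java.util.Map;

// Sketch only: a variant of getSubscriber(...) that avoids the deprecated
// Class.newInstance() by calling the no-argument constructor explicitly.
final class SubscriberLoader {

    private SubscriberLoader() {
    }

    // Loads the named class and instantiates it through its no-argument constructor.
    static Object newSubscriber(String className) throws ReflectiveOperationException {
        Class<?> cl = Class.forName(className);
        return cl.getDeclaredConstructor().newInstance();
    }

    // Placeholder standing in for subscribers.MySubscriber so the sketch runs on its own.
    static class DemoSubscriber {
        public void update(Map<String, Object> row) {
            System.out.println("got it: " + row);
        }
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Object subscriber = newSubscriber(DemoSubscriber.class.getName());
        System.out.println(subscriber.getClass().getSimpleName());
    }
}
```

A misspelled class name surfaces as a `ClassNotFoundException`, and a missing no-argument constructor as a `NoSuchMethodException`, both of which are covered by `ReflectiveOperationException`.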

Solution

  • After a month of struggle I finally solved it. In case anyone runs into a similar problem, here is what was wrong. The EPL worked fine in the online tool but not in my code. Eventually I figured out that the initiating events weren't firing, so no context partitions were being created, and as a result the subscribers and listeners never received any updates.

    My mistake was that I was sending a POJO InitEvent from Java, while the InitEvent that the context uses was declared within the EPL module via create schema. In hindsight it makes sense that it didn't work: the events I fire from Java aren't the events that initiate the context.

    My fix is entirely within the EPL. Since I couldn't figure out whether I can fire events from Java whose type is created within the module, I created a schema that is populated from my POJO, and that stream is then used by the context, as such:

    @Name('schemaCreator')
    create schema StartEvent(firstStock string, secondStock string, difference double);
    
    @Name('insertInitEvent')
    insert into StartEvent 
    select * from InitEvent; 
    

    Everything else, including the Java code, remains the same.
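On the open question of firing a schema-declared event from Java: a type declared with `create schema InitEvent(...)` is map-backed by default, so one option may be to send a `java.util.Map` rather than the POJO. Esper versions up to 7 have an `EPRuntime.sendEvent(Map, String)` overload that takes the event type name. Whether that fits the setup here is an assumption on my part, and I have not tried it against this module. The helper name `InitEvents.asMap` is made up; the keys mirror the property names in the schema.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds a Map whose keys mirror the properties declared by
// create schema InitEvent(firstStock String, secondStock String, bias double).
final class InitEvents {

    private InitEvents() {
    }

    static Map<String, Object> asMap(String firstStock, String secondStock, double bias) {
        Map<String, Object> event = new HashMap<>();
        event.put("firstStock", firstStock);
        event.put("secondStock", secondStock);
        event.put("bias", bias);
        return event;
    }

    public static void main(String[] args) {
        Map<String, Object> init = asMap("YAH", "GOO", 5.0);
        System.out.println(init);

        // Assumed Esper (version 7 or earlier) call, not executed in this sketch:
        // runtime.sendEvent(init, "InitEvent");
    }
}
```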