
Join two infinite streams in calcite SQL


I have created two infinite test streams in Java. Both deliver random data. Now I want to join these datasets and receive a result whenever new data arrives.

The result is: stream A is processed until it ends, then stream B is processed until it ends. I draw two conclusions:

  1. If stream A is infinite, I will never get any results, because stream B is ignored and A keeps being read until the JVM runs out of memory.
  2. If A is finite, data that arrives in stream A after A has been read completely is never processed.

So I have two questions: Am I doing something completely wrong, or is Calcite designed this way? If I am doing something wrong, how can I read and process two infinite streams in parallel?
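One plausible explanation for this ordering, assuming the planner executes the query as an ordinary (non-streaming) hash join, is that such a join reads its entire build input into a lookup table before it pulls a single row from the other input. The sketch below is a hypothetical, simplified join over plain `Iterator`s, not Calcite's code; `hashJoin` and `logging` are illustrative names. With finite inputs it logs every `next A` before the first `next B`; with an infinite build side the first loop never exits and the table grows until memory runs out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class BlockingHashJoinDemo {

    // Wraps an iterator so each next() call is logged, like the
    // System.out.println("next A") calls in StreamA/StreamB.
    static <T> Iterator<T> logging(String name, Iterator<T> it, List<String> log) {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public T next() {
                log.add("next " + name);
                return it.next();
            }
        };
    }

    // Hypothetical blocking inner hash join (not Calcite's implementation).
    // Phase 1 drains the build side into a map; phase 2 then probes it.
    // If the build side is infinite, phase 1 never ends.
    static <K, L, R> List<String> hashJoin(Iterator<L> build, Function<L, K> buildKey,
                                           Iterator<R> probe, Function<R, K> probeKey) {
        Map<K, List<L>> table = new HashMap<>();
        while (build.hasNext()) {
            L row = build.next();
            table.computeIfAbsent(buildKey.apply(row), k -> new ArrayList<>()).add(row);
        }
        List<String> out = new ArrayList<>();
        while (probe.hasNext()) {
            R row = probe.next();
            for (L match : table.getOrDefault(probeKey.apply(row), Collections.emptyList())) {
                out.add(match + "|" + row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Finite stand-ins: A supplies IDs, B supplies PRODUCT_IDs.
        Iterator<Integer> a = logging("A", List.of(1, 2, 3).iterator(), log);
        Iterator<Integer> b = logging("B", List.of(2, 3, 4).iterator(), log);
        List<String> joined = hashJoin(a, x -> x, b, x -> x);
        System.out.println(log);    // [next A, next A, next A, next B, next B, next B]
        System.out.println(joined); // [2|2, 3|3]
    }
}
```

Which side gets materialized depends on the plan Calcite chooses, so this only illustrates the pattern; it does not claim to reproduce Calcite's exact operator.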

I have attached the source of my main class, the 2 stream-generating classes and the model.json.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Properties;

import org.apache.calcite.jdbc.CalciteConnection;

public static void main(String[] args) throws Exception {
    Class.forName("org.apache.calcite.jdbc.Driver");
    Properties info = new Properties();
    info.setProperty("lex", "JAVA");
    Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + "c:\\calcite\\model.json", info);
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    Statement statement = connection.createStatement();
    ResultSet resultSet =
            statement.executeQuery("select stream * from S.B b full outer join S.A a on a.ID = b.PRODUCT_ID");

    System.out.println("statement executed");
    final ResultSetMetaData meta = resultSet.getMetaData();
    final int n = meta.getColumnCount();
    final StringBuilder buf = new StringBuilder();

    // Print each row as "LABEL=value; LABEL=value; ..."
    while (resultSet.next()) {
        for (int i = 1; i <= n; i++) {
            buf.append(i > 1 ? "; " : "").append(meta.getColumnLabel(i)).append("=")
                    .append(resultSet.getObject(i));
        }
        System.out.println(buf.toString());
        buf.setLength(0);
    }
}





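import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;

import com.google.common.collect.ImmutableList;

// Common base for both test streams: reports itself as a STREAM table
// and returns itself as its own streaming view.
public abstract class Stream implements StreamableTable, ScannableTable {

    @Override
    public Schema.TableType getJdbcTableType() {
        return Schema.TableType.STREAM;
    }

    @Override
    public boolean isRolledUp(String column) {
        return false;
    }

    @Override
    public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
            CalciteConnectionConfig config) {
        return false;
    }

    @Override
    public Table stream() {
        return this;
    }

    @Override
    public Statistic getStatistic() {
        return Statistics.of(100d, ImmutableList.of());
    }
}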


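import java.util.Iterator;

import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;

public class StreamA extends Stream {
    public static boolean wait = false;

    // Infinite generator: hasNext() always returns true.
    @Override
    public Enumerable<Object[]> scan(DataContext root) {
        return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
            private final String[] items = { "paint1", "paper2", "brush3" };
            private int counter = 0;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Object[] next() {
                System.out.println("next A");
                final int index = counter++;
                // ROWTIME (epoch millis), ID, PRODUCT
                return new Object[] { System.currentTimeMillis(), index, items[index % items.length] };
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        });
    }

    protected final RelProtoDataType protoRowType = a0 -> a0.builder()
            .add("ROWTIME", SqlTypeName.TIMESTAMP)
            .add("ID", SqlTypeName.INTEGER)
            .add("PRODUCT", SqlTypeName.VARCHAR, 10)
            .build();

    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        return protoRowType.apply(typeFactory);
    }
}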





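import java.util.Iterator;
import java.util.Random;

import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeName;

public class StreamB extends Stream {
    public static boolean wait = false;

    // Infinite generator: hasNext() always returns true.
    @Override
    public Enumerable<Object[]> scan(DataContext root) {
        return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
            private final Random random = new Random();
            private int counter = 0;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public Object[] next() {
                System.out.println("next B");
                final int index = counter++;
                // ROWTIME (epoch millis), ID, PRODUCT_ID (random, 1..4), NAME
                return new Object[] { System.currentTimeMillis(), index, random.nextInt(4) + 1, "Kauf " + index };
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        });
    }

    protected final RelProtoDataType protoRowType = a0 -> a0.builder()
            .add("ROWTIME", SqlTypeName.TIMESTAMP)
            .add("ID", SqlTypeName.INTEGER)
            .add("PRODUCT_ID", SqlTypeName.INTEGER)
            .add("NAME", SqlTypeName.VARCHAR, 15)
            .build();

    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        return protoRowType.apply(typeFactory);
    }
}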



{
  "version": "1.0",
  "defaultSchema": "dummy",
  "schemas": [
    {
      "name": "S",
      "tables": [
        {
          "type": "custom",
          "name": "A",
          "stream": {
            "stream": true
          },
          "factory": "de.mypackage.calcite.StreamFactory$FactoryA"
        },
        {
          "type": "custom",
          "name": "B",
          "stream": {
            "stream": true
          },
          "factory": "de.mypackage.calcite.StreamFactory$FactoryB"
        }
      ]
    }
  ]
}

Suppose this isn't possible. How can I use two infinite data streams in windowed joins? In the Calcite documentation I only found rowtime used as the join condition, but that doesn't work either.


Solution

  • Calcite does not currently support stream-to-stream joins. You can find more information in the documentation on streaming. I don't believe there has been recent progress, but you can follow the status of stream join support in Calcite's issue tracker.

    I'll add a few notes on how it's planned to work. A stream-to-stream join really only makes sense within a particular window. For this to work, each stream needs a monotonic column, such as rowtime, so that the query is guaranteed to make progress.

    Here's one example joining two streams of orders and shipments from the documentation:

    SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
    FROM Orders AS o
    JOIN Shipments AS s
      ON o.orderId = s.orderId
      AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
    

    In this case, there is a constraint that the shipment must occur within one hour of the order. In a case like this, Calcite would be able to make forward progress in the query, since Orders.rowtime and Shipments.rowtime are both monotonic.
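Why the monotonic rowtime matters can be seen in a small hand-written version of the Orders/Shipments query. The following is a minimal sketch, not Calcite code: it assumes both inputs arrive sorted by rowtime, uses hypothetical `Order`/`Shipment` classes and a hypothetical `join` method, and measures rowtime in minutes to keep the numbers readable. Because rows are consumed in merged rowtime order, each shipment's matches are emitted as soon as it arrives, and any order whose one-hour window has closed can be dropped, so the buffered state stays bounded even for infinite inputs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class IntervalJoinSketch {

    // Hypothetical row types; rowtime is in minutes for readability.
    static final class Order {
        final long rowtime;
        final int orderId;
        Order(long rowtime, int orderId) { this.rowtime = rowtime; this.orderId = orderId; }
    }

    static final class Shipment {
        final long rowtime;
        final int orderId;
        Shipment(long rowtime, int orderId) { this.rowtime = rowtime; this.orderId = orderId; }
    }

    static final long HOUR = 60; // minutes

    // Joins two rowtime-ordered inputs on orderId with
    //   s.rowtime BETWEEN o.rowtime AND o.rowtime + HOUR.
    // Orders are pulled until they are newer than the current shipment (ties
    // go to orders), so the shipment has seen every order it can match.
    // Orders whose window closed before the shipment are evicted: with
    // monotonic rowtime, no later shipment can match them either.
    static List<String> join(Iterator<Order> orders, Iterator<Shipment> shipments) {
        Deque<Order> window = new ArrayDeque<>(); // oldest order at the head
        List<String> out = new ArrayList<>();
        Order nextO = orders.hasNext() ? orders.next() : null;
        Shipment nextS = shipments.hasNext() ? shipments.next() : null;
        while (nextS != null) {
            if (nextO != null && nextO.rowtime <= nextS.rowtime) {
                window.addLast(nextO);
                nextO = orders.hasNext() ? orders.next() : null;
                continue;
            }
            long now = nextS.rowtime;
            while (!window.isEmpty() && window.peekFirst().rowtime + HOUR < now) {
                window.removeFirst();
            }
            for (Order o : window) {
                if (o.orderId == nextS.orderId) {
                    out.add("order " + o.orderId + " shipped at " + now);
                }
            }
            nextS = shipments.hasNext() ? shipments.next() : null;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order(0, 1), new Order(10, 2), new Order(120, 3));
        List<Shipment> shipments = List.of(
                new Shipment(30, 1),   // within an hour of order 1: joined
                new Shipment(65, 2),   // within an hour of order 2: joined
                new Shipment(80, 1),   // order 1's window closed at 60, already evicted
                new Shipment(125, 3)); // within an hour of order 3: joined
        join(orders.iterator(), shipments.iterator()).forEach(System.out::println);
        // order 1 shipped at 30
        // order 2 shipped at 65
        // order 3 shipped at 125
    }
}
```

Applied to the question's tables, the same idea would need `ROWTIME` to be non-decreasing in both generators (which `System.currentTimeMillis()` normally provides) and a bounded time condition in the join. The unbounded `full outer join` in the question has no such bound, so nothing would ever allow either side's buffered rows to be discarded.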