I have created two infinite test streams in Java. Both deliver random data. Now I want to join these datasets and receive a result whenever new data arrives.
The result is: stream A is processed until it ends, then stream B is processed until it ends.
So I have two questions: Am I doing something completely wrong, or is Calcite designed this way? If I am doing something wrong, how can I read and process two infinite streams in parallel?
I have attached the source of my main class, the 2 stream-generating classes and the model.json.
public static void main(String[] args) throws Exception {
    Class.forName("org.apache.calcite.jdbc.Driver");
    Properties info = new Properties();
    info.setProperty("lex", "JAVA");
    Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + "c:\\calcite\\model.json", info);
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    Statement statement = connection.createStatement();
    ResultSet resultSet =
        statement.executeQuery("select stream * from S.B b full outer join S.A a on a.ID = b.PRODUCT_ID");
    System.out.println("statement executed");
    // Column count and labels are the same for every row, so read the metadata once.
    ResultSetMetaData metaData = resultSet.getMetaData();
    int n = metaData.getColumnCount();
    final StringBuilder buf = new StringBuilder();
    while (resultSet.next()) {
        for (int i = 1; i <= n; i++) {
            buf.append(i > 1 ? "; " : "").append(metaData.getColumnLabel(i)).append("=")
                .append(resultSet.getObject(i));
        }
        System.out.println(buf.toString());
        buf.setLength(0);
    }
}
public abstract class Stream implements StreamableTable, ScannableTable {
    @Override
    public Schema.TableType getJdbcTableType() {
        return Schema.TableType.STREAM;
    }
    @Override
    public boolean isRolledUp(String column) {
        return false;
    }
    @Override
    public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
            CalciteConnectionConfig config) {
        return false;
    }
    // The streaming view of this table is the table itself.
    @Override
    public Table stream() {
        return this;
    }
    @Override
    public Statistic getStatistic() {
        return Statistics.of(100d, ImmutableList.of());
    }
}
public class StreamA extends Stream {
    public static boolean wait = false;
    public Enumerable<Object[]> scan(DataContext root) {
        return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
            private final String[] items = { "paint1", "paper2", "brush3" };
            private int counter = 0;
            // Always true: this stream never signals the end of its data.
            public boolean hasNext() {
                return true;
            }
            public Object[] next() {
                System.out.println("next A");
                final int index = counter++;
                // ROWTIME (epoch millis), ID, PRODUCT
                return new Object[] { System.currentTimeMillis(), index, items[index % items.length] };
            }
            public void remove() {
                throw new UnsupportedOperationException();
            }
        });
    }
    protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
        .add("ID", SqlTypeName.INTEGER).add("PRODUCT", SqlTypeName.VARCHAR, 10)
        .build();
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        return protoRowType.apply(typeFactory);
    }
}
public class StreamB extends Stream {
    public static boolean wait = false;
    public Enumerable<Object[]> scan(DataContext root) {
        return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
            // One generator per iterator instead of a new Random on every row.
            private final Random random = new Random();
            private int counter = 0;
            // Always true: this stream never signals the end of its data.
            public boolean hasNext() {
                return true;
            }
            public Object[] next() {
                System.out.println("next B");
                final int index = counter++;
                // ROWTIME (epoch millis), ID, PRODUCT_ID in 1..4, NAME
                return new Object[] { System.currentTimeMillis(), index, random.nextInt(4) + 1, "Kauf " + index };
            }
            public void remove() {
                throw new UnsupportedOperationException();
            }
        });
    }
    protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
        .add("ID", SqlTypeName.INTEGER).add("PRODUCT_ID", SqlTypeName.INTEGER).add("NAME", SqlTypeName.VARCHAR, 15)
        .build();
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        return protoRowType.apply(typeFactory);
    }
}
{
  "version": "1.0",
  "defaultSchema": "dummy",
  "schemas": [
    {
      "name": "S",
      "tables": [
        {
          "type": "custom",
          "name": "A",
          "stream": {
            "stream": true
          },
          "factory": "de.mypackage.calcite.StreamFactory$FactoryA"
        },
        {
          "type": "custom",
          "name": "B",
          "stream": {
            "stream": true
          },
          "factory": "de.mypackage.calcite.StreamFactory$FactoryB"
        }
      ]
    }
  ]
}
Let's say this isn't possible. How can I use two infinite data streams in windowed joins? In the Calcite documentation I only found rowtime used as the join condition, but that doesn't work either.
Calcite does not currently support stream-to-stream joins. You can find more information in the documentation on streaming. I don't believe there has been recent progress, but you can follow the status of stream join support in Calcite's issue tracker.
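The symptom in the question (all of A read before any of B) is what you would expect from an ordinary, non-streaming join that materializes one input before it reads the other. The following is a generic plain-Java sketch of such a build-then-probe hash join, not Calcite's actual implementation; `BlockingHashJoinDemo` and `hashJoin` are hypothetical names. With finite inputs the log shows that every build row is read before the first probe row; with an infinite build input the first loop would never end.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of a conventional (blocking) build-then-probe hash join. */
public class BlockingHashJoinDemo {
    /**
     * Joins the two inputs on equal keys. Every row read is appended to log, so the
     * read order can be inspected afterwards.
     */
    public static <B, P, K> List<String> hashJoin(Iterator<B> build, Function<B, K> buildKey,
                                                  Iterator<P> probe, Function<P, K> probeKey,
                                                  List<String> log) {
        // Phase 1: drain the entire build input into a hash table. If build.hasNext()
        // always returns true (an infinite stream), this loop never ends and the
        // probe input is never touched.
        Map<K, List<B>> table = new HashMap<>();
        while (build.hasNext()) {
            B row = build.next();
            log.add("build " + row);
            table.computeIfAbsent(buildKey.apply(row), k -> new ArrayList<>()).add(row);
        }
        // Phase 2: only now is the probe input read, one row at a time.
        List<String> out = new ArrayList<>();
        while (probe.hasNext()) {
            P row = probe.next();
            log.add("probe " + row);
            for (B match : table.getOrDefault(probeKey.apply(row), List.of())) {
                out.add(match + "|" + row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Finite stand-ins: product ids (like A.ID) and purchase product ids (like B.PRODUCT_ID).
        List<String> result = hashJoin(List.of(1, 2, 3).iterator(), id -> id,
                                       List.of(2, 3, 3).iterator(), id -> id, log);
        System.out.println(log);    // prints [build 1, build 2, build 3, probe 2, probe 3, probe 3]
        System.out.println(result); // prints [2|2, 3|3, 3|3]
    }
}
```

A join like this can only produce output once one side is exhausted, which is why it cannot serve two unbounded inputs.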
I'll add a few notes on how it's planned to work. Joining two streams only really makes sense within a particular window. For this to work, each stream needs a monotonic column such as rowtime, so that the query is guaranteed to make progress.
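To make the role of monotonicity concrete, here is a hypothetical plain-Java sketch (not a Calcite API) of how an operator could read two infinite streams side by side. If each stream's rowtime never decreases, repeatedly taking the input whose next row has the smaller rowtime yields one time-ordered feed without draining either stream first. `RowtimeMerge`, `Row` and `infinite` are invented names; the generators merely stand in for `StreamA` and `StreamB`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical sketch: lazily interleave two streams whose rowtime never decreases. */
public class RowtimeMerge {
    /** A minimal row: rowtime in epoch millis plus the name of its source stream. */
    public static final class Row {
        public final long rowtime;
        public final String source;
        public Row(long rowtime, String source) { this.rowtime = rowtime; this.source = source; }
        @Override public String toString() { return source + "@" + rowtime; }
    }

    /**
     * Emits whichever head row has the smaller rowtime (ties go to a). If both inputs are
     * non-decreasing in rowtime, so is the output, and neither input is drained first.
     */
    public static Iterator<Row> merge(Iterator<Row> a, Iterator<Row> b) {
        return new Iterator<Row>() {
            private Row headA = a.hasNext() ? a.next() : null;
            private Row headB = b.hasNext() ? b.next() : null;

            @Override public boolean hasNext() { return headA != null || headB != null; }

            @Override public Row next() {
                if (!hasNext()) throw new NoSuchElementException();
                Row out;
                if (headB == null || (headA != null && headA.rowtime <= headB.rowtime)) {
                    out = headA;
                    headA = a.hasNext() ? a.next() : null; // advance only the input just consumed
                } else {
                    out = headB;
                    headB = b.hasNext() ? b.next() : null;
                }
                return out;
            }
        };
    }

    /** Infinite stand-in for StreamA/StreamB: one row every stepMillis, starting at 0. */
    public static Iterator<Row> infinite(String source, long stepMillis) {
        return new Iterator<Row>() {
            private long t = 0;
            @Override public boolean hasNext() { return true; }
            @Override public Row next() { Row r = new Row(t, source); t += stepMillis; return r; }
        };
    }

    public static void main(String[] args) {
        Iterator<Row> merged = merge(infinite("A", 3), infinite("B", 5));
        for (int i = 0; i < 8; i++) {
            System.out.print(merged.next() + " ");
        }
        System.out.println(); // prints: A@0 B@0 A@3 B@5 A@6 A@9 B@10 A@12
    }
}
```

With real sources, a quiet stream stalls the merge until its next row arrives; that wait is the price of keeping the combined feed in rowtime order.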
Here's one example joining two streams of orders and shipments from the documentation:
SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
In this case, there is a constraint that the shipment must occur within one hour after the order. Because Orders.rowtime and Shipments.rowtime are both monotonic, Calcite would be able to make forward progress in a query like this.
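The documentation's query can be mimicked outside Calcite to show why the monotonic rowtime columns matter. The sketch below is plain Java with hypothetical names (`IntervalJoin`, `Event`, `onEvent`); it is not how Calcite would plan the query. It assumes Orders and Shipments events already arrive interleaved in rowtime order (with orders before shipments at equal rowtime), emits a result as soon as a matching shipment arrives, and discards an order once its one-hour window has passed, which keeps memory bounded.

```java
import java.util.*;

/** Hypothetical sketch of the Orders/Shipments interval join over one time-ordered event feed. */
public class IntervalJoin {
    static final long HOUR_MS = 60L * 60 * 1000;

    /** One input event; isOrder distinguishes the Orders stream from the Shipments stream. */
    public static final class Event {
        final long rowtime; final boolean isOrder; final int orderId; final String productId;
        public Event(long rowtime, boolean isOrder, int orderId, String productId) {
            this.rowtime = rowtime; this.isOrder = isOrder; this.orderId = orderId; this.productId = productId;
        }
    }

    private final Map<Integer, List<Event>> openOrders = new HashMap<>(); // by orderId
    private final Deque<Event> byTime = new ArrayDeque<>();               // buffered orders, oldest first
    private long lastRowtime = Long.MIN_VALUE;
    private boolean shipmentSeenAtLast = false;

    /**
     * Consumes one event and returns the result rows it completes, formatted as
     * "rowtime,productId,orderId,shipTime". Precondition: rowtime never decreases,
     * and at equal rowtime orders come before shipments.
     */
    public List<String> onEvent(Event e) {
        if (e.rowtime < lastRowtime || (e.rowtime == lastRowtime && e.isOrder && shipmentSeenAtLast)) {
            throw new IllegalArgumentException("events out of order at rowtime " + e.rowtime);
        }
        if (e.rowtime > lastRowtime) shipmentSeenAtLast = false;
        lastRowtime = e.rowtime;
        // Monotonic rowtime: no later shipment can be earlier than e.rowtime, so an order
        // whose one-hour window has closed can never match again and is dropped.
        while (!byTime.isEmpty() && byTime.peekFirst().rowtime + HOUR_MS < e.rowtime) {
            Event old = byTime.pollFirst();
            List<Event> same = openOrders.get(old.orderId);
            same.remove(old);
            if (same.isEmpty()) openOrders.remove(old.orderId);
        }
        List<String> out = new ArrayList<>();
        if (e.isOrder) {
            openOrders.computeIfAbsent(e.orderId, k -> new ArrayList<>()).add(e);
            byTime.addLast(e);
        } else {
            shipmentSeenAtLast = true;
            // o.orderId = s.orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + 1 hour
            for (Event o : openOrders.getOrDefault(e.orderId, List.of())) {
                if (o.rowtime <= e.rowtime && e.rowtime <= o.rowtime + HOUR_MS) {
                    out.add(o.rowtime + "," + o.productId + "," + o.orderId + "," + e.rowtime);
                }
            }
        }
        return out;
    }

    /** Number of orders still waiting for a possible shipment. */
    public int bufferedOrders() { return byTime.size(); }

    public static void main(String[] args) {
        IntervalJoin join = new IntervalJoin();
        Event[] feed = {
            new Event(0, true, 1, "paint"),           // order 1 at t = 0
            new Event(10 * 60_000, true, 2, "paper"), // order 2 at t = 10 min
            new Event(30 * 60_000, false, 1, null),   // shipment for order 1 at t = 30 min: matches
            new Event(2 * HOUR_MS, false, 2, null),   // shipment for order 2 after 2 h: too late
        };
        for (Event e : feed) {
            for (String row : join.onEvent(e)) System.out.println(row); // prints 0,paint,1,1800000
        }
        System.out.println("buffered orders: " + join.bufferedOrders()); // prints buffered orders: 0
    }
}
```

The eviction loop is only safe because rowtime never decreases; without that guarantee every order would have to be kept forever, and the join could never be sure it had seen all matches.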