I write data from multiple tables to Kafka, and Beam runs SQL on that data after reading it, but I now get the following error:
Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:141)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:102)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:82)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:248)
    at BeamSqlTest.main(BeamSqlTest.java:65)
Is there a feasible solution? Please help me!
I think you need to set a schema on your input collection PCollection<Row> apply with setRowSchema() or setSchema(). The problem is that your schema is dynamic and only defined at runtime (I'm not sure whether Beam supports this). Could you use a static schema and define it before you start processing the input data?
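Here is a minimal sketch of the static-schema approach. It assumes each Kafka record has already been decoded to a comma-separated string such as "1,apple", with Create standing in for the Kafka source. The class name, the toRow helper, and the order_id and product fields are hypothetical, and the Beam SQL extension plus a runner (for example the direct runner) are assumed to be on the classpath.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class StaticSchemaSketch {
  // Hypothetical schema, known before the pipeline starts.
  static final Schema ORDER_SCHEMA =
      Schema.builder().addInt32Field("order_id").addStringField("product").build();

  // Hypothetical parser: turns "1,apple" into a Row that matches ORDER_SCHEMA.
  static Row toRow(String line) {
    String[] parts = line.split(",", 2);
    return Row.withSchema(ORDER_SCHEMA)
        .addValues(Integer.parseInt(parts[0]), parts[1])
        .build();
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    PCollection<Row> orders =
        pipeline
            .apply(Create.of("1,apple", "2,pear")) // stand-in for the Kafka read
            .apply(MapElements.into(TypeDescriptors.rows()).via((String line) -> toRow(line)))
            // Without this call, SqlTransform fails with "Cannot call getSchema when there is no schema".
            .setRowSchema(ORDER_SCHEMA);

    // A single input PCollection is exposed to SQL under the table name PCOLLECTION.
    orders.apply(SqlTransform.query("SELECT order_id, product FROM PCOLLECTION WHERE order_id > 1"));

    pipeline.run().waitUntilFinish();
  }
}
```

With several tables, each PCollection<Row> would need its own setRowSchema() call before being added to the PCollectionTuple.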
Also, since your input source is unbounded, you need to define windows before applying SqlTransform.
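The windowing step could look roughly like the sketch below. It assumes two input collections that already have schemas set. The table tags ORDERS and USERS, the field names in the query, the class and method names, and the one-minute window size are all hypothetical choices for illustration.

```java
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

public class WindowedSqlSketch {
  // Hypothetical window size; pick one that fits your data.
  static final Duration WINDOW_SIZE = Duration.standardMinutes(1);

  // Both inputs are assumed to carry a schema already (see setRowSchema()).
  static PCollection<Row> joinPerWindow(PCollection<Row> orders, PCollection<Row> users) {
    // Put both unbounded inputs into the same fixed windows before running SQL.
    PCollection<Row> windowedOrders =
        orders.apply("WindowOrders", Window.<Row>into(FixedWindows.of(WINDOW_SIZE)));
    PCollection<Row> windowedUsers =
        users.apply("WindowUsers", Window.<Row>into(FixedWindows.of(WINDOW_SIZE)));

    // The tag ids become the table names visible to the query.
    return PCollectionTuple.of(new TupleTag<Row>("ORDERS"), windowedOrders)
        .and(new TupleTag<Row>("USERS"), windowedUsers)
        .apply(
            SqlTransform.query(
                "SELECT o.order_id, u.user_name "
                    + "FROM ORDERS o JOIN USERS u ON o.user_id = u.user_id"));
  }
}
```

Both sides of the join use the same window definition here, so that rows are matched within the same one-minute window.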