I am trying out the Beam SQL shell and want to test how unbounded sources work in terms of usability and performance. Following the documentation, I created an external table as follows:
CREATE EXTERNAL TABLE pubsub_table (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/test-project/topics/test-topic';
When I query this table:
SELECT * FROM pubsub_table LIMIT 1;
I get the following error:
java.lang.NoClassDefFoundError: org/apache/beam/sdk/io/gcp/pubsub/PubsubIO$Read
at org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider.buildBeamSqlTable(PubsubJsonTableProvider.java:61)
at org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore.buildBeamSqlTable(InMemoryMetaStore.java:79)
at org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema.getTable(BeamCalciteSchema.java:107)
at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:286)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:994)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:954)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3087)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3069)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3339)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:994)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:954)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:929)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:633)
at org.apache.beam.repackaged.sql.org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:558)
at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:265)
at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:231)
at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepare2_(CalcitePrepareImpl.java:767)
at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepare_(CalcitePrepareImpl.java:631)
at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepareSql(CalcitePrepareImpl.java:601)
at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteConnectionImpl.parseQuery(CalciteConnectionImpl.java:229)
at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteMetaImpl.prepareAndExecute(CalciteMetaImpl.java:550)
at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaConnection.prepareAndExecuteInternal(AvaticaConnection.java:675)
at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaStatement.executeInternal(AvaticaStatement.java:156)
at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaStatement.execute(AvaticaStatement.java:217)
at sqlline.Commands.execute(Commands.java:823)
at sqlline.Commands.sql(Commands.java:733)
at sqlline.SqlLine.dispatch(SqlLine.java:795)
at sqlline.SqlLine.begin(SqlLine.java:668)
at org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLine.runSqlLine(BeamSqlLine.java:75)
at org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLine.main(BeamSqlLine.java:39)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Read
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 40 more
Any help would be appreciated.
It looks like PubsubIO is not available at runtime. By default the shell does not include any extra IOs or runners; you have to build the shell with them explicitly so that they end up on its classpath. It should be sufficient to list the required SDK modules in the -Pbeam.sql.shell.bundled command-line argument when building the shell.
For example, this command builds and installs the shell bundled with the Flink Runner, Kafka IO and Google Cloud IOs:
./gradlew -p sdks/java/extensions/sql/shell \
-Pbeam.sql.shell.bundled=':runners:flink:1.5,:sdks:java:io:kafka,:sdks:java:io:google-cloud-platform' \
installDist
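For the Pub/Sub case in the question, bundling only the Google Cloud IO module should be enough, since PubsubIO lives in that module. The following is a minimal sketch, assuming the property also accepts a single module instead of a comma-separated list and that the command is run from the root of a Beam source checkout; the BUNDLED variable and the gradlew check are only illustrative.

```shell
# Assumption: run from the root of a Beam source checkout, where ./gradlew lives.
# Only the Google Cloud IO module (which contains PubsubIO) is bundled here;
# no extra runner is added.
BUNDLED=':sdks:java:io:google-cloud-platform'

if [ -x ./gradlew ]; then
  ./gradlew -p sdks/java/extensions/sql/shell \
    -Pbeam.sql.shell.bundled="$BUNDLED" \
    installDist
else
  echo "gradlew not found; run this from the root of the Beam repository" >&2
fi
```

If you later need another IO or a runner, append its module path to the same comma-separated list, as in the Flink and Kafka example above.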
Then run it:
./sdks/java/extensions/sql/shell/build/install/beam-sdks-java-extensions-sql-shell/bin/beam-sdks-java-extensions-sql-shell
Some details are here: https://beam.apache.org/documentation/dsls/sql/shell/
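If the NoClassDefFoundError persists after rebuilding, one way to check is to look for the missing class in the jars that installDist produced. This is a rough sketch, not something from the Beam docs: find_class_in_jars is a hypothetical helper, it relies on unzip being installed, and it assumes the Gradle install layout puts dependency jars in a lib directory next to bin.

```shell
# Hypothetical helper (not part of Beam): print every jar in a directory
# whose file listing contains the given class file path.
find_class_in_jars() {
  local lib_dir="$1" class_file="$2" jar
  for jar in "$lib_dir"/*.jar; do
    [ -e "$jar" ] || continue   # the glob matched nothing
    if unzip -l "$jar" 2>/dev/null | grep -qF "$class_file"; then
      echo "$jar"
    fi
  done
}

# Assumption: installDist places dependency jars in lib/ next to bin/.
LIB_DIR=sdks/java/extensions/sql/shell/build/install/beam-sdks-java-extensions-sql-shell/lib
# The class named in the stack trace, written as a path inside a jar.
CLASS_FILE='org/apache/beam/sdk/io/gcp/pubsub/PubsubIO$Read.class'

matches="$(find_class_in_jars "$LIB_DIR" "$CLASS_FILE")"
if [ -n "$matches" ]; then
  echo "Found in:"
  echo "$matches"
else
  echo "PubsubIO\$Read not found under $LIB_DIR"
fi
```

If nothing is found, the shell was most likely built without the Google Cloud IO module, and the build step needs to be repeated with that module listed in -Pbeam.sql.shell.bundled.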