apache-beamgoogle-cloud-pubsubbeam-sql

How to add google cloud pubsub as a source in Beam SQL shell?


I am trying out BeamSQL in shell and want to test how unbounded sources work in terms of usability and performance. Reading the documentation over here, I created an external table as follows-

CREATE EXTERNAL TABLE pubsub_table (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/test-project/topics/test-topic';

Now when I try to query this table as follows-

SELECT * FROM pubsub_table LIMIT 1;

I get the following error-

java.lang.NoClassDefFoundError: org/apache/beam/sdk/io/gcp/pubsub/PubsubIO$Read
    at org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider.buildBeamSqlTable(PubsubJsonTableProvider.java:61)
    at org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore.buildBeamSqlTable(InMemoryMetaStore.java:79)
    at org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema.getTable(BeamCalciteSchema.java:107)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:286)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:994)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:954)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3087)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3069)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3339)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:994)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:954)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:929)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:633)
    at org.apache.beam.repackaged.sql.org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:558)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:265)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.Prepare.prepareSql(Prepare.java:231)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepare2_(CalcitePrepareImpl.java:767)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepare_(CalcitePrepareImpl.java:631)
    at org.apache.beam.repackaged.sql.org.apache.calcite.prepare.CalcitePrepareImpl.prepareSql(CalcitePrepareImpl.java:601)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteConnectionImpl.parseQuery(CalciteConnectionImpl.java:229)
    at org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteMetaImpl.prepareAndExecute(CalciteMetaImpl.java:550)
    at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaConnection.prepareAndExecuteInternal(AvaticaConnection.java:675)
    at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaStatement.executeInternal(AvaticaStatement.java:156)
    at org.apache.beam.repackaged.sql.org.apache.calcite.avatica.AvaticaStatement.execute(AvaticaStatement.java:217)
    at sqlline.Commands.execute(Commands.java:823)
    at sqlline.Commands.sql(Commands.java:733)
    at sqlline.SqlLine.dispatch(SqlLine.java:795)
    at sqlline.SqlLine.begin(SqlLine.java:668)
    at org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLine.runSqlLine(BeamSqlLine.java:75)
    at org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLine.main(BeamSqlLine.java:39)

Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Read
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 40 more

Any help would be appreciated.


Solution

  • Looks like you don't have the PubsubIO available at the runtime. The shell by default doesn't include any extra IOs (or runners), you have to explicitly build and have all such extra stuff on the classpath to be able to use it. It should be sufficient to specify the required SDK modules in the command line arg -Pbeam.sql.shell.bundled when building the shell.

    For example, this command builds and installs the shell bundled with the Flink Runner, Kafka IO and Google Cloud IOs:

    ./gradlew -p sdks/java/extensions/sql/shell \
        -Pbeam.sql.shell.bundled=':runners:flink:1.5,:sdks:java:io:kafka,:sdks:java:io:google-cloud-platform' \
        installDist
    

    Then you run it:

    ./sdks/java/extensions/sql/shell/build/install/beam-sdks-java-extensions-sql-shell/bin/beam-sdks-java-extensions-sql-shell
    

    Some details are here: https://beam.apache.org/documentation/dsls/sql/shell/