Tags: apache-flink, flink-streaming, flink-sql

Flink: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath


I am trying to connect Kafka to Flink and run it via sql-client.sh. However, no matter what I change in the .yaml configuration or which libraries I add, I keep getting the error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create catalog 'myKafka'.

Catalog options are:
'type'='kafka'
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:270)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.createCatalog(LegacyTableEnvironmentInitializer.java:217)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.lambda$initializeCatalogs$1(LegacyTableEnvironmentInitializer.java:120)
        at java.util.HashMap.forEach(HashMap.java:1289)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeCatalogs(LegacyTableEnvironmentInitializer.java:117)
        at org.apache.flink.table.client.gateway.context.LegacyTableEnvironmentInitializer.initializeSessionState(LegacyTableEnvironmentInitializer.java:105)
        at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:233)
        at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:100)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:91)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
        ... 1 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.CatalogFactory' in the classpath.

Available factory identifiers are:

generic_in_memory
        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
        at org.apache.flink.table.factories.FactoryUtil.getCatalogFactory(FactoryUtil.java:455)
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:251)
        ... 11 more

My sql-conf is pretty simple (I didn't include sensitive information such as bootstrap servers):

catalogs:
  - name: myKafka
    type: kafka

In addition, the library folder includes the following jars:

The Flink version: 1.13.2. The Kafka version: 2.0.0-cdh6.1.1.

Solution (thanks to @Niko for pointing me in the right direction): I modified sql-conf.yaml to use a Hive catalog and created the Kafka table inside SQL instead. My sql-conf.yaml now looks like:

execution:
  type: streaming
  result-mode: table
  planner: blink
  current-database: default
  current-catalog: myhive

catalogs:
  - name: myhive
    type: hive
    hive-version: 2.1.1-cdh6.0.1
    hive-conf-dir: /etc/hive/conf
  
deployment:
  m: yarn-cluster
  yqu: ABC_XYZ

I ran it, and inside sql-client.sh I created the Kafka table with the necessary connection properties.


Solution

  • All catalogs defined using YAML must provide a type property that specifies the type of catalog. As the error's list of available factory identifiers shows, only generic_in_memory is available out of the box (hive becomes available once the Hive dependencies are on the classpath). Kafka is a table connector, not a catalog, so type: kafka cannot work.

    You can read more about it in the official documentation.

    You can create your so-called initialization SQL file like:

    CREATE CATALOG MyCatalog WITH (
        'type' = 'hive',
        'default-database' = 'my_database',
        'hive-conf-dir' = '/etc/hive/conf'
      );
    
    USE CATALOG MyCatalog;
    
    CREATE TABLE MyTable(
      MyField1 INT,
      MyField2 STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'YOUR_TOPIC',
      'properties.bootstrap.servers' = 'localhost',
      'properties.group.id' = 'some_id',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'json'
    );
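
    Assuming Flink 1.13, which added support for initialization files to the SQL client, you could pass such a file with the -i flag when starting the client (the file name init.sql below is an example, not from the original post):

    # Start the SQL client and run the initialization file first (Flink 1.13+).
    # The path and file name are illustrative examples.
    ./bin/sql-client.sh -i init.sql

    Every statement in the init file must end with a semicolon; the catalog and table it creates are then available in the interactive session.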