apache-kafka apache-flink flink-table-api

Could not find any factory for identifier 'avro-confluent' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory'


I have a Flink job that runs well locally but fails when I try to flink run the job on a cluster. The error happens when it tries to load data from Kafka via 'connector' = 'kafka'. I am using the Flink Table API and the avro-confluent format for reading data from Kafka.

So basically I created a table which reads data from a Kafka topic:

    val inputTableSQL =
      s"""CREATE TABLE input_table (
         |  -- key of the topic
         |  key BYTES NOT NULL,
         |
         |  -- a few columns mapped to the Avro fields of the Kafka value
         |  id STRING
         |
         |) WITH (
         |
         |  'connector' = 'kafka',
         |  'topic' = '${KafkaConfiguration.InputTopicName}',
         |  'scan.startup.mode' = 'latest-offset',
         |
         |  -- UTF-8 string as Kafka keys, using the 'key' table column
         |  'key.format' = 'raw',
         |  'key.fields' = 'key',
         |
         |  'value.format' = 'avro-confluent',
         |  'value.avro-confluent.schema-registry.url' = '${KafkaConfiguration.KafkaConsumerSchemaRegistryUrl}',
         |  'value.fields-include' = 'EXCEPT_KEY'
         |)
         |""".stripMargin
    val inputTable = tableEnv.executeSql(inputTableSQL)

and then I created another table, which I will use as the output table:

    val outputTableSQL =
      s"""CREATE TABLE custom_avro_output_table (
         |  -- key of the topic
         |  key BYTES NOT NULL,
         |
         |  -- a few columns mapped to the Avro fields of the Kafka value
         |  ID STRING
         |) WITH (
         |
         |  'connector' = 'kafka',
         |  'topic' = '${KafkaConfiguration.OutputTopicName}',
         |  'properties.bootstrap.servers' = '${KafkaConfiguration.KafkaProducerBootstrapServers}',
         |
         |  -- UTF-8 string as Kafka keys, using the 'key' table column
         |  'key.format' = 'raw',
         |  'key.fields' = 'key',
         |
         |  $outputFormatSettings
         |  'value.fields-include' = 'EXCEPT_KEY'
         |)
         |""".stripMargin

    val outputTableCreationResult = tableEnv.executeSql(outputTableSQL)
    
    val customInsertSQL =
      """INSERT INTO custom_avro_output_table
        |SELECT key, id
        |  FROM input_table
        | WHERE userAgent LIKE '%ost%'
        |""".stripMargin

    val customInsertResult = tableEnv.executeSql(customInsertSQL)

When I run this on my local machine, everything works fine, but when I run it on the cluster, it crashes with the following stack trace:

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_282]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_282]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_282]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
    ... 13 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro-confluent' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.

Available factory identifiers are:

canal-json
csv
debezium-json
json
maxwell-json
raw
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319) ~[flink-table_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:751) ~[flink-table_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:649) ~[flink-table_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:633) ~[flink-table_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.lambda$getValueDecodingFormat$2(KafkaDynamicTableFactory.java:279) ~[?:?]
    at java.util.Optional.orElseGet(Optional.java:267) ~[?:1.8.0_282]
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.getValueDecodingFormat(KafkaDynamicTableFactory.java:277) ~[?:?]
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:142) ~[?:?]
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134) ~[flink-table_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[flink-table_2.12-1.13.1.jar:1.13.1]

The following is my build.sbt:

    val flinkVersion = "1.13.1"

    val flinkDependencies = Seq(
      "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
      "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
      "org.apache.flink" %% "flink-clients" % flinkVersion % Provided,
      "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion % Provided,
      "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % Provided,
      "org.apache.flink" % "flink-table-common" % flinkVersion % Provided,
      "org.apache.flink" % "flink-avro-confluent-registry" % flinkVersion,
      "org.apache.flink" % "flink-json" % flinkVersion,
      "com.webtrekk" % "wd.generated" % "2.2.3",
      "com.webtrekk" % "wd.generated.public" % "2.2.0",
      "ch.qos.logback" % "logback-classic" % "1.2.3"
    )
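
The job is packaged into a fat jar with sbt-assembly before being submitted with flink run; the plugin declaration is not part of the snippet above, but an assumed, minimal project/plugins.sbt would look like:

    // project/plugins.sbt -- assumed setup, the exact version may differ
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.1.0")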

A similar issue was posted in Flink 1.12 Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath, but the solution of marking the dependencies as provided does not work in my case.


Solution

  • I was able to fix this problem using the following approach:

    In my build.sbt, there was the following mergeStrategy:

    lazy val mergeStrategy = Seq(
      assembly / assemblyMergeStrategy := {
        case "application.conf" => MergeStrategy.concat
        case "reference.conf" => MergeStrategy.concat
        case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
        case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
        case _ => MergeStrategy.first
      }
    )
    

    I added the following cases to it, placed before the catch-all case _ => MergeStrategy.first (otherwise they would never match), and that resolved my exception:

    case "META-INF/services/org.apache.flink.table.factories.Factory"  => MergeStrategy.concat
    case "META-INF/services/org.apache.flink.table.factories.TableFactory"  => MergeStrategy.concat