
How to use custom payload column with outbox transform on standalone Debezium?


I'm trying to run standalone Debezium with the outbox SMT, using a custom payload column (after) and an additional jsonb column (before), but the task throws this error:

debezium           | 2019-05-21 23:07:50,267 ERROR  ||  WorkerSourceTask{id=campaigns-outbox-connector-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
debezium           | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
debezium           |    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
debezium           |    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
debezium           |    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
debezium           |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
debezium           |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
debezium           |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
debezium           |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
debezium           |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
debezium           |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
debezium           |    at java.lang.Thread.run(Thread.java:748)
debezium           | Caused by: org.apache.kafka.connect.errors.DataException: payload is not a valid field name
debezium           |    at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
debezium           |    at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
debezium           |    at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:98)
debezium           |    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
debezium           |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
debezium           |    ... 11 more

It looks to me like Debezium is trying to read a column named payload from my table campaigns.outbox, even though I'm explicitly overriding the transform's payload column in my worker parameters (./config/connect-campaigns-outbox.properties):

name=campaigns-outbox-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=campaigns
database.server.name=campaigns_api
table.whitelist=campaigns.outbox
transforms=outbox

transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

transforms.outbox.table.field.event.key=aggregate_id
transforms.outbox.table.field.event.type=type
transforms.outbox.table.field.event.payload.id=aggregate_id
transforms.outbox.table.field.event.payload=after
transforms.outbox.table.fields.additional.placement=before:envelope

transforms.outbox.route.topic.replacement=media-platform.campaigns-api.${routedByValue}
transforms.outbox.route.by.field=type

My ./docker-compose.yaml:

services:
  debezium:
    container_name: debezium
    image: debezium/connect:0.9
    ports:
    - 8082:8082
    volumes:
    - ./config:/kafka/config # can't use $KAFKA_HOME here
    - ./offsets:/offsets
    command:
    - sh
    - -c
    - $$KAFKA_HOME/bin/connect-standalone.sh $$KAFKA_HOME/config/connect-standalone.properties $$KAFKA_HOME/config/connect-campaigns-outbox.properties
  postgres:
    container_name: postgres
    image: 'debezium/postgres:10-alpine'
    ports:
      - 5432:5432
    environment:
      - POSTGRES_DB=campaigns
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 3s
      retries: 7
  zookeeper:
    container_name: zookeeper
    hostname: zookeeper
    image: 'confluentinc/cp-zookeeper:3.1.1'
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    container_name: kafka
    hostname: kafka
    image: 'confluentinc/cp-kafka:3.1.1'
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
  schema-registry:
    container_name: schema-registry
    hostname: schema-registry
    image: 'confluentinc/cp-schema-registry:3.1.1'
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    depends_on:
      - zookeeper
    ports:
      - "8081:8081"

My ./config/connect-standalone.properties:

bootstrap.servers=kafka:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter.schema.registry.url=http://schema-registry:8081
offset.storage.file.filename=/offsets/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/kafka/connect

The minimal PG schema required to reproduce the error:

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TYPE campaigns.outbox_event_type AS ENUM (
    'campaign',
    'creative'
);
CREATE TABLE campaigns.outbox (
    id UUID PRIMARY KEY,
    type campaigns.outbox_event_type NOT NULL,
    aggregate_id TEXT NOT NULL,
    before JSONB,
    after JSONB
);

To reproduce the error:

insert into campaigns.outbox (id, type, aggregate_id, before, after) values (uuid_generate_v4(), 'campaign', '1', NULL, '{"id":1,"title":"teste","description":"teste description"}');

Am I doing something wrong, or should I report this as an issue to the JBoss team?


Solution

  • The option name for the payload column should be transforms.outbox.table.field.payload instead of transforms.outbox.table.field.event.payload (see the definition of the options in EventRouterConfigDefinition).

    I see it's indicated differently in the docs, and I think the code should be adjusted accordingly. I'll take care of it.
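
For reference, here is a sketch of the transform section of ./config/connect-campaigns-outbox.properties with only that one option renamed. It assumes the debezium/connect:0.9 image from the question. All other lines are copied unchanged from the question and have not been checked against EventRouterConfigDefinition here, so some of them (for example the payload id option) may need the same kind of rename; verify them against the version you run.

```properties
transforms=outbox
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

transforms.outbox.table.field.event.key=aggregate_id
transforms.outbox.table.field.event.type=type
# Unverified: copied as-is from the question; this name may also differ in 0.9
transforms.outbox.table.field.event.payload.id=aggregate_id
# Renamed from table.field.event.payload, per the answer above
transforms.outbox.table.field.payload=after
transforms.outbox.table.fields.additional.placement=before:envelope

transforms.outbox.route.topic.replacement=media-platform.campaigns-api.${routedByValue}
transforms.outbox.route.by.field=type
```

With the payload option picked up, the EventRouter should read the after column instead of falling back to its default column name, payload, which is the field lookup that fails in the stack trace.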