postgresqldebeziumredis-streams

Debezium is not working with redis streams and postgresql


I'm having an issue with running debezium with redis and postgresql.

My docker compose is:

version: "3.3"
services:
  redis-stack:
    image: redis/redis-stack:7.0.6-RC4
    restart: unless-stopped
    ports:
      - 10001:6379
      - 13333:8001
    volumes:
      - ./data/redis-stack/:/data

  db:
    image: postgres
    restart: unless-stopped
    ports:
      - 5432:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: 1234

  pgadmin:
    image: dpage/pgadmin4
    restart: unless-stopped
    ports:
      - 5555:80
    environment:
      PGADMIN_DEFAULT_PASSWORD: 1234
      PGADMIN_DEFAULT_EMAIL: arkan.m.gerges@gmail.com

  debezium:
    image: debezium/server:2.1.2.Final
    restart: unless-stopped
    ports:
      - 8180:8080
    volumes:
      - ./config/debezium:/debezium/conf
      - ./data/debezium:/debezium/data
    depends_on:
      - redis-stack
      - db

networks:
    app-network:

in the config/debezium/application.properties

debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
debezium.source.offset.flush.interval.ms=0
debezium.source.offset.storage.redis.address=redis-stack:6379
debezium.source.schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory
debezium.source.schema.history.internal.redis.address=redis-stack:6379
debezium.sink.type=redis
debezium.sink.redis.address=redis-stack:6379
debezium.source.database.hostname=db
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=1234
debezium.source.database.dbname=softwaredev_expert
debezium.source.database.server.name=db
debezium.source.schema.whitelist=public
debezium.source.schema.include.list=public
debezium.source.plugin.name=pgoutput

I can access redis insights, and access to postgresql, but I'm getting errors running debezium:

https://pastebin.com/9TGUNvKe


Solution

  • I've solved the issue with the following application.properties Here I've used the outbox pattern and for each aggregate type, a new stream will be created, but all the events for that aggregate will be in the same stream.

    # Debezium redis sink connector
    debezium.sink.type=redis
    debezium.sink.redis.address=redis-stack:6379
    debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
    debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
    debezium.source.offset.flush.interval.ms=0
    debezium.source.offset.storage.redis.address=redis-stack:6379
    debezium.source.schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory
    debezium.source.schema.history.internal.redis.address=redis-stack:6379
    
    # Source database connection info
    debezium.source.database.hostname=db
    debezium.source.database.port=5432
    debezium.source.database.user=postgres
    debezium.source.database.password=1234
    debezium.source.database.dbname=softwaredev_expert
    debezium.source.table.include.list=public.outbox_events
    debezium.source.plugin.name=pgoutput
    debezium.source.topic.prefix=swdevexpert
    
    # Outbox event router
    debezium.transforms=outbox
    debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
    debezium.transforms.outbox.route.topic.replacement=swdevexpert.events.$1
    

    I filtered only the database table outbox_events

    And my docker-compose is:

    version: "3.3"
    services:
      redis-stack:
        image: redis/redis-stack:7.0.6-RC4
        restart: unless-stopped
        ports:
          - "10001:6379"
          - "13333:8001"
        volumes:
          - ./data/redis-stack/:/data
    
      db:
        image: postgres
        restart: unless-stopped
        ports:
          - "5432:5432"
        environment:
          POSTGRES_USER: postgres
          POSTGRES_PASSWORD: 1234
        command:
          - "-c"
          - "config_file=/etc/postgresql/postgresql.conf"
        volumes:
          - ./data/postgresql:/var/lib/postgresql/data
          - ./config/postgresql/postgresql.conf:/etc/postgresql/postgresql.conf
    
      pgadmin:
        image: dpage/pgadmin4
        restart: unless-stopped
        ports:
          - "5555:80"
        environment:
          PGADMIN_DEFAULT_PASSWORD: 1234
          PGADMIN_DEFAULT_EMAIL: arkan.m.gerges@gmail.com
    
      debezium:
        image: debezium/server:2.1.2.Final
        restart: unless-stopped
        volumes:
          - ./config/debezium:/debezium/conf
          - ./data/debezium:/debezium/data
        depends_on:
          - redis-stack
          - db
    
    networks:
        app-network:
    

    And in config/postgresql/postgresql.conf

    listen_addresses = '*'
    port = 5432
    max_connections = 20
    shared_buffers = 128MB
    temp_buffers = 8MB
    work_mem = 4MB
    wal_level = logical
    max_wal_senders = 3
    max_replication_slots = 100
    

    Some of the sql inserts, I did not put it in one line to experiment with different timestamp:

        insert into outbox_events (id, aggregatetype, aggregateid, type, payload)
          values (uuid_generate_v4(), 'order', '111', 'my_type', '{"order_id": "111", "order_type": "car"}');
          
        insert into outbox_events (id, aggregatetype, aggregateid, type, payload)
          values (uuid_generate_v4(), 'order2', '222', 'my_type', '{"order_id": "222", "order_type": "house"}');
          
        insert into outbox_events (id, aggregatetype, aggregateid, type, payload)
          values (uuid_generate_v4(), 'order1234567890', '333', 'my_type', '{"order_id": "333", "order_type": "computer"}');
          
        insert into outbox_events (id, aggregatetype, aggregateid, type, payload)
          values (uuid_generate_v4(), 'order1234567890', '444', 'my_type', '{"order_id": "444", "order_type": "computer"}');
          
        insert into outbox_events (id, aggregatetype, aggregateid, type, payload)
          values (uuid_generate_v4(), 'order1234567890_one_two', '555', 'my_type', '{"order_id": "555", "order_type": "computer"}');
    

    And on redis insight:

    enter image description here