java oracle-database apache-flink debezium cdc

Debezium + Flink Oracle CDC - "db history topic or its content is fully or partially missing" for some tables


I am using Flink with Debezium to consume CDC changes from Oracle DB tables via LogMiner.

For some tables, everything works fine. For example, the following table works without issues:

CREATE TABLE CDC_PRODUCTS (
    ID NUMBER,
    NAME VARCHAR2(100),
    DESCRIPTION VARCHAR2(200),
    WEIGHT NUMBER(10, 3)
);

However, for other tables, I get the following error:

io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.

Full stack trace:

io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275)
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

20:13:49.779 [Source Data Fetcher for Source: OracleParallelSource (1/1)#92] ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Caused by: java.io.IOException: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
    ... 7 more

The problematic table has constraints such as PRIMARY KEY and NOT NULL; its full DDL is in the edit below.

Questions:

  1. Could this be caused by missing snapshots or metadata for that specific table?

  2. Does Debezium LogMiner behave differently when constraints like PRIMARY KEY or NOT NULL are present?

  3. Any suggestions for recovering or reinitializing the DB history for such cases?

Any guidance would be appreciated.

Edit: this is the DDL for the problematic table:

CREATE TABLE "SI_HCMP_REPLICA1"."*****_HISTORY" 
   (    "****_ID" NUMBER DEFAULT TO_NUMBER(TO_CHAR(SYSDATE@!,'YYYYMMDDHH24MI')||TO_CHAR(****.****.NEXTVAL)), 
    "CUSTOMER_ID" NUMBER NOT NULL ENABLE, 
    "ASSET_ID" NUMBER NOT NULL ENABLE, 
    "ASSET_LAST_MODIFIED_ON" DATE NOT NULL ENABLE, 
    "CREATED_ON" DATE DEFAULT SYSDATE NOT NULL ENABLE, 
    "CREATED_BY_ID" VARCHAR2(100), 
    "UPDATED_ON" DATE, 
    "UPDATED_BY_ID" VARCHAR2(100), 
    "PROCESS_STATUS" VARCHAR2(1) DEFAULT 'N' NOT NULL ENABLE, 
    "ASSET_TECH_ID" NUMBER, 
    "WORKFLOW_FLAG" VARCHAR2(1) DEFAULT 'N' NOT NULL ENABLE, 
     CONSTRAINT "RM_ASSET_HISTORY_PK" PRIMARY KEY ("****_ID")
  USING INDEX PCTFREE 10 INITRANS 2 MAXTRANS 255 COMPUTE STATISTICS 
  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645
  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "HCMP_DATA"  ENABLE, 
     SUPPLEMENTAL LOG DATA (ALL) COLUMNS, 
     SUPPLEMENTAL LOG GROUP "*****_HISTORY" ("ASSET_ID", "CUSTOMER_ID", "CREATED_ON") ALWAYS
   ) PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 
  STORAGE(
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "HCMP_DATA" 
  PARTITION BY RANGE ("CREATED_ON") INTERVAL (NUMTODSINTERVAL (1,'DAY')) 
  SUBPARTITION BY LIST ("PROCESS_STATUS") 
  SUBPARTITION TEMPLATE ( 
    SUBPARTITION "PROCESSED" VALUES ( 'Y' ), 
    SUBPARTITION "UNPROCESSED" VALUES ( 'N' ) ) 
 (PARTITION "SYS_P164554"  VALUES LESS THAN (TO_DATE(' 2023-12-22 00:00:00', 'SYYYY-MM-DD HH24:MI:SS', 'NLS_CALENDAR=GREGORIAN')) 
PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 
  STORAGE(
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "HCMP_DATA" 
 ( SUBPARTITION "SYS_SUBP164553"  VALUES ('N') SEGMENT CREATION IMMEDIATE 
  STORAGE(
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "HCMP_DATA" 
 NOCOMPRESS , 
  SUBPARTITION "PROCESSED_ASSET_SYS_P164554"  VALUES ('Y') SEGMENT CREATION DEFERRED 
  STORAGE(
  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)
  TABLESPACE "****_DATA" 
 NOCOMPRESS ) )  ENABLE ROW MOVEMENT 

After further research, this may be caused by the partitions on the table.


Solution

  • You may be able to work around this error by creating a custom ALL_TABLES view in the schema of the relevant user. That view can return a non-null value for TABLESPACE_NAME when the table is partitioned, which might avoid flink-cdc issue 1737.

    CREATE OR REPLACE VIEW your_user.all_tables AS
    SELECT owner,table_name,
        -- Return a placeholder instead of NULL for partitioned tables.
        CASE WHEN partitioned = 'YES' THEN 'flink-cdc 1737 fake tbspcname' ELSE tablespace_name END tablespace_name,
        cluster_name,iot_name,status,pct_free,pct_used,ini_trans,max_trans,initial_extent,next_extent,min_extents,max_extents,pct_increase,freelists,freelist_groups,logging,backed_up,num_rows,blocks,empty_blocks,avg_space,chain_cnt,avg_row_len,avg_space_freelist_blocks,num_freelist_blocks,degree,instances,cache,table_lock,sample_size,last_analyzed,partitioned,iot_type,temporary,secondary,nested,buffer_pool,flash_cache,cell_flash_cache,row_movement,global_stats,user_stats,duration,skip_corrupt,monitoring,cluster_owner,dependencies,compression,compress_for,dropped,read_only,segment_created,result_cache,clustering,activity_tracking,dml_timestamp,has_identity,container_data,inmemory,inmemory_priority,inmemory_distribute,inmemory_compression,inmemory_duplicate,default_collation,duplicated,sharded,external,hybrid,cellmemory,containers_default,container_map,extended_data_link,extended_data_link_map,inmemory_service,inmemory_service_name,container_map_object,memoptimize_read,memoptimize_write,has_sensitive_column,admit_null,data_link_dml_enabled,logical_replication
    FROM sys.all_tables;
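
    To see which tables are affected before creating the view, a query along these lines may help. It is only a sketch: it assumes the schema name from the question (SI_HCMP_REPLICA1) and relies on the documented behavior that ALL_TABLES.TABLESPACE_NAME is NULL for partitioned tables.

    -- Partitioned tables in the captured schema whose TABLESPACE_NAME is NULL
    SELECT owner, table_name, partitioned, tablespace_name
    FROM sys.all_tables
    WHERE owner = 'SI_HCMP_REPLICA1'  -- assumption: schema taken from the question's DDL
        AND partitioned = 'YES'
        AND tablespace_name IS NULL
    ORDER BY table_name;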
    

    Be careful

    Obviously, messing with the data dictionary like this is dangerous and should be documented and tested thoroughly. Only consider this solution as a last resort, if you're not able to patch the application. Returning the wrong value for TABLESPACE_NAME for that user could cause problems with another application.

    It's also possible that returning the wrong TABLESPACE_NAME causes additional issues in flink-cdc. The PR makes it sound like only that one query needs to be bypassed, but why would the query return TABLESPACE_NAME if nothing uses it? If the application assumes that a table has only one tablespace, you could make your tables match that assumption and then modify the ALL_TABLES view to return a single tablespace taken from the relevant partitions or subpartitions.

    CREATE OR REPLACE VIEW your_user.all_tables AS
    SELECT owner,table_name,
        CASE
            WHEN partitioned = 'YES' THEN
                (
                    COALESCE
                    (
                        (
                            SELECT MAX(tablespace_name)
                            FROM all_tab_partitions
                            WHERE all_tab_partitions.table_owner = all_tables.owner
                                AND all_tab_partitions.table_name = all_tables.table_name
                        ),
                        (
                            SELECT MAX(tablespace_name)
                            FROM all_tab_subpartitions
                            WHERE all_tab_subpartitions.table_owner = all_tables.owner
                                AND all_tab_subpartitions.table_name = all_tables.table_name
                        ),
                        all_tables.tablespace_name
                    )
                )
            ELSE
                tablespace_name
        END tablespace_name,
     cluster_name,iot_name,status,pct_free,pct_used,ini_trans,max_trans,initial_extent,next_extent,min_extents,max_extents,pct_increase,freelists,freelist_groups,logging,backed_up,num_rows,blocks,empty_blocks,avg_space,chain_cnt,avg_row_len,avg_space_freelist_blocks,num_freelist_blocks,degree,instances,cache,table_lock,sample_size,last_analyzed,partitioned,iot_type,temporary,secondary,nested,buffer_pool,flash_cache,cell_flash_cache,row_movement,global_stats,user_stats,duration,skip_corrupt,monitoring,cluster_owner,dependencies,compression,compress_for,dropped,read_only,segment_created,result_cache,clustering,activity_tracking,dml_timestamp,has_identity,container_data,inmemory,inmemory_priority,inmemory_distribute,inmemory_compression,inmemory_duplicate,default_collation,duplicated,sharded,external,hybrid,cellmemory,containers_default,container_map,extended_data_link,extended_data_link_map,inmemory_service,inmemory_service_name,container_map_object,memoptimize_read,memoptimize_write,has_sensitive_column,admit_null,data_link_dml_enabled,logical_replication
    FROM sys.all_tables;
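
    Once a view like this exists, a quick check as the connecting user can confirm that it is the object being read. This sketch assumes you are logged in as your_user and that the application queries ALL_TABLES without a schema prefix; Oracle resolves an unqualified name to an object in the current schema before it falls back to a public synonym, so ALL_TABLES below should hit the view.

    -- Run as your_user: partitioned tables should now show a non-null TABLESPACE_NAME
    SELECT owner, table_name, partitioned, tablespace_name
    FROM all_tables
    WHERE partitioned = 'YES'
    ORDER BY owner, table_name;

    -- Confirm that a view named ALL_TABLES exists in the current schema
    SELECT object_name, object_type
    FROM user_objects
    WHERE object_name = 'ALL_TABLES';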
    

    The column list above is only accurate for one version of Oracle. Generate your own list with a query like the one below, and don't forget to update the view if you upgrade the database.

    -- Generate the list of columns for your version of Oracle:
    SELECT LISTAGG(column_name, ',') WITHIN GROUP (ORDER BY column_id) column_list
    FROM dba_tab_columns
    WHERE table_name = 'ALL_TABLES' AND owner = 'SYS';
    

    Despite all the pitfalls, there are times when it's necessary to override the data dictionary like this. I've done this before in production for an unsupported application.
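
    If the override ever needs to be backed out, dropping the view should be enough, since unqualified references to ALL_TABLES then resolve to the public synonym again. A sketch, using the same placeholder user name as above:

    -- Remove the override; your_user is a placeholder for the connecting user
    DROP VIEW your_user.all_tables;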