oracle-database postgresql triggers jython oracle-data-integrator

Oracle ODI JKM for PostgreSQL: Create Trigger


I need to implement Change Data Capture (CDC) from a PostgreSQL database to an Oracle Database.

Since there is no Journalization Knowledge Module for CDC for PostgreSQL, I am trying to adapt JKM Oracle Simple, as specified at https://forums.oracle.com/forums/thread.jspa?threadID=620355.

I am however having trouble with the Jython "Create trigger" command.

In ODI, I have replaced the "Create trigger" command with the following:

-- Journalizing (CDC) trigger for public.payment.
-- The script drops any previous trigger, recreates a helper sequence, defines
-- the trigger function, and attaches it to the table for row-level
-- insert/update/delete events. Each event writes one row per subscriber
-- found in public."SNP_SUBSCRIBERS" into the journal table public.j$payment.
--
-- NOTE(review): the function body still contains Oracle-only constructs
-- (see the inline notes). CREATE FUNCTION is expected to succeed anyway,
-- because PL/pgSQL does not resolve these names until the function runs,
-- so the errors only show up when the trigger actually fires.
drop trigger if exists public_t_payment on public.payment;

drop sequence if exists idSequence;

CREATE SEQUENCE idSequence;

CREATE OR REPLACE FUNCTION public_t_payment_trigger() RETURNS TRIGGER AS $$
declare
    -- Journal flag written to JRN_FLAG: 'I' for insert/update, 'D' for delete.
    V_FLAG  VARCHAR(1);
    -- Primary-key value of the affected row. The sequence default is
    -- overwritten by every branch below that assigns V_id.
    V_id    integer NOT NULL DEFAULT nextval('idSequence');
begin
    -- NOTE(review): INSERTING / UPDATING / DELETING are Oracle trigger
    -- predicates. PL/pgSQL reports the operation through TG_OP
    -- ('INSERT', 'UPDATE', 'DELETE') instead.
    if inserting then
        V_id := NEW.id;
        V_FLAG := 'I';
    end if;

    if updating then
        V_id := NEW.id;
        V_FLAG := 'I';
    end if;

    if deleting then
        V_id := OLD.id;
        V_FLAG := 'D';
    end if;

    -- Record the change once per subscriber registered for this table.
    -- NOTE(review): sysdate is Oracle syntax; PostgreSQL uses now() or
    -- CURRENT_TIMESTAMP.
    insert into public.j$payment
    (
        JRN_SUBSCRIBER,
        JRN_CONSUMED,
        JRN_FLAG,
        JRN_DATE,
        id
    )
    select  JRN_SUBSCRIBER,
        '0',
        V_FLAG,
        sysdate,
        V_id
    from    public."SNP_SUBSCRIBERS"
    where   JRN_TNAME = 'public.payment';
    /* The following line can be uncommented for symetric replication */
    /* and  upper(USER) <> upper(''postgres'') */
    -- NOTE(review): there is no RETURN statement before "end"; a PL/pgSQL
    -- trigger function has to return NEW, OLD or NULL.
end; $$ LANGUAGE plpgsql;

-- Attach the function so that it fires after every row-level change.
create trigger public_t_payment
after insert or update or delete on public.payment
for each row
execute procedure public_t_payment_trigger();

The above code works well when copied and executed on PostgreSQL, but ODI is giving me the following error when I do "Start Journal" on the source table:

ODI-1217: Session payment (712013) fails with return code 7000.
ODI-1226: Step payment fails after 1 attempt(s).
ODI-1231: An error occurred while performing a Journal operation on datastore payment.
Caused By: org.apache.bsf.BSFException: exception from Jython:
SyntaxError: ("no viable alternative at character '$'", ('<string>', 6, 19, 'returns trigger as $test\n'))

The problem seems to be with the "$$" dollar-quote delimiter that follows "returns trigger as", but I can't figure out how to solve this problem in Jython.


Solution

  • I managed to solve this as follows.

    The technology for the "Create Trigger" command was set to Jython, but my code was purely PostgreSQL.

    Changing the Technology dropdown from "Jython" to "PostgreSQL" allowed the Create Trigger command to execute without giving any errors.

    However, since I wanted to keep the command as similar to the original as possible, I updated the above code to include the necessary Jython syntax.

    Note that, although the SQL posted in the above question executes in PostgreSQL, it is not entirely correct as it was still giving errors when the trigger was being fired.

    I have pasted below the complete Jython code for the Create Trigger command, which executes on a PostgreSQL source database; maybe somebody will find it useful, since there is no PostgreSQL to Oracle JKM.

    # PostgreSQL script that (re)installs the journalizing trigger on the
    # source table. The <%=odiRef...%> tags are ODI substitution calls that are
    # filled in from the journalized datastore's metadata.
    #
    # The script:
    #   1. drops the previous trigger, if any;
    #   2. drops and recreates a helper sequence;
    #   3. creates the trigger function, which copies the primary-key values of
    #      the changed row into the journal table, once per subscriber, with
    #      flag 'I' (insert/update) or 'D' (delete);
    #   4. attaches the function as an AFTER ... FOR EACH ROW trigger.
    #
    # One V_<column> variable is declared per primary-key column, typed with
    # %TYPE from the source table, so the function matches the generated
    # assignments for any key (not only a single integer column named "id").
    #
    # NOTE(review): the sequence is no longer read by the function (the key
    # variables are always assigned from NEW/OLD). It is still dropped and
    # recreated in case a clean-up step elsewhere expects it to exist --
    # TODO confirm, then remove both statements together.
    #
    # The statement terminator of the INSERT is placed after the optional
    # "symetric replication" predicate so that uncommenting that line yields
    # valid SQL.
    #
    # "return new" is NULL for DELETE; the return value of an AFTER row
    # trigger is ignored by PostgreSQL.
    triggerCmd = """
    drop trigger if exists "<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>" on <%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>;

    drop sequence if exists "seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id";

    create sequence "seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id";

    create or replace function "fn_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"()
    returns trigger as $$
    declare
        V_FLAG  VARCHAR(1);
        <%=odiRef.getColList("", "V_[COL_NAME] " + odiRef.getJrnInfo("FULL_TABLE_NAME") + ".[COL_NAME]%TYPE;", "\n\t\t", "", "PK")%>
    begin
        if (TG_OP = 'INSERT') then
            <%=odiRef.getColList("", "V_[COL_NAME] := new.[COL_NAME];", "\n\t\t", "", "PK")%>
            V_FLAG := 'I';
        end if;

        if (TG_OP = 'UPDATE') then
            <%=odiRef.getColList("", "V_[COL_NAME] := new.[COL_NAME];", "\n\t\t", "", "PK")%>
            V_FLAG := 'I';
        end if;

        if (TG_OP = 'DELETE') then
            <%=odiRef.getColList("", "V_[COL_NAME] := old.[COL_NAME];", "\n\t\t", "", "PK")%>
            V_FLAG := 'D';
        end if;

        insert into  <%=odiRef.getJrnInfo("JRN_FULL_NAME")%>
        (
            JRN_SUBSCRIBER,
            JRN_CONSUMED,
            JRN_FLAG,
            JRN_DATE,
            <%=odiRef.getColList("", "[COL_NAME]", ",\n\t\t", "", "PK")%>
        )
        select  JRN_SUBSCRIBER,
            '0',
            V_FLAG,
            now(),
            <%=odiRef.getColList("", "V_[COL_NAME]", ",\n\t\t", "", "PK")%>
        from    <%=odiRef.getJrnInfo("SNP_JRN_SUBSCRIBER")%>
        where   JRN_TNAME = '<%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>'
        /* The following line can be uncommented for symetric replication */
        /* and  upper(USER) <> upper('<%=odiRef.getInfo("DEST_USER_NAME")%>') */
        ;

        return new;
    end $$ language plpgsql;

    create trigger "<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"
    after insert or update or delete on <%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>
    for each row
    execute procedure "fn_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"();
    """
    
    # Run the trigger-creation script through JDBC.
    # myCon and triggerCmd are defined earlier in this command; myCon is
    # presumably the JDBC connection to the journalized source -- verify.
    myStmt = myCon.createStatement()
    try:
        # Execute the trigger creation
        myStmt.execute(triggerCmd)
    finally:
        # Release the statement even when the DDL fails, so a failed
        # "Start Journal" does not leave an open JDBC statement behind.
        myStmt.close()
        myStmt = None

    # Commit, just in case
    # myCon.commit()