I need to implement Change Data Capture (CDC) from a PostgreSQL database to an Oracle database.
Since there is no Journalization Knowledge Module for CDC for PostgreSQL, I am trying to adapt JKM Oracle Simple, as specified at https://forums.oracle.com/forums/thread.jspa?threadID=620355.
I am however having trouble with the Jython "Create trigger" command.
In ODI, I have replaced the "Create trigger" command with the following:
drop trigger if exists public_t_payment on public.payment;
drop sequence if exists idSequence;
CREATE SEQUENCE idSequence;
CREATE OR REPLACE FUNCTION public_t_payment_trigger() RETURNS TRIGGER AS $$
declare
V_FLAG VARCHAR(1);
V_id integer NOT NULL DEFAULT nextval('idSequence');
begin
if inserting then
V_id := NEW.id;
V_FLAG := 'I';
end if;
if updating then
V_id := NEW.id;
V_FLAG := 'I';
end if;
if deleting then
V_id := OLD.id;
V_FLAG := 'D';
end if;
insert into public.j$payment
(
JRN_SUBSCRIBER,
JRN_CONSUMED,
JRN_FLAG,
JRN_DATE,
id
)
select JRN_SUBSCRIBER,
'0',
V_FLAG,
sysdate,
V_id
from public."SNP_SUBSCRIBERS"
where JRN_TNAME = 'public.payment';
/* The following line can be uncommented for symmetric replication */
/* and upper(USER) <> upper(''postgres'') */
end; $$ LANGUAGE plpgsql;
create trigger public_t_payment
after insert or update or delete on public.payment
for each row
execute procedure public_t_payment_trigger();
The above code works well when copied and executed on PostgreSQL, but ODI is giving me the following error when I do "Start Journal" on the source table:
ODI-1217: Session payment (712013) fails with return code 7000.
ODI-1226: Step payment fails after 1 attempt(s).
ODI-1231: An error occurred while performing a Journal operation on datastore payment.
Caused By: org.apache.bsf.BSFException: exception from Jython:
SyntaxError: ("no viable alternative at character '$'", ('<string>', 6, 19, 'returns trigger as $test\n'))
The problem seems to be the dollar quoting ($$) that follows "returns trigger as" in the function definition, but I can't figure out how to solve this in Jython.
I managed to solve this as follows.
The technology for the "Create Trigger" command was set to Jython, but my code was purely PostgreSQL.
Changing the Technology dropdown from "Jython" to "PostgreSQL" allowed the Create Trigger command to execute without giving any errors.
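To illustrate why the original attempt failed, here is a minimal sketch in plain Python (not ODI itself; the exact error text from Jython differs). It assumes that, with the technology set to Jython, the command text is handed to the Python parser as source code. The function name demo_trigger and the helper parses_as_python are made up for the example.

```python
# Raw PL/pgSQL, similar to what was pasted into the command
# while the technology was still set to "Jython".
raw_sql = """create or replace function demo_trigger()
returns trigger as $$
begin
    return new;
end $$ language plpgsql;
"""


def parses_as_python(source):
    """Return True if `source` is syntactically valid Python, False otherwise."""
    try:
        compile(source, "<string>", "exec")
        return True
    except SyntaxError:
        return False


# Bare SQL is not Python, so the parser rejects it before anything is executed.
sql_is_python = parses_as_python(raw_sql)

# The same SQL wrapped in a triple-quoted string is an ordinary assignment.
wrapped = 'triggerCmd = """\n' + raw_sql + '"""\n'
wrapped_is_python = parses_as_python(wrapped)

print(sql_is_python, wrapped_is_python)
```

Either fix therefore works: switch the technology so the text is sent to the database as SQL, or keep Jython and pass the SQL as a string to a JDBC statement.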
However, since I wanted to keep the command as similar to the original as possible, I updated the above code to include the necessary Jython syntax.
Note that although the SQL posted in the question above can be created in PostgreSQL, it is not entirely correct: it still raised errors when the trigger fired. The version below uses TG_OP instead of the Oracle-style inserting/updating/deleting checks, now() instead of sysdate, and adds a return statement to the trigger function.
I have pasted below the complete Jython code for the Create Trigger command, which executes on a PostgreSQL source database. Maybe somebody will find it useful, since there is no PostgreSQL to Oracle JKM.
triggerCmd = """
drop trigger if exists "<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>" on <%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>;
drop sequence if exists "seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id";
create sequence "seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id";
create or replace function "fn_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"()
returns trigger as $$
declare
V_FLAG VARCHAR(1);
V_id integer NOT NULL DEFAULT nextval('"seq_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>_id"');
begin
if (TG_OP = 'INSERT') then
<%=odiRef.getColList("", "V_[COL_NAME] := new.[COL_NAME];", "\n\t\t", "", "PK")%>
V_FLAG := 'I';
end if;
if (TG_OP = 'UPDATE') then
<%=odiRef.getColList("", "V_[COL_NAME] := new.[COL_NAME];", "\n\t\t", "", "PK")%>
V_FLAG := 'I';
end if;
if (TG_OP = 'DELETE') then
<%=odiRef.getColList("", "V_[COL_NAME] := old.[COL_NAME];", "\n\t\t", "", "PK")%>
V_FLAG := 'D';
end if;
insert into <%=odiRef.getJrnInfo("JRN_FULL_NAME")%>
(
JRN_SUBSCRIBER,
JRN_CONSUMED,
JRN_FLAG,
JRN_DATE,
<%=odiRef.getColList("", "[COL_NAME]", ",\n\t\t", "", "PK")%>
)
select JRN_SUBSCRIBER,
'0',
V_FLAG,
now(),
<%=odiRef.getColList("", "V_[COL_NAME]", ",\n\t\t", "", "PK")%>
from <%=odiRef.getJrnInfo("SNP_JRN_SUBSCRIBER")%>
where JRN_TNAME = '<%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>';
/* The following line can be uncommented for symmetric replication */
/* and upper(USER) <> upper('<%=odiRef.getInfo("DEST_USER_NAME")%>') */
return new;
end $$ language plpgsql;
create trigger "<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"
after insert or update or delete on <%=odiRef.getJrnInfo("FULL_TABLE_NAME")%>
for each row
execute procedure "fn_<%=odiRef.getJrnInfo("JRN_FULL_TRIGGER")%>"();
"""
# Create the statement
myStmt = myCon.createStatement()
# Execute the trigger creation
myStmt.execute(triggerCmd)
myStmt.close()
myStmt = None
# Commit, just in case
# myCon.commit()
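For anyone adapting this to another table, it may help to see roughly what the getColList substitutions in the command above expand to. The sketch below is not ODI code: expand_col_list is a hypothetical stand-in that only handles the [COL_NAME] token and takes the column names as a plain list, on the assumption that ODI applies the pattern to each selected column and joins the results with the separator. The second column name, site, is invented to show the separator.

```python
def expand_col_list(columns, start, pattern, separator, end):
    """Hypothetical stand-in for odiRef.getColList: apply `pattern` to each
    column name, join the results with `separator`, wrap in `start`/`end`."""
    rendered = [pattern.replace("[COL_NAME]", name) for name in columns]
    return start + separator.join(rendered) + end


# Assumed primary key of public.payment, as in the question.
pk_columns = ["id"]

# Expansion of the assignment pattern used in the INSERT and UPDATE branches.
assignments = expand_col_list(
    pk_columns, "", "V_[COL_NAME] := new.[COL_NAME];", "\n\t\t", ""
)

# Expansion of the column-list pattern for an invented two-column key.
insert_cols = expand_col_list(["id", "site"], "", "[COL_NAME]", ",\n\t\t", "")

print(assignments)
print(insert_cols)
```

Note that the declare section of the trigger function only declares V_id, so a table whose primary key is not a single id column would presumably need matching variable declarations as well.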