As you know, Streams can be used in several cases, including some really advanced ones where you may want to transform DDL or DML changes into something very different. This post starts from the previous Streams One Way Table Replication 101 post. Instead of maintening a copy of the table in a destination schema, we'll transform DML captured changes and we'll keep the data from before the changes; we'll use a Custom Rule-Based Transformation for that purpose.
I won't argue if this is or not the right way to manage your problem; it's probably not, by the way! The goal here is to illustrate a specific point of the documentation that is described in Managing Custom Rule-Based Transformations.
First, let's represent what we want to achieve; Basically that's what follows:Custom Transformation Example
We want to populate a table named T3_HISTORY from the changes made to the T3 table, so that:
- If one row is inserted in T3, nothing is applied to T3_HISTORY
- If one or several columns are modified in a row of T3, a row is inserted in T3_HISTORY with the primary key of the changed row and the values of the columns before they've been modified
- If one row is deleted from T3, it is inserted in T3_HISTORY
- Build The Sample Schema and Table
- Configure Database Logging and the Multi-Version Data Dictionary
- Create the Streams Administrator and The Streams Queue
- Create the Capture Process and Define the Capture Rules
- Create a Declarative Rule-Based Transformation and Add it to the Capture Process
- Create an Apply Process
- Create a Custom Rule-Based Transformation and Add it to the Apply Process Rule
- Instantiate the Table in the DESTINATION Schema
- Start the Capture and Apply Processes
- Test the Table Replication
- Stop and Suppress the Oracle Streams Configuration
Note:
We've tested this post on a 11.1.0.7 Enterprise Edition version of Oracle running on Linux 32bits.
Step 1: Build The Sample Schema and Table
In order to start, we'll create a schema named SOURCE
and a table named T3
. The script below can be used for that simple operation:
connect / as sysdbaStep 2: Configure Database Logging and the Multi-Version Data Dictionary
create user source
identified by source
default tablespace users
temporary tablespace temp;
grant connect,resource to source;
connect source/source
create table t3(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));
insert into t3(id, text1, text2)
values (1,'Text 1','Text 1');
insert into t3(id, text1, text2)
values (2,'Text 2','Text 2');
commit;
Then, we'll enable the supplemental logging and we'll capture dictionary for our apply process; for a more detailed explanation, refer to the corresponding section of the previous post:
connect / as sysdba
alter database add supplemental log
data (primary key, unique index) columns;
select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;
SUPPLEME SUP SUP
-------- --- ---
IMPLICIT YES YES
connect / as sysdba
var first_scn number;
set serveroutput on
DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/
First SCN Value = 49042939541
exec dbms_capture_adm.prepare_table_instantiation(-
table_name=>'source.t3');
Step 3: Create the Streams Administrator and The Streams Queue
We'll create the Streams Administrator and the Streams Queue; for a more detailed explanation, refer to the corresponding section of the previous post:
connect / as sysdba
CREATE TABLESPACE streams_tbs
DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
SIZE 25M AUTOEXTEND ON MAXSIZE 256M;
CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;
grant dba to strmadmin;
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/
Step 4: Create the Capture Process and Define the Capture Rules
Once the source and the destination databases setup, you can create a capture process and add it a rule to capture the source table; In this case we are not interested by the
INSERT
commands but instead, just by DELETE
and UPDATE
. For that reason we'll use a AND conditition that will be added to the rule evaluation. For a more detailed explanation about the capture process, you can refer to the corresponding section of the previous post: connect strmadmin/strmadmin
var first_scn number;
exec :first_scn:= 49042939541
BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'capture4transfo',
rule_set_name => NULL,
source_database => 'BLACK',
use_database_link => false,
first_scn => :first_scn,
logfile_assignment => 'implicit');
END;
/
col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11
select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;
CAPTURE_NAME QUEUE_NAME FIRST_SCN START_SCN RULE_SET_NA
--------------- ------------- ------------- ------------- -----------
CAPTURE4TRANSFO STREAMS_QUEUE 49042939541 49042939541
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t3',
streams_type => 'capture',
streams_name => 'capture4transfo',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true,
and_condition =>
':lcr.get_command_type() in (''DELETE'',''UPDATE'')');
END;
/
set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15
select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES;
STREAMS_NAME STREAMS_T TABLE_OWNE TABLE_NAME RULE_TYP RULE_NAME
---------------- --------- ---------- ---------- -------- ---------
CAPTURE4TRANSFO CAPTURE SOURCE T3 DML T352
col rule_condition format a60 wor wra
select rule_condition
from dba_streams_table_rules
where streams_name='CAPTURE4TRANSFO'
and table_owner='SOURCE'
and table_name='T3';
RULE_CONDITION
------------------------------------------------------------
(((((:dml.get_object_owner() = 'SOURCE' and :dml.get_object_
name() = 'T3')) and :dml.get_source_database_name() = 'BLACK
' )) and (:dml.get_command_type() in ('DELETE','UPDATE')))
Step 5: Create a Declarative Rule-Based Transformation and Add it to the Capture Process
We want the transformation to be as resuable as possible. For that reason we'll proceed in 2 step. The first step consists in renaming the table from SOURCE.T3
to DESTINATION.T3_HISTORY
with de declarative rule based transformation. We'll apply the transformation to the capture rule and the script below details the associated steps: connect strmadmin/strmadminStep 6: Create an Apply Process
begin
dbms_streams_adm.rename_table(
rule_name => 'T352' ,
from_table_name => 'SOURCE.T3',
to_table_name => 'DESTINATION.T3_HISTORY',
step_number => 0,
operation => 'add');
end;
/
col rule_name format A6
col from_schema_name format a6
col to_schema_name format a12
select rule_name,
transform_type,
from_schema_name,
to_schema_name,
declarative_type
from dba_streams_transformations;
RULE_N TRANSFORM_TYPE FROM_S TO_SCHEMA_NA DECLARATIVE_T
------ -------------------------- ------ ------------ -------------
T352 DECLARATIVE TRANSFORMATION SOURCE DESTINATION RENAME TABLE
We can create an apply process that will subscribe the Streams queue and get the changes to be applied on the
DESTINATION.T3_HISTORY
table:connect strmadmin/strmadmin
BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'destination.t3_history',
streams_type => 'apply',
streams_name => 'apply4transfo',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true);
END;
/
col apply_name format a13
col queue_name format a13
col rule_set_name format a11
select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;
APPLY_NAME QUEUE_NAME RULE_SET_NA STATUS MESSAGE_DE
------------- ------------- ----------- -------- ----------
APPLY4TRANSFO STREAMS_QUEUE RULESET$_55 DISABLED CAPTURED
set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a11
col table_name format a15
col rule_type format a8
col rule_name format a15
select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES
where STREAMS_NAME='APPLY4TRANSFO';
STREAMS_NAME STREAMS_T TABLE_OWNER TABLE_NAME RULE_TYP RULE_NAME
---------------- --------- ----------- ---------- -------- ------------
APPLY4TRANSFO APPLY DESTINATION T3_HISTORY DML T3_HISTORY54
Step 7: Create a Custom Rule-Based Transformation and Add it to the Apply Process Rule
As you can easily understand the transformation we'll apply to the LCR is not simple enough to be executed declaratively. For that reason, we'll create a custom function that will take the LCR as a parameter and will return the transformed LCR. You'll find the code of it below; it consists in the following:
- Make sure the LCR is an 'UPDATE' or a 'DELETE'
- Get all the before images (i.e. 'old') of the changed and PK/UK columns and set them as the values to be inserted in the LCR (i.e. 'new')
- Add a column SCN that contains the SCN of the DML statement
- Add a column DML_OPERATION that says if the capture operation was an UPDATE or a DELETE
- Delete all the before images (i.e. 'old') of the LCR
- Transform the command into an INSERT
connect strmadmin/strmadmin
create or replace function transform4history(in_any in anydata)
return anydata
IS
lcr sys.lcr$_row_record;
lcr_rl sys.lcr$_row_list;
rc number;
invalid_column exception;
pragma exception_init(invalid_column,-23607);
begin
-- Get the type of object
-- Check if the object type is SYS.LCR$_ROW_RECORD
if in_any.GETTYPENAME='SYS.LCR$_ROW_RECORD' THEN
-- Put the row LCR into lcr
rc := in_any.GETOBJECT(lcr);
-- Check if that's an Update or a Delete
if (lcr.GET_COMMAND_TYPE() in ('UPDATE','DELETE')) then
lcr_rl:=lcr.get_values('old');
for i in lcr_rl.first..lcr_rl.last loop
begin
lcr.set_value('new',lcr_rl(i).column_name,
lcr.get_value('old',lcr_rl(i).column_name));
exception when invalid_column then
lcr.add_column('new',lcr_rl(i).column_name,
lcr.get_value('old',lcr_rl(i).column_name));
end;
end loop;
lcr.add_column('new','SCN',
anydata.convertnumber(lcr.get_scn()));
lcr.add_column('new','DML_OPERATION',
anydata.convertvarchar2(lcr.get_command_type()));
for i in lcr_rl.first..lcr_rl.last loop
lcr.delete_column(lcr_rl(i).column_name,'old');
end loop;
lcr.set_command_type('INSERT'); -- Change it to an insert
return anydata.convertobject(lcr);
end if;
return in_any;
end if;
end;
/
You can add the transformation to the apply rule that matches the destination table. It order to do it, use
DBMS_STREAMS_ADM.SET_RULE_TRANSFORM_FUNCTION
like below:connect strmadmin/strmadminStep 8: Instantiate the Table in the
begin
dbms_streams_adm.set_rule_transform_function(
rule_name => 'T3_HISTORY54',
transform_function => 'transform4history');
end;
/
col rule_name format A12
col user_function_name format a32
select rule_name,
transform_type,
user_function_name
from dba_streams_transformations;
RULE_NAME TRANSFORM_TYPE USER_FUNCTION_NAME
------------ -------------------------- --------------------------------
T352 DECLARATIVE TRANSFORMATION
T3_HISTORY54 CUSTOM TRANSFORMATION "STRMADMIN"."TRANSFORM4HISTORY"
DESTINATION
SchemaStreams is now configured. Before you start the different processes, you must instantiate the table in the
DESTINATION
schema and store the instantiation SCN so that the apply process knows what changes exactly it can apply. In our test case, we'll start with an empty table that will be slighly different from the original table:- We'll add a column SCN to store the SCN of the change
- We'll add a column DML_OPERATION to store the DML command used on the primary table
- It won't have any Primary Key; even though we could create one on (ID, SCN)
connect / as sysdba
create user destination
identified by destination
default tablespace users
temporary tablespace temp;
grant connect,resource to destination;
create table destination.t3_history(
id number,
text1 varchar2(80),
text2 varchar2(80),
scn number,
dml_operation varchar2(6));
col apply_scn format 999999999999Once the table created, use the
select dbms_flashback.get_system_change_number apply_scn
from dual;
APPLY_SCN
-----------
49042945957
dbms_apply_adm.
set_table_instantiation_scn
procedure to store the instantiation SCN for the apply process like below:beginStep 9: Start the Capture and Apply Processes
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t3',
source_database_name => 'BLACK',
instantiation_scn => 49042945957);
end;
/
col SOURCE_DATABASE format a6
col OBJECT format a10
col INSTANTIATION_SCN format 999999999999
select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;
SOURCE OBJECT INSTANTIATION_SCN
------ ---------- -----------------
BLACK SOURCE.T3 49042945957
You can start the capture and the apply processes :
exec dbms_capture_adm.start_capture('capture4transfo');Step 10: Test the Table Replication
exec dbms_apply_adm.start_apply('apply4transfo');
To test the replication, update the source table and check the changes are propagated to the destination table. Below is a set of simple tests that shows the replication is working as it should:
update source.t3 set text1= 'Text 3'Step 11: Stop and Suppress the Oracle Streams Configuration
where id=1;
delete from source.t3
where id=1;
commit;
pause
col id format 99
col text1 format a6
col text2 format a6
col scn format 999999999999
select id,
text1,
text2,
scn,
dml_operation
from destination.t3_history
order by id, scn;
ID TEXT1 TEXT2 SCN DML_OP
--- ------ ------ ------------- ------
1 Text 1 49042951081 UPDATE
1 Text 3 Text 1 49042965443 DELETE
In this last step, we'll clean up the environment and leave it as it was when we've started. In order to do it it, we'll stop and drop the capture and apply processes; we'll drop the Streams queue, the Streams administrator and the associated tablespace; we'll drop the 2 schemes and we'll disabled the supplemental logging:
connect / as sysdba
exec dbms_capture_adm.stop_capture('capture4transfo');
exec dbms_apply_adm.stop_apply('apply4transfo');
exec dbms_capture_adm.drop_capture('capture4transfo',true);
exec dbms_apply_adm.drop_apply('apply4transfo',true);
exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);
exec dbms_apply_adm.SET_TABLE_INSTANTIATION_SCN('source.t3','BLACK', null)
select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;
drop user strmadmin cascade;
drop tablespace streams_tbs
including contents and datafiles;
drop user destination cascade;
drop user source cascade;
alter database drop supplemental log
data (primary key, unique index) columns;
select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;
SUPPLEME SUP SUP
-------- --- ---
NO NO NO
Read more...