I must admit, I'm kind of disappointed by the Oracle 11.2 Streams new features. Not that there is nothing important: support for SecureFile LOBs and compressed tables are things I've been eagerly waiting for. This new release is huge and looks promising! However, I'm disappointed for the following 3 reasons:
Anyway, I imagine I have no other choice than to be patient and hope those features get fixed and documented as soon as possible. In the meantime, I propose we explore one of those new features, called SQL Generation.
SQL Generation is the ability, in a custom DML handler or any other piece of code that deals with an LCR, to transform that LCR into its canonical SQL command. What we'll do is write a very simple example that uses a DML handler to store that SQL command in a table instead of applying it to the destination schema.
Step 1: Create a sample schema
To begin with the sample, create a source schema with a table and a few rows:
-- Step 1 script: build the SOURCE schema whose changes will be captured.
connect / as sysdba

create user source
  identified by source
  default tablespace users
  temporary tablespace temp;

grant connect, resource to source;

-- Keep the db_unique_name parameter value in the dbname substitution
-- variable; the later steps pass it as the Streams source database.
column dbname new_value dbname
select value as dbname
  from v$parameter
 where name = 'db_unique_name';
prompt &&dbname

connect source/source

-- Sample table with a primary key and two text columns.
create table t5 (
  id    number primary key,
  text1 varchar2(80),
  text2 varchar2(80)
);

-- Two seed rows, committed before any capture is configured.
insert into t5 (id, text1, text2)
values (1, 'Text 1', 'Text 1');

insert into t5 (id, text1, text2)
values (2, 'Text 2', 'Text 2');

commit;
Step 2: Create the Streams administrator and a queue
Once done, you can create a Streams Administrator and a queue to use in your configuration:
-- Step 2 script: tablespace, Streams administrator account and queue.
connect / as sysdba

create tablespace streams_tbs
  datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
  size 25M
  autoextend on maxsize 256M;

create user strmadmin
  identified by strmadmin
  default tablespace streams_tbs
  quota unlimited on streams_tbs
  temporary tablespace temp;

grant dba to strmadmin;

-- Queue table and queue both live in the strmadmin schema.
begin
  dbms_streams_adm.set_up_queue(
    queue_table => 'strmadmin.streams_queue_table',
    queue_name  => 'strmadmin.streams_queue');
end;
/

begin
  dbms_streams_auth.grant_admin_privilege('strmadmin', true);
end;
/

-- Verify that strmadmin is now listed as a Streams administrator.
select *
  from dba_streams_administrator;
USERNAME LOC ACC
--------- --- ---
STRMADMIN YES YES
Step 3: Create a Capture Process
Then create a capture process on the source schema:
-- Step 3a: run DBMS_CAPTURE_ADM.BUILD and remember the SCN it returns.
-- The SCN ends up in the first_scn substitution variable, which the
-- later blocks use to create the capture and set the instantiation SCN.
connect / as sysdba
variable first_scn number
set serveroutput on
declare
  l_scn number;
begin
  -- BUILD hands back the SCN through its first_scn OUT argument.
  dbms_capture_adm.build(first_scn => l_scn);
  dbms_output.put_line('First SCN Value = ' || l_scn);
  :first_scn := l_scn;
end;
/
column first_scn new_value first_scn
-- Select the SCN as text: a NUMBER column captured through new_value is
-- subject to SQL*Plus number formatting, so an SCN wider than numwidth
-- could be stored in scientific notation and break the later calls.
select to_char(:first_scn) as first_scn
  from dual;
-- Step 3b: create the capture process as the Streams administrator.
connect strmadmin/strmadmin

-- Echo the two substitution variables the call below depends on.
prompt &&first_scn
prompt &&dbname

begin
  -- No rule set is attached here (rule_set_name is null).
  dbms_capture_adm.create_capture(
    queue_name         => 'strmadmin.streams_queue',
    capture_name       => 'SQLGEN_CAPTURE',
    rule_set_name      => null,
    source_database    => '&&dbname',
    use_database_link  => false,
    first_scn          => &&first_scn,
    logfile_assignment => 'implicit');
end;
/
-- Step 3c: display the capture process that was just created.
col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11
-- Restrict the output to SQLGEN_CAPTURE: the first_scn column still
-- feeds the first_scn substitution variable (new_value set earlier),
-- so a row from another capture process could overwrite it.
select capture_name,
       queue_name,
       first_scn,
       start_scn,
       rule_set_name
  from dba_capture
 where capture_name = 'SQLGEN_CAPTURE';
-- Step 3d: add DML-only table rules for source.t5 to the capture process.
begin
  dbms_streams_adm.add_table_rules(
    table_name         => 'source.t5',
    streams_type       => 'capture',
    streams_name       => 'SQLGEN_CAPTURE',
    queue_name         => 'strmadmin.streams_queue',
    include_dml        => true,
    include_ddl        => false,
    include_tagged_lcr => true,
    source_database    => '&&dbname',
    inclusion_rule     => true);
end;
/
-- Step 3e: list the table rules attached to SQLGEN_CAPTURE for SOURCE.T5.
set linesize 120
column streams_name   format a16
column streams_type   format a9
column table          format a15
column rule_type      format a8
column rule_name      format a15
column rule_condition format a60 word_wrapped wrapped
select streams_name,
       streams_type,
       table_owner || '.' || table_name "TABLE",
       rule_type,
       rule_name,
       rule_condition
  from dba_streams_table_rules
 where streams_name = 'SQLGEN_CAPTURE'
   and table_owner = 'SOURCE'
   and table_name = 'T5';
Step 4: Create a procedure and a table to store the SQL of the LCR
Don't apply the LCR on the destination; instead, create a table and a procedure that will use SQL Generation to store the change vector as a DML command in that table:
-- Step 4a: storage for the generated SQL, owned by strmadmin.
connect strmadmin/strmadmin

-- One row per handled LCR: a sequence-generated id and the SQL text.
create table mydml_tab (
  id      number,
  sqltext clob
);

-- Supplies the id values for mydml_tab.
create sequence mydml_seq;
-- DML handler: stores the SQL text of a row LCR in mydml_tab instead of
-- applying the change.
--   in_any : ANYDATA payload handed to the handler; it is unpacked into
--            a sys.lcr$_row_record.
-- The procedure does not commit.
create or replace procedure mydml_handler(in_any in anydata)
is
  lcr       sys.lcr$_row_record;
  -- Seed text for the CLOB; the sample output of this tutorial shows the
  -- generated statement following this comment.
  v_sqltext clob := ' /* COMMENT */';
  -- Receives the return code of GETOBJECT; not inspected afterwards.
  rc        number;
begin
  rc := in_any.getobject(lcr);
  -- SQL Generation: add the statement text of the LCR to v_sqltext.
  lcr.get_row_text(v_sqltext);
  -- Explicit column list so the insert keeps working if mydml_tab
  -- ever gains additional columns.
  insert into mydml_tab (id, sqltext)
  values (mydml_seq.nextval, v_sqltext);
end mydml_handler;
/
Step 5: Create an Apply process with a DML handler
Create an apply process and add the DML handler to it:
-- Step 5a: add DML-only apply rules for source.t5 under the name
-- SQLGEN_APPLY, on the same queue the capture process uses.
connect strmadmin/strmadmin

begin
  dbms_streams_adm.add_table_rules(
    table_name         => 'source.t5',
    streams_type       => 'apply',
    streams_name       => 'SQLGEN_APPLY',
    queue_name         => 'strmadmin.streams_queue',
    include_dml        => true,
    include_ddl        => false,
    include_tagged_lcr => true,
    source_database    => '&&dbname',
    inclusion_rule     => true);
end;
/
-- Step 5b: list the apply processes with their queue, rule set and status.
column apply_name    format a13
column queue_name    format a13
column rule_set_name format a11
select apply_name,
       queue_name,
       rule_set_name,
       status
  from dba_apply;
-- Step 5c: list the table rules registered for SQLGEN_APPLY.
set linesize 120
column streams_name format a16
column streams_type format a9
column table_owner  format a11
column table_name   format a15
column rule_type    format a8
column rule_name    format a15
select streams_name,
       streams_type,
       table_owner,
       table_name,
       rule_type,
       rule_name
  from dba_streams_table_rules
 where streams_name = 'SQLGEN_APPLY';
-- Step 5d: register strmadmin.mydml_handler as the DML handler of
-- SOURCE.T5 for INSERT, UPDATE and DELETE on the SQLGEN_APPLY process.
-- A single loop replaces three copies of the same call that differed
-- only in operation_name.
declare
  l_operations sys.odcivarchar2list :=
    sys.odcivarchar2list('INSERT', 'UPDATE', 'DELETE');
begin
  for i in 1 .. l_operations.count loop
    dbms_apply_adm.set_dml_handler(
      object_name    => 'SOURCE.T5',
      object_type    => 'TABLE',
      operation_name => l_operations(i),
      error_handler  => false,
      user_procedure => 'strmadmin.mydml_handler',
      apply_name     => 'SQLGEN_APPLY');
  end loop;
end;
/
-- Step 5e: set the instantiation SCN of source.t5 to the SCN held in the
-- first_scn substitution variable.
begin
  dbms_apply_adm.set_table_instantiation_scn(
    source_object_name   => 'source.t5',
    source_database_name => '&&dbname',
    instantiation_scn    => &&first_scn);
end;
/
-- Step 5f: list the registered DML handlers.
-- The heading is spelled "OBJECT" (it used to read "OBJ3CT"), matching
-- the heading of the instantiated-objects query in this step.
col "OBJECT" format a15
col operation_name format a8
col user_procedure format a28
col apply_name format a13
select object_owner || '.' || object_name "OBJECT",
       operation_name,
       user_procedure,
       apply_name
  from dba_apply_dml_handlers;
-- Step 5g: show the instantiation SCN recorded for each source object.
column source_database format a10
column "OBJECT" format a15
set numwidth 15
select source_database,
       source_object_owner || '.' || source_object_name "OBJECT",
       instantiation_scn
  from dba_apply_instantiated_objects;
Step 6: Start the Apply and Capture processes
Now start the apply and the capture processes and wait for them to catch up with the current position of the log writer:
-- Step 6: start the apply process first, then the capture process.
begin
  dbms_apply_adm.start_apply('SQLGEN_APPLY');
end;
/
begin
  dbms_capture_adm.start_capture('SQLGEN_CAPTURE');
end;
/
Step 7: Test the SQL Generation
You can test your settings by inserting or updating a row in the source table. After some time, query the table that stores the generated SQL:
-- Step 7: change the source table, then read back the generated SQL.
-- Explicit column list, like the seed inserts of step 1, so the
-- statement does not depend on the column order of source.t5.
insert into source.t5 (id, text1, text2)
values (3, 'Hello', 'Hello');
commit;
-- Wait a few minutes
col id format 99
col sqltext format a50
set long 1000
set longchunksize 1000
-- Name the columns and order by id so the generated statements are
-- listed in the order the handler stored them.
select id,
       sqltext
  from mydml_tab
 order by id;
The table content should look like below:
ID SQLTEXT
--- --------------------------------------------------
1 /* COMMENT */ INSERT INTO "SOURCE"."T5"("ID","TEX
T1","TEXT2" ) VALUES ( 3,'Hello','Hello')
You can perform more tests with UPDATE or DELETE statements.
Step 8: Drop the test environment
As always with my examples, I suggest you drop the whole configuration before you leave:
-- Step 8a: stop both processes, purge apply errors, then drop them.
connect / as sysdba

begin
  dbms_apply_adm.stop_apply('SQLGEN_APPLY');
end;
/
begin
  dbms_capture_adm.stop_capture('SQLGEN_CAPTURE');
end;
/
begin
  dbms_apply_adm.delete_all_errors('SQLGEN_APPLY');
end;
/
begin
  dbms_apply_adm.drop_apply('SQLGEN_APPLY');
end;
/
begin
  dbms_capture_adm.drop_capture('SQLGEN_CAPTURE');
end;
/
-- Step 8b: reset the instantiation SCN to null for every instantiated
-- object owned by SOURCE.
begin
  for obj in (
    select source_object_owner || '.' || source_object_name as name
      from dba_apply_instantiated_objects
     where source_object_owner = 'SOURCE'
  ) loop
    dbms_apply_adm.set_table_instantiation_scn(
      source_object_name   => obj.name,
      source_database_name => '&&dbname',
      instantiation_scn    => null);
  end loop;
end;
/
-- Step 8c: unset every DML handler registered on objects owned by SOURCE
-- by calling set_dml_handler again with a null user_procedure.
begin
  for handler in (
    select object_owner || '.' || object_name as name,
           operation_name,
           apply_name
      from dba_apply_dml_handlers
     where object_owner = 'SOURCE'
  ) loop
    dbms_apply_adm.set_dml_handler(
      object_name    => handler.name,
      object_type    => 'TABLE',
      operation_name => handler.operation_name,
      user_procedure => null,
      apply_name     => handler.apply_name);
  end loop;
end;
/
-- Step 8d: remove the queue, then the Streams administrator, its
-- tablespace and finally the sample schema.
begin
  dbms_streams_adm.remove_queue('strmadmin.streams_queue', true, true);
end;
/

drop user strmadmin cascade;

drop tablespace streams_tbs
  including contents and datafiles;

drop user source cascade;