Sunday, September 6, 2009

The New Streams 11.2 SQL Generation Facility

I must admit, I'm kind of disappointed by the new Streams features in Oracle 11.2. Not that there is nothing important: support for SecureFile LOBs and for compressed tables are things I've been eagerly waiting for. This new release is huge and looks promising! However, I'm disappointed for the following 3 reasons:
  • I have not managed to use statement handlers yet, even though they look very easy to use for some specific use cases. Whether I set them up by hand or with dbms_streams_adm.maintain_change_table, they have always failed with ORA-01008: not all variables bound.
  • I was kind of hoping for the following restriction to be lifted: "The DBMS_STREAMS_ADVISOR_ADM package does not gather information about synchronous captures or messaging clients."
  • Last but not least, there are obviously hidden and undocumented features, which raises my level of frustration to its maximum.
Anyway, I imagine I have no other choice than to be patient and hope those features get fixed and documented as soon as possible. In the meantime, I propose we explore one of those new features, called SQL Generation.

SQL Generation is the ability, in a custom DML handler or in any piece of code that deals with an LCR, to transform that LCR into its canonical SQL command. What we'll do is write a very simple example that uses a DML handler to store that SQL command in a table instead of applying it to the destination schema.

Step 1: Create a sample Schema

To begin, create a source schema with a table and a few rows:
connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

col dbname new_value dbname

select value dbname
from v$parameter
where name='db_unique_name';

prompt &&dbname

connect source/source

create table t5(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

insert into t5(id, text1, text2)
values (1,'Text 1','Text 1');

insert into t5(id, text1, text2)
values (2,'Text 2','Text 2');

commit;

Step 2: Create the Streams administrator and a queue

Once done, you can create a Streams Administrator and a queue to use in your configuration:
connect / as sysdba

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;

CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;

grant dba to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table =>'strmadmin.streams_queue_table',
queue_name =>'strmadmin.streams_queue');
end;
/

exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);

select *
from dba_streams_administrator;

USERNAME LOC ACC
--------- --- ---
STRMADMIN YES YES
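
Before moving on to the capture process, it may be worth checking that the database runs in ARCHIVELOG mode, since a local capture process mines the redo logs and needs the archived logs to be available. This check is not part of the walkthrough itself, so treat it as an assumed prerequisite; the query only reads v$database:

```sql
connect / as sysdba

-- Assumed prerequisite check (not part of the original steps):
-- a local capture process expects LOG_MODE to be ARCHIVELOG
select log_mode
from v$database;
```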

Step 3: Create a Capture Process

Then create a capture process on the source schema:
connect / as sysdba

var first_scn number;
set serveroutput on

DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/

col first_scn new_value first_scn
select :first_scn first_scn
from dual;

connect strmadmin/strmadmin

prompt &&first_scn
prompt &&dbname

BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'SQLGEN_CAPTURE',
rule_set_name => NULL,
source_database => '&&dbname',
use_database_link => false,
first_scn => &&first_scn,
logfile_assignment => 'implicit');
END;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t5',
streams_type => 'capture',
streams_name => 'SQLGEN_CAPTURE',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => '&&dbname',
inclusion_rule => true);
END;
/

set lines 120
col streams_name format a16
col streams_type format a9
col table format a15
col rule_type format a8
col rule_name format a15
col rule_condition format a60 wor wra

select streams_name,
streams_type,
table_owner||'.'||table_name "TABLE",
rule_type,
rule_name,
rule_condition
from dba_streams_table_rules
where streams_name='SQLGEN_CAPTURE'
and table_owner='SOURCE'
and table_name='T5';

Step 4: Create a procedure and a table to store the SQL of the LCR

Don't apply the LCR on the destination; instead, create a table and a procedure that will use SQL Generation to store the change vector as a DML command in that table:
connect strmadmin/strmadmin

create table mydml_tab(
id number,
sqltext clob);

create sequence mydml_seq;

create or replace procedure mydml_handler(in_any in anydata)
is
lcr sys.lcr$_row_record;
v_sqltext clob:=' /* COMMENT */';
rc number;
begin
rc := in_any.GETOBJECT(lcr);
lcr.get_row_text(v_sqltext);
insert into mydml_tab
values (mydml_seq.nextval,v_sqltext);
end;
/
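
The handler above only keeps the generated SQL text. As a rough sketch of how the same idea could be extended, the variant below also stores a few attributes read from the LCR. The table mydml_audit_tab and the procedure mydml_audit_handler are hypothetical names introduced for illustration only; the sketch reuses the mydml_seq sequence and starts from an empty temporary CLOB instead of a comment, since get_row_text appends the generated SQL to whatever the CLOB already contains:

```sql
connect strmadmin/strmadmin

-- Hypothetical table: same idea as mydml_tab, with extra LCR attributes
create table mydml_audit_tab(
id number,
command_type varchar2(30),
object_owner varchar2(30),
object_name varchar2(30),
scn number,
sqltext clob);

create or replace procedure mydml_audit_handler(in_any in anydata)
is
lcr sys.lcr$_row_record;
v_sqltext clob;
v_command varchar2(30);
v_owner varchar2(30);
v_name varchar2(30);
v_scn number;
rc pls_integer;
begin
-- Extract the row LCR from the ANYDATA payload
rc := in_any.getobject(lcr);
-- Read a few attributes through the LCR member functions
v_command := lcr.get_command_type();
v_owner := lcr.get_object_owner();
v_name := lcr.get_object_name();
v_scn := lcr.get_scn();
-- Generate the SQL text into an empty temporary CLOB
dbms_lob.createtemporary(v_sqltext, true);
lcr.get_row_text(v_sqltext);
insert into mydml_audit_tab
values (mydml_seq.nextval, v_command, v_owner, v_name, v_scn, v_sqltext);
-- Release the temporary LOB space
dbms_lob.freetemporary(v_sqltext);
end;
/
```

To try it, you would pass 'strmadmin.mydml_audit_handler' as the user_procedure when registering the DML handlers in Step 5.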

Step 5: Create an Apply process with a DML handler

Create an apply process and add the DML handler to it:
connect strmadmin/strmadmin

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t5',
streams_type => 'apply',
streams_name => 'SQLGEN_APPLY',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => '&&dbname',
inclusion_rule => true);
END;
/
col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status
from dba_apply;

set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a11
col table_name format a15
col rule_type format a8
col rule_name format a15

select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES
where STREAMS_NAME='SQLGEN_APPLY';

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'INSERT',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'UPDATE',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'DELETE',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t5',
source_database_name => '&&dbname',
instantiation_scn => &&first_scn);
end;
/

col "OBJECT" format a15
col operation_name format a8
col user_procedure format a28
col apply_name format a13

select object_owner||'.'||object_name "OBJECT",
operation_name,
user_procedure,
apply_name
from dba_apply_dml_handlers;

col source_database format a10
col "OBJECT" format a15
set numwidth 15
select source_database,
SOURCE_OBJECT_OWNER||'.'||SOURCE_OBJECT_NAME "OBJECT",
instantiation_scn
from dba_apply_instantiated_objects;
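
Note that this step never calls dbms_apply_adm.create_apply: dbms_streams_adm.add_table_rules creates the apply process itself when the named process does not exist yet. If you prefer to create it explicitly, a minimal sketch could look like the following, to be run before the add_table_rules call of this step. The parameter values are assumptions that simply mirror the names used in this post:

```sql
connect strmadmin/strmadmin

-- Optional alternative, to run BEFORE add_table_rules in Step 5.
-- apply_captured => true: the process applies LCRs enqueued by a capture process
begin
dbms_apply_adm.create_apply(
queue_name => 'strmadmin.streams_queue',
apply_name => 'SQLGEN_APPLY',
apply_captured => true);
end;
/
```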

Step 6: Start the Apply and Capture processes

Now start the apply and capture processes and wait for them to catch up with the current position of the log writer:
exec dbms_apply_adm.start_apply('SQLGEN_APPLY')
exec dbms_capture_adm.start_capture('SQLGEN_CAPTURE')
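
To get an idea of whether the processes have caught up, you can poll a few views. The queries below are a sketch rather than a complete monitoring script; they assume you are still connected as strmadmin, and the column formats are arbitrary:

```sql
-- Current state of the capture process and message counters so far
col capture_name format a15
col state format a40
select capture_name,
state,
total_messages_captured,
total_messages_enqueued
from v$streams_capture;

-- Once started, both processes are expected to report ENABLED
select capture_name, status
from dba_capture;

select apply_name, status
from dba_apply;
```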

Step 7: Test the SQL Generation

You can test your settings by inserting or updating a row in the source table. After some time, query the table that stores the generated SQL:
insert into source.t5 values (3,'Hello','Hello');
commit;

-- Wait a few minutes
col id format 99
col sqltext format a50
set long 1000
set longchunksize 1000
select * from mydml_tab;
The table content should look like the following:
 ID SQLTEXT
--- --------------------------------------------------
1 /* COMMENT */ INSERT INTO "SOURCE"."T5"("ID","TEX
T1","TEXT2" ) VALUES ( 3,'Hello','Hello')
You can perform more tests with UPDATE or DELETE.
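
For instance, the statements below are one possible way to run those extra tests against the row inserted above. The new value 'Updated' is arbitrary, and the last query is only there to help if no row shows up in mydml_tab:

```sql
update source.t5
set text1='Updated'
where id=3;
commit;

delete from source.t5
where id=3;
commit;

-- Wait a few minutes, then list the generated statements in order
select * from mydml_tab order by id;

-- If nothing shows up, look for apply errors
col error_message format a60
select apply_name,
local_transaction_id,
error_message
from dba_apply_error;
```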

Step 8: Drop the test environment

As always with my examples, I suggest you drop the whole configuration before you leave:
connect / as sysdba

exec dbms_apply_adm.stop_apply('SQLGEN_APPLY')
exec dbms_capture_adm.stop_capture('SQLGEN_CAPTURE')

exec dbms_apply_adm.delete_all_errors('SQLGEN_APPLY');
exec dbms_apply_adm.drop_apply('SQLGEN_APPLY')
exec dbms_capture_adm.drop_capture('SQLGEN_CAPTURE')

begin
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => '&&dbname',
instantiation_scn => null);
end loop;
end;
/

begin
for i in (select object_owner||'.'||
object_name name,
operation_name,
apply_name
from dba_apply_dml_handlers
where object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_dml_handler(
object_name => i.name,
object_type => 'TABLE',
operation_name=> i.operation_name,
user_procedure=> null,
apply_name => i.apply_name);
end loop;
end;
/

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user source cascade;
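
If you want to double-check the cleanup, a few dictionary queries should be enough. This is only a sketch; each query is expected to return no rows once everything has been dropped:

```sql
connect / as sysdba

-- None of these should return a row after the cleanup
select capture_name
from dba_capture
where capture_name='SQLGEN_CAPTURE';

select apply_name
from dba_apply
where apply_name='SQLGEN_APPLY';

select username
from dba_users
where username in ('STRMADMIN','SOURCE');
```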

1 comment:

  1. In step 5, you do not explicitly call the dbms_apply_adm.create_apply procedure. Does adding the table rules implicitly do this for you?
