
Monday, September 21, 2009

Triggers and LOBs: Synchronous CDC and Synchronous Streams Capture

Just a quick post to outline some of the limits of using LOBs and triggers together. It's no secret; it's written in black and white in the manual: depending on how a LOB column is updated, triggers on the table will fire... or NOT! The reason is related to the design of the LOB locator, which allows data chunks to be updated without any reference to the changed row.

Anyway, I don't want to dig too deep into the details of why things are the way they are. Instead, you'll find below some PL/SQL code samples. They show that if you work through the LOB locator, the triggers won't fire; on the other hand, if you update the LOB column directly, or create a new locator (as an INSERT does), the triggers will fire.

Create a table and a "for each row" trigger

To begin, we'll create a table with a trigger. The trigger records, in a separate table named xlog, the number of times it has fired:
create table x (id number, text clob)
lob(text) store as securefile;

create table xlog (id number, call number);

create or replace trigger x_trigger
after insert or update or delete on x
for each row
begin
if inserting then
insert into xlog (id, call) values (:new.id, 1);
else
update xlog set call=call+1 where id=:old.id;
end if;
end;
/
Note:
To test the Oracle Database 11g Release 2 Streams new features (in a later post), I've used a SecureFiles LOB.

Direct changes to a LOB column do fire triggers

To begin, we change the row data a few times without using any persistent LOB locator. As you'll discover in the xlog table, every change fires the trigger:
insert into x values (1,null);
update x set text='X' where id=1;
update x set text='XY' where id=1;
commit;

select x.text, xlog.call
from x, xlog
where x.id=1
and xlog.id=1;

TEXT CALL
---- ----
XY 3

Changes through the LOB locator don't fire triggers

On the other hand, if you change the LOB data through its persistent locator, as below, you'll find that the trigger doesn't fire:
declare
my_x clob;
begin
select text into my_x
from x where id=1
for update;
dbms_lob.append(my_x,'Z');
dbms_lob.append(my_x,'Z');
commit;
end;
/

select x.text, xlog.call
from x, xlog
where x.id=1
and xlog.id=1;

TEXT CALL
---- ----
XYZZ 3

Conclusion & Cleanup

If you think about it, this behavior says a lot about LOB support (or actually the lack of it!) in Synchronous Change Data Capture (CDC) and Synchronous Streams Capture: both rely on trigger-like mechanisms to see changes, and LOB locator operations slip right past them.
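If you want to see for yourself, here is a minimal sketch of how a synchronous capture could be registered on the table; the streams_name is made up, and it assumes a Streams administrator and an ANYDATA queue named strmadmin.streams_queue already exist (as set up in the next post). Repeating the locator-based update above should then produce no captured change at all:
begin
dbms_streams_adm.add_table_rules(
table_name => 'x',
streams_type => 'sync_capture',
streams_name => 'sync_capture_x',
queue_name => 'strmadmin.streams_queue',
include_dml => true);
end;
/

But enough conclusions for today; let's just drop the tables and the trigger: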
drop table x purge;
drop table xlog purge;


Sunday, September 6, 2009

The New Streams 11.2 SQL Generation Facility

I must admit, I'm kind of disappointed by the Oracle 11.2 Streams new features. Not that there is nothing important: the support for SecureFiles LOBs and compressed tables are things I've been eagerly waiting for, and this new release is huge and looks promising! However, I'm disappointed for the following three reasons:
  • I did not manage to get statement handlers to work yet, though they look very easy to use for some specific use cases. Whether created by hand or with dbms_streams_adm.maintain_change_table, they have always failed for me with ORA-01008: not all variables bound.
  • I was kind of hoping for the following restriction to be lifted: "The DBMS_STREAMS_ADVISOR_ADM package does not gather information about synchronous captures or messaging clients."
  • Last but not least, there are obvious hidden and undocumented features, which raises my frustration to its maximum.
Anyway, I imagine I have no other choice than to be patient and hope those features get fixed and documented as soon as possible. In the meantime, I propose we explore one of those new features, called SQL Generation.

SQL Generation is the ability, available in a custom DML handler or any piece of code that deals with LCRs, to transform an LCR into its canonical SQL statement. What we'll do is write a very simple example that uses a DML handler to store that SQL statement in a table instead of applying the change to the destination schema.
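To give a feel for the API before the full walk-through, here is a minimal standalone sketch that builds a row LCR by hand and asks it for its SQL text with get_row_text; the database name, table, and column values are all made up for the illustration, while in the real configuration below the capture process builds the LCRs for us:
set serveroutput on

declare
lcr sys.lcr$_row_record;
v_sqltext clob := ' ';
begin
-- build a row LCR manually (all values are hypothetical)
lcr := sys.lcr$_row_record.construct(
source_database_name => 'BLACK',
command_type => 'INSERT',
object_owner => 'SOURCE',
object_name => 'T5');
lcr.add_column('new', 'ID', anydata.convertnumber(1));
lcr.add_column('new', 'TEXT1', anydata.convertvarchar2('Hello'));
-- SQL Generation: the canonical SQL statement is appended to v_sqltext
lcr.get_row_text(v_sqltext);
dbms_output.put_line(v_sqltext);
end;
/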

Step 1: Create a sample Schema

To begin with the sample, create a source schema with a table and a few rows:
connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

col dbname new_value dbname

select value dbname
from v$parameter
where name='db_unique_name';

prompt &&dbname

connect source/source

create table t5(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

insert into t5(id, text1, text2)
values (1,'Text 1','Text 1');

insert into t5(id, text1, text2)
values (2,'Text 2','Text 2');

commit;

Step 2: Create the Streams administrator and a queue

Once done, you can create a Streams Administrator and a queue to use in your configuration:
connect / as sysdba

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;

CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;

grant dba to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table =>'strmadmin.streams_queue_table',
queue_name =>'strmadmin.streams_queue');
end;
/

exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);

select *
from dba_streams_administrator;

USERNAME LOC ACC
--------- --- ---
STRMADMIN YES YES

Step 3: Create a Capture Process

Then create a capture process on the source schema:
connect / as sysdba

var first_scn number;
set serveroutput on

DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/

col first_scn new_value first_scn
select :first_scn first_scn
from dual;

connect strmadmin/strmadmin

prompt &&first_scn
prompt &&dbname

BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'SQLGEN_CAPTURE',
rule_set_name => NULL,
source_database => '&&dbname',
use_database_link => false,
first_scn => &&first_scn,
logfile_assignment => 'implicit');
END;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t5',
streams_type => 'capture',
streams_name => 'SQLGEN_CAPTURE',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => '&&dbname',
inclusion_rule => true);
END;
/

set lines 120
col streams_name format a16
col streams_type format a9
col table format a15
col rule_type format a8
col rule_name format a15
col rule_condition format a60 word_wrapped

select streams_name,
streams_type,
table_owner||'.'||table_name "TABLE",
rule_type,
rule_name,
rule_condition
from dba_streams_table_rules
where streams_name='SQLGEN_CAPTURE'
and table_owner='SOURCE'
and table_name='T5';

Step 4: Create a procedure and a table to store the SQL of the LCR

Don't apply the LCRs to the destination; instead, create a table and a procedure that use SQL Generation to store each change as a DML statement in that table:
connect strmadmin/strmadmin

create table mydml_tab(
id number,
sqltext clob);

create sequence mydml_seq;

create or replace procedure mydml_handler(in_any in anydata)
is
lcr sys.lcr$_row_record;
v_sqltext clob:=' /* COMMENT */';
rc number;
begin
-- extract the row LCR from the ANYDATA payload
rc := in_any.GETOBJECT(lcr);
-- SQL Generation: the canonical SQL statement for the LCR is
-- appended to v_sqltext, after the leading comment
lcr.get_row_text(v_sqltext);
-- store the generated statement instead of applying the change
insert into mydml_tab
values (mydml_seq.nextval,v_sqltext);
end;
/

Step 5: Create an Apply process with a DML handler

Create an apply process and add the DML handler to it:
connect strmadmin/strmadmin

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t5',
streams_type => 'apply',
streams_name => 'SQLGEN_APPLY',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => '&&dbname',
inclusion_rule => true);
END;
/
col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status
from dba_apply;

set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a11
col table_name format a15
col rule_type format a8
col rule_name format a15

select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES
where STREAMS_NAME='SQLGEN_APPLY';

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'INSERT',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'UPDATE',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T5',
object_type => 'TABLE',
operation_name => 'DELETE',
error_handler => false,
user_procedure => 'strmadmin.mydml_handler',
apply_name => 'SQLGEN_APPLY');
end;
/

begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t5',
source_database_name => '&&dbname',
instantiation_scn => &&first_scn);
end;
/

col "OBJ3CT" format a15
col operation_name format a8
col user_procedure format a28
col apply_name format a13

select object_owner||'.'||object_name "OBJECT",
operation_name,
user_procedure,
apply_name
from dba_apply_dml_handlers;

col source_database format a10
col "OBJECT" format a15
set numwidth 15
select source_database,
SOURCE_OBJECT_OWNER||'.'||SOURCE_OBJECT_NAME "OBJECT",
instantiation_scn
from dba_apply_instantiated_objects;

Step 6: Start the Apply and Capture processes

Now start the apply and the capture processes and wait for them to keep up with the current position of the log writer:
exec dbms_apply_adm.start_apply('SQLGEN_APPLY')
exec dbms_capture_adm.start_capture('SQLGEN_CAPTURE')
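
An optional way to check on their progress is to query the dynamic views below; once the capture has caught up, its state typically reads CAPTURING CHANGES or WAITING FOR REDO (a quick sketch, state names may vary by version):
col capture_name format a15
col state format a30

select capture_name, state, total_messages_captured
from v$streams_capture;

col apply_name format a13

select apply_name, state
from v$streams_apply_reader;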

Step 7: Test the SQL Generation

You can test your setup by inserting or updating a row in the source table. After some time, query the table that stores the generated SQL:
insert into source.t5 values (3,'Hello','Hello');
commit;

-- Wait a few minutes
col id format 99
col sqltext format a50
set long 1000
set longchunksize 1000
select * from mydml_tab;
The table content should look like this:
 ID SQLTEXT
--- --------------------------------------------------
1 /* COMMENT */ INSERT INTO "SOURCE"."T5"("ID","TEX
T1","TEXT2" ) VALUES ( 3,'Hello','Hello')
You can perform more tests with UPDATE or DELETE; for example, the sketch below issues an UPDATE (the exact generated text may differ, with a WHERE clause built from the old column values):
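update source.t5 set text1='Bye' where id=3;
commit;

-- wait a few moments, then re-query the captured statements
select * from mydml_tab;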

Step 8: Drop the test environment

As always with my examples, I suggest you drop the whole configuration before you leave:
connect / as sysdba

exec dbms_apply_adm.stop_apply('SQLGEN_APPLY')
exec dbms_capture_adm.stop_capture('SQLGEN_CAPTURE')

exec dbms_apply_adm.delete_all_errors('SQLGEN_APPLY');
exec dbms_apply_adm.drop_apply('SQLGEN_APPLY')
exec dbms_capture_adm.drop_capture('SQLGEN_CAPTURE')

begin
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => '&&dbname',
instantiation_scn => null);
end loop;
end;
/

begin
for i in (select object_owner||'.'||
object_name name,
operation_name,
apply_name
from dba_apply_dml_handlers
where object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_dml_handler(
object_name => i.name,
object_type => 'TABLE',
operation_name=> i.operation_name,
user_procedure=> null,
apply_name => i.apply_name);
end loop;
end;
/

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user source cascade;
