Friday, October 9, 2009

Streams LCRs and LOBs

LOBs (BLOB, CLOB, NCLOB) have been designed to provide a rich and efficient set of APIs. With securefile, LOB's storage model is highly efficient and optimized for compression. LOBs don't work like anything else within an Oracle database. You can check my previous post entitled Triggers and LOBs, if you are not convinced yet.

As a result, Streams doesn't work with LOBs like anything else and using LOBs with Streams can quickly turn into a headache:
  • Many tools, including the ones provided by the documentation, don't provide a clean display of LCR with LOBs,
  • Some packages require specific LOB settings, like DBMS_APPLY_ADM.SET_DML_HANDLER and its assemble_lobs parameter,
  • The streams feature set associated with LOB is quite different from the feature set associated with other types : no conflict detection, no support for synchronous capture mechanisms and several specific bugs (search for "Streams" and "LOB" in the Metalink Bug Search Engine).
For all these reasons, it's very interesting to be able to figure out how LOBs and Streams work together. To help, I've built a very simple model that allows to see, if not exactly the content of LCRs containing LOBs at least a XML representation of them.

The test case

You'll find below a description of the test case, I've built:

To work the test case is made of several parts
  1. The source table is named SOURCE.T9. You can easily change its structures (from CLOB to BLOB for example). All the DML commands you'll execute and commit on that table will be captured.
  2. Because I use an asynchronous capture process, I need to make sure the change vectors are stored by LGWR in the redologs and archivelogs.
  3. I've created a strmadmin schema to stored all the objects I use for the setup (queue, procedure, rules...); I've named the capture process streams_capture; it captures the changes from the redologs and enqueue them in the streams_queue queue.
  4. The apply process de-queues the LCRs from streams_queue
  5. It relies on a DML handler to manage the LCRs. That handler converts the LCRs into XML type objects with the DBMS_STREAMS.CONVERT_LCR_TO_XML procedure and enqueue them in the myqueue queue
  6. The print_xml_fromq procedure is used to display the content of the XML type objects that match LCRs.

The script

The script below performs the whole setup of the test case described above:
connect / as sysdba

col global_name format a30 new_value dbname
select global_name from global_name;

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

create table source.t9(
id number,
text clob,
constraint t9_pk primary key (id))
lob(text) store as securefile;

var first_scn number;

set serveroutput on
declare
scn number;
begin
dbms_capture_adm.build(
first_scn => scn);
dbms_output.put_line('First SCN Value = ' || scn);
:first_scn := scn;
end;
/

col first_scn format 999999999999 -
new_value first_scn
select :first_scn first_scn from dual;

create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs.dbf'
size 25M autoextend on maxsize 256M;

create user strmadmin identified by strmadmin
default tablespace streams_tbs
quota unlimited on streams_tbs;

grant dba to strmadmin;
grant execute on dbms_aq to strmadmin;

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/

connect strmadmin/strmadmin

var first_scn number;
exec :first_scn:=&&first_scn

begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t9',
streams_type => 'capture',
streams_name => 'streams_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => '&&dbname',
inclusion_rule => true,
and_condition => null);
end;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

set lines 120
col streams_name format a16
col streams_type format a9
col rule_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15

select rule_owner,
streams_name,
streams_type,
table_owner||'.'||table_name table_name,
rule_type,
rule_name
from dba_streams_table_rules;

select table_owner||'.'||table_name table_name,
scn,
timestamp,
supplemental_log_data_pk
from dba_capture_prepared_tables;

connect strmadmin/strmadmin

begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t9',
streams_type => 'apply',
streams_name => 'streams_apply',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => '&&dbname',
inclusion_rule => true);
end;
/

col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;

col instantiation_scn format 999999999999 -
new_value instantiation_scn

select dbms_flashback.get_system_change_number instantiation_scn
from dual;

begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t9',
source_database_name => 'BLACK',
instantiation_scn => &&instantiation_scn);
end;
/

col source_database format a6
col object format a10
col instantiation_scn format 999999999999

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

connect strmadmin/strmadmin

begin
dbms_aqadm.create_queue_table(
queue_table => 'myqueue_table',
queue_payload_type => 'sys.xmltype',
multiple_consumers => false);
dbms_aqadm.create_queue(
queue_name => 'myqueue',
queue_table => 'myqueue_table');
dbms_aqadm.start_queue('myqueue');
end;
/

create or replace procedure myhandler(in_any in anydata)
is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
begin
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.persistent;
dbms_aq.enqueue(
queue_name => 'STRMADMIN.MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => dbms_streams.convert_lcr_to_xml(in_any),
msgid => message_handle);
end;
/

begin
dbms_apply_adm.set_dml_handler(
object_name => 'SOURCE.T9',
object_type => 'TABLE',
operation_name => 'DEFAULT',
error_handler => false,
user_procedure => 'strmadmin.myhandler',
apply_name => 'STREAMS_APPLY',
assemble_lobs => false);
end;
/

exec dbms_capture_adm.start_capture('streams_capture');
exec dbms_apply_adm.start_apply('streams_apply');

create or replace procedure print_xml_fromq(queue_name varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message sys.xmltype;
v_out pls_integer;
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.persistent;
loop
begin
dbms_aq.dequeue(
queue_name => queue_name,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line(message.extract('/*').getstringval());
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line('');
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
If you are use to streams, there is not much that you don't know already in that script. The table contains a LOB but can easily be changed, even after you've started the capture to match you need. The very powerful feature I use and that comes with Streams is the ability to transform a LCR into a XML File with the DBMS_STREAMS.CONVERT_LCR_TO_XML procedure.

Play with LOBs

Once the configuration done, you can use implicit LOB conversion or the LOB built-ins to create and modify LOB values from SQL and PL/SQL like below:
connect source/source

insert into source.T9 values (1,empty_clob());
commit;

declare
my_x clob;
begin
select text into my_x
from source.T9 where id=1
for update;
dbms_lob.append(my_x,'Z');
commit;
end;
/

declare
my_x clob;
amount number;
begin
select text into my_x
from source.T9 where id=1
for update;
amount:=1;
dbms_lob.erase(my_x,amount,1);
commit;
end;
/

Display XMLized LCRs

You can now display the content of the LCRs that have been captured with the script below:
connect strmadmin/strmadmin
set lines 1000
set serveroutput on
exec print_xml_fromq('MYQUEUE');
The result looks like below. Pay attention to the various command type "LOB WRITE", "LOB TRIM" and "LOB ERASE":
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>INSERT</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>6.31.810</transaction_id>
<scn>1351988</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>EMPTY LOB</lob_information>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>UPDATE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>6.31.810</transaction_id>
<scn>1351988</scn>
<old_values>
<old_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</old_value>
</old_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB WRITE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>3.33.828</transaction_id>
<scn>1352031</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2>Z</varchar2>
</data>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB TRIM</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>3.33.828</transaction_id>
<scn>1352031</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_operation_size>1</lob_operation_size>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
---------------------------------------------------------
<row_lcr xmlns="http://xmlns.oracle.com/streams/schemas/lcr"
xsi="http://www.w3.org/2001/XMLSchema-instance"
schemalocation="http://xmlns.oracle.com/streams/schemas/lcr
http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd">
<source_database_name>BLACK</source_database_name>
<command_type>LOB ERASE</command_type>
<object_owner>SOURCE</object_owner>
<object_name>T9</object_name>
<transaction_id>7.19.625</transaction_id>
<scn>1352034</scn>
<new_values>
<new_value>
<column_name>ID</column_name>
<data>
<number>1</number>
</data>
</new_value>
<new_value>
<column_name>TEXT</column_name>
<data>
<varchar2 nil="true"></varchar2>
<lob_information>LAST LOB CHUNK</lob_information>
<lob_offset>1</lob_offset>
<lob_operation_size>1</lob_operation_size>
</data>
</new_value>
</new_values>
</row_lcr>
---------------------------------------------------------
No more messages

Drop the test case

Once your tests done, you can drop the whole test case running the script below:
connect / as sysdba

col global_name format a30 new_value dbname
select global_name from global_name;

exec dbms_apply_adm.stop_apply('STREAMS_APPLY')
exec dbms_capture_adm.stop_capture('STREAMS_CAPTURE')

exec dbms_apply_adm.delete_all_errors('STREAMS_APPLY');
exec dbms_apply_adm.drop_apply('STREAMS_APPLY')
exec dbms_capture_adm.drop_capture('STREAMS_CAPTURE')

drop procedure strmadmin.mydml_handler;

begin
dbms_aqadm.stop_queue('strmadmin.myqueue');
dbms_aqadm.drop_queue('strmadmin.myqueue');
dbms_aqadm.drop_queue_table('strmadmin.myqueue_table', true);
end;
/

begin
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => '&&dbname',
instantiation_scn => null);
end loop;
end;
/

begin
for i in (select object_owner||'.'||
object_name name,
operation_name,
apply_name
from dba_apply_dml_handlers
where object_owner in ('SOURCE'))
loop
dbms_apply_adm.set_dml_handler(
object_name => i.name,
object_type => 'TABLE',
operation_name=> i.operation_name,
user_procedure=> null,
apply_name => i.apply_name);
end loop;
end;
/

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user source cascade;
In a next blog post, I'll dig further into how LOBs work within LCRs. If you want write your own LCRs that contains LOB, you can check the documentation; there is a short section called "Oracle® Streams Extended Examples - Logical Change Records With LOBs Example"

No comments:

Post a Comment

Post a Comment