
Tuesday, January 20, 2009

Tracking Updates And Deletes With a Custom Rule-Based Transformation

As you know, Streams can be used in many cases, including some really advanced ones where you may want to transform DDL or DML changes into something very different. This post builds on the previous Streams One Way Table Replication 101 post. Instead of maintaining a copy of the table in a destination schema, we'll transform the captured DML changes and keep the data as it was before each change; we'll use a Custom Rule-Based Transformation for that purpose.

I won't argue about whether this is the right way to solve your problem; it probably isn't, by the way! The goal here is simply to illustrate a specific point of the documentation described in Managing Custom Rule-Based Transformations.

First, let's describe what we want to achieve; basically, it is the following:

Custom Transformation Example

We want to populate a table named T3_HISTORY from the changes made to the T3 table, so that:

  • If one row is inserted in T3, nothing is applied to T3_HISTORY
  • If one or several columns are modified in a row of T3, a row is inserted in T3_HISTORY with the primary key of the changed row and the values of the columns before they've been modified
  • If one row is deleted from T3, it is inserted in T3_HISTORY
Pretty simple, right? To set up this example, we'll perform the steps below:
Note:
We've tested this post on Oracle 11.1.0.7 Enterprise Edition running on 32-bit Linux.

Step 1: Build The Sample Schema and Table

In order to start, we'll create a schema named SOURCE and a table named T3. The script below can be used for that simple operation:

connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

connect source/source

create table t3(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));

insert into t3(id, text1, text2)
values (1,'Text 1','Text 1');

insert into t3(id, text1, text2)
values (2,'Text 2','Text 2');

commit;
Step 2: Configure Database Logging and the Multi-Version Data Dictionary

Then, we'll enable supplemental logging and capture the dictionary for our apply process; for a more detailed explanation, refer to the corresponding section of the previous post:
connect / as sysdba

alter database add supplemental log
data (primary key, unique index) columns;

select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;

SUPPLEME SUP SUP
-------- --- ---
IMPLICIT YES YES

connect / as sysdba

var first_scn number;

set serveroutput on

DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/

First SCN Value = 49042939541
exec dbms_capture_adm.prepare_table_instantiation(-
table_name=>'source.t3');

Step 3: Create the Streams Administrator and The Streams Queue

We'll create the Streams Administrator and the Streams Queue; for a more detailed explanation, refer to the corresponding section of the previous post:
connect / as sysdba

CREATE TABLESPACE streams_tbs
DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
SIZE 25M AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;

grant dba to strmadmin;

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/

Step 4: Create the Capture Process and Define the Capture Rules

Once the source and destination databases are set up, you can create a capture process and add a rule to it to capture the source table. In this case we are not interested in the INSERT commands, just in DELETE and UPDATE; for that reason we'll add an AND condition to the rule evaluation. For a more detailed explanation about the capture process, you can refer to the corresponding section of the previous post:
connect strmadmin/strmadmin

var first_scn number;

exec :first_scn:= 49042939541

BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'capture4transfo',
rule_set_name => NULL,
source_database => 'BLACK',
use_database_link => false,
first_scn => :first_scn,
logfile_assignment => 'implicit');
END;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

CAPTURE_NAME QUEUE_NAME FIRST_SCN START_SCN RULE_SET_NA
--------------- ------------- ------------- ------------- -----------
CAPTURE4TRANSFO STREAMS_QUEUE 49042939541 49042939541

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t3',
streams_type => 'capture',
streams_name => 'capture4transfo',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true,
and_condition =>
':lcr.get_command_type() in (''DELETE'',''UPDATE'')');
END;
/

set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15

select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES;

STREAMS_NAME STREAMS_T TABLE_OWNE TABLE_NAME RULE_TYP RULE_NAME
---------------- --------- ---------- ---------- -------- ---------
CAPTURE4TRANSFO CAPTURE SOURCE T3 DML T352

col rule_condition format a60 word_wrapped
select rule_condition
from dba_streams_table_rules
where streams_name='CAPTURE4TRANSFO'
and table_owner='SOURCE'
and table_name='T3';

RULE_CONDITION
------------------------------------------------------------
(((((:dml.get_object_owner() = 'SOURCE' and :dml.get_object_
name() = 'T3')) and :dml.get_source_database_name() = 'BLACK
' )) and (:dml.get_command_type() in ('DELETE','UPDATE')))

Step 5: Create a Declarative Rule-Based Transformation and Add it to the Capture Process

We want the transformation to be as reusable as possible; for that reason we'll proceed in two steps. The first step consists in renaming the table from SOURCE.T3 to DESTINATION.T3_HISTORY with a declarative rule-based transformation. We'll attach the transformation to the capture rule; the script below details the associated steps:
connect strmadmin/strmadmin

begin
dbms_streams_adm.rename_table(
rule_name => 'T352' ,
from_table_name => 'SOURCE.T3',
to_table_name => 'DESTINATION.T3_HISTORY',
step_number => 0,
operation => 'add');
end;
/

col rule_name format A6
col from_schema_name format a6
col to_schema_name format a12

select rule_name,
transform_type,
from_schema_name,
to_schema_name,
declarative_type
from dba_streams_transformations;

RULE_N TRANSFORM_TYPE FROM_S TO_SCHEMA_NA DECLARATIVE_T
------ -------------------------- ------ ------------ -------------
T352 DECLARATIVE TRANSFORMATION SOURCE DESTINATION RENAME TABLE

Step 6: Create an Apply Process

We can create an apply process that will subscribe to the Streams queue and get the changes to be applied to the DESTINATION.T3_HISTORY table:
connect strmadmin/strmadmin

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'destination.t3_history',
streams_type => 'apply',
streams_name => 'apply4transfo',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true);
END;
/

col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;

APPLY_NAME QUEUE_NAME RULE_SET_NA STATUS MESSAGE_DE
------------- ------------- ----------- -------- ----------
APPLY4TRANSFO STREAMS_QUEUE RULESET$_55 DISABLED CAPTURED

set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a11
col table_name format a15
col rule_type format a8
col rule_name format a15

select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES
where STREAMS_NAME='APPLY4TRANSFO';

STREAMS_NAME STREAMS_T TABLE_OWNER TABLE_NAME RULE_TYP RULE_NAME
---------------- --------- ----------- ---------- -------- ------------
APPLY4TRANSFO APPLY DESTINATION T3_HISTORY DML T3_HISTORY54

Step 7: Create a Custom Rule-Based Transformation and Add it to the Apply Process Rule

As you can easily understand, the transformation we'll apply to the LCR is too complex to be executed declaratively. For that reason, we'll create a custom function that takes the LCR as a parameter and returns the transformed LCR. You'll find its code below; it consists of the following:
  • Make sure the LCR is an 'UPDATE' or a 'DELETE'
  • Get all the before images (i.e. 'old') of the changed and PK/UK columns and set them as the values to be inserted in the LCR (i.e. 'new')
  • Add a column SCN that contains the SCN of the DML statement
  • Add a column DML_OPERATION that says whether the captured operation was an UPDATE or a DELETE
  • Delete all the before images (i.e. 'old') of the LCR
  • Transform the command into an INSERT
You'll find the code of the procedure below:
connect strmadmin/strmadmin

create or replace function transform4history(in_any in anydata)
return anydata
IS
lcr sys.lcr$_row_record;
lcr_rl sys.lcr$_row_list;
rc number;
invalid_column exception;
pragma exception_init(invalid_column,-23607);
begin
-- Get the type of object
-- Check if the object type is SYS.LCR$_ROW_RECORD
if in_any.GETTYPENAME='SYS.LCR$_ROW_RECORD' THEN
-- Put the row LCR into lcr
rc := in_any.GETOBJECT(lcr);
-- Check if that's an Update or a Delete
if (lcr.GET_COMMAND_TYPE() in ('UPDATE','DELETE')) then
lcr_rl:=lcr.get_values('old');
for i in lcr_rl.first..lcr_rl.last loop
begin
lcr.set_value('new',lcr_rl(i).column_name,
lcr.get_value('old',lcr_rl(i).column_name));
exception when invalid_column then
lcr.add_column('new',lcr_rl(i).column_name,
lcr.get_value('old',lcr_rl(i).column_name));
end;
end loop;
lcr.add_column('new','SCN',
anydata.convertnumber(lcr.get_scn()));
lcr.add_column('new','DML_OPERATION',
anydata.convertvarchar2(lcr.get_command_type()));
for i in lcr_rl.first..lcr_rl.last loop
lcr.delete_column(lcr_rl(i).column_name,'old');
end loop;
lcr.set_command_type('INSERT'); -- Change it to an insert
return anydata.convertobject(lcr);
end if;
end if;
-- Return the message unchanged if it is not an UPDATE or DELETE row LCR
return in_any;
end;
/
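Before wiring the function to the apply rule, you may want to test it standalone. Below is a minimal sketch (not part of the original setup) that builds a fake DELETE row LCR with SYS.LCR$_ROW_RECORD.CONSTRUCT, feeds it to transform4history and displays a few attributes of the result; the SCN and the column values are made up for the test:
connect strmadmin/strmadmin

set serveroutput on

declare
lcr_in sys.lcr$_row_record;
lcr_out sys.lcr$_row_record;
any_out anydata;
rc number;
begin
-- Build a fake DELETE row LCR on SOURCE.T3 (SCN and values are made up)
lcr_in := sys.lcr$_row_record.construct(
source_database_name => 'BLACK',
command_type => 'DELETE',
object_owner => 'SOURCE',
object_name => 'T3',
scn => 1000000);
lcr_in.add_column('old','ID',anydata.convertnumber(1));
lcr_in.add_column('old','TEXT1',anydata.convertvarchar2('Text 1'));
lcr_in.add_column('old','TEXT2',anydata.convertvarchar2('Text 1'));
-- Run it through the transformation and display a few attributes
any_out := transform4history(anydata.convertobject(lcr_in));
rc := any_out.getobject(lcr_out);
dbms_output.put_line('Command: '||lcr_out.get_command_type());
dbms_output.put_line('New ID: '||
anydata.accessnumber(lcr_out.get_value('new','ID')));
dbms_output.put_line('DML_OPERATION: '||
anydata.accessvarchar2(lcr_out.get_value('new','DML_OPERATION')));
end;
/
If the function behaves as described above, the command type should come back as INSERT and the 'old' values should now appear as 'new' values.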

You can now add the transformation to the apply rule that matches the destination table. In order to do so, use DBMS_STREAMS_ADM.SET_RULE_TRANSFORM_FUNCTION as below:
connect strmadmin/strmadmin

begin
dbms_streams_adm.set_rule_transform_function(
rule_name => 'T3_HISTORY54',
transform_function => 'transform4history');
end;
/

col rule_name format A12
col user_function_name format a32

select rule_name,
transform_type,
user_function_name
from dba_streams_transformations;

RULE_NAME TRANSFORM_TYPE USER_FUNCTION_NAME
------------ -------------------------- --------------------------------
T352 DECLARATIVE TRANSFORMATION
T3_HISTORY54 CUSTOM TRANSFORMATION "STRMADMIN"."TRANSFORM4HISTORY"
Step 8: Instantiate the Table in the DESTINATION Schema

Streams is now configured. Before you start the different processes, you must instantiate the table in the DESTINATION schema and store the instantiation SCN so that the apply process knows exactly which changes it can apply. In our test case, we'll start with an empty table that is slightly different from the original table:
  • We'll add a column SCN to store the SCN of the change
  • We'll add a column DML_OPERATION to store the DML command used on the primary table
  • It won't have any primary key, even though we could create one on (ID, SCN)
The associated SQL is below:
connect / as sysdba

create user destination
identified by destination
default tablespace users
temporary tablespace temp;

grant connect,resource to destination;

create table destination.t3_history(
id number,
text1 varchar2(80),
text2 varchar2(80),
scn number,
dml_operation varchar2(6));
col apply_scn format 999999999999

select dbms_flashback.get_system_change_number apply_scn
from dual;

APPLY_SCN
-----------
49042945957
Once the table is created, use the dbms_apply_adm.set_table_instantiation_scn procedure to store the instantiation SCN for the apply process, as below:
begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t3',
source_database_name => 'BLACK',
instantiation_scn => 49042945957);
end;
/

col SOURCE_DATABASE format a6
col OBJECT format a10
col INSTANTIATION_SCN format 999999999999

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

SOURCE OBJECT INSTANTIATION_SCN
------ ---------- -----------------
BLACK SOURCE.T3 49042945957
Step 9: Start the Capture and Apply Processes

You can now start the capture and the apply processes:
exec dbms_capture_adm.start_capture('capture4transfo');
exec dbms_apply_adm.start_apply('apply4transfo');
Step 10: Test the Table Replication

To test the replication, change the source table and check that the changes are propagated to the destination table. Below is a set of simple tests showing the replication works as it should:
update source.t3 set text1= 'Text 3'
where id=1;

delete from source.t3
where id=1;

commit;

pause

col id format 99
col text1 format a6
col text2 format a6
col scn format 999999999999

select id,
text1,
text2,
scn,
dml_operation
from destination.t3_history
order by id, scn;

ID TEXT1 TEXT2 SCN DML_OP
--- ------ ------ ------------- ------
1 Text 1 49042951081 UPDATE
1 Text 3 Text 1 49042965443 DELETE
Step 11: Stop and Suppress the Oracle Streams Configuration

In this last step, we'll clean up the environment and leave it as it was when we started. In order to do so, we'll stop and drop the capture and apply processes; we'll drop the Streams queue, the Streams administrator and the associated tablespace; we'll drop the two schemas and we'll disable supplemental logging:
connect / as sysdba

exec dbms_capture_adm.stop_capture('capture4transfo');
exec dbms_apply_adm.stop_apply('apply4transfo');

exec dbms_capture_adm.drop_capture('capture4transfo',true);
exec dbms_apply_adm.drop_apply('apply4transfo',true);

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

exec dbms_apply_adm.SET_TABLE_INSTANTIATION_SCN('source.t3','BLACK', null)

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user destination cascade;
drop user source cascade;

alter database drop supplemental log
data (primary key, unique index) columns;

select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;

SUPPLEME SUP SUP
-------- --- ---
NO NO NO


Tuesday, January 13, 2009

Streams Synchronous Capture 101

Streams Synchronous Capture is a new feature of Oracle 11g; it doesn't rely on archive logs or redo logs to capture changes. Instead, it captures changes as they are made, using a row-level trigger-based mechanism. As its name suggests, the only part that is "synchronous" is the capture; the changes continue to be applied asynchronously to the destination objects and the databases stay loosely coupled. As a matter of fact, propagation and apply processes don't change much.

In addition to the capture itself, the main change associated with synchronous capture is the queue usage: messages are persistent/commit-time instead of buffered. The reason is that the capture process won't get a second chance to capture a change and, as a consequence, it cannot afford to lose anything in a queue. As a result, every change of a synchronous capture is logged in all the databases it transits through, and the footprint can be quite significant. It's also worth noting that synchronous capture can only propagate DML changes, it only works with a small subset of datatypes that doesn't include LOBs, and the capture process has to run on the source database. However, synchronous capture has its own set of advantages too:
  • It can capture changes on a Standard Edition or a Standard Edition One 11g database
  • It can be used even if the source database is running in noarchivelog mode and doesn't require you to add any supplemental logging
  • It's probably simpler to set up, though I feel a regular Streams setup is pretty simple too
  • It's a new feature of 11.1 ;-)
In this post, we'll reproduce the same setup we've built in our previous Oracle Streams One Way Table Replication 101 post, with synchronous capture this time; in order to do so, we'll go through the steps below:
Step 1: Build The Sample Schema and Table

To start, we'll create a schema named SOURCE and a table named T2, the same way we did in the previous post. You'll find below the script to perform the associated operations:
connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

connect source/source

create table t2(
id number primary key,
text varchar2(80));

insert into t2(id, text)
values (1,'Text 1');

insert into t2(id, text)
values (2,'Text 2');

commit;

Step 2: Create the Streams Administrator and Queue

The database doesn't require any specific settings, and synchronous capture doesn't rely on the Multi-Version Data Dictionary either. We'll create a Streams Administrator in all the databases involved in the capture, propagation or apply processes; the script below does it. It creates a tablespace to store the queues, creates a user STRMADMIN, grants it the required privileges and roles, and creates the queue that will be used by the capture and the apply processes to share the LCRs:
connect / as sysdba

CREATE TABLESPACE streams_tbs
DATAFILE SIZE 25M
AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;

grant dba to strmadmin;

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/
Step 3: Create an Apply Process
IMPORTANT :
Unlike asynchronous capture, synchronous capture is always running: you cannot start it and, more importantly, you cannot stop it! The reason is that it has only one chance to capture changes, and you could miss changes if you had that ability. As a consequence of that, and of the fact that a running capture process must have an apply process, you MUST create the apply process before you create the capture process. If you don't, you may encounter an error like the one below when you try to change your source tables:
insert into source.t1 values (5,'Text 5')
*
ERROR at line 1:
ORA-26694: error while enqueueing into queue STRMADMIN.STREAMS_QUEUE
ORA-24033: no recipients for message
Before rushing to create the apply process, there is one important point to understand:
  • You'll want the consumed messages to be the ones captured by a synchronous capture and NOT by an asynchronous capture. That means you want the apply process to subscribe to the persistent message part of the queue and NOT to its buffered part. The way you specify that is to set the apply_captured parameter to FALSE. That's the subtle part: think of that parameter as "apply asynchronously captured messages" ;-). And here comes the confusing part: the 11.1 documentation for dbms_apply_adm.create_apply suggests that FALSE is the default value. That's not the case; the default is TRUE and the parameter has to be specified explicitly in the case of a synchronous capture.
The script below creates an apply process and configures it so that it subscribes to the strmadmin.streams_queue queue:
connect strmadmin/strmadmin

begin
dbms_apply_adm.create_apply(
queue_name => 'strmadmin.streams_queue',
apply_name => 'sync_apply',
apply_captured => false,
source_database => 'BLACK');
end;
/

begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t2',
streams_type => 'apply',
streams_name => 'sync_apply',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => 'BLACK',
inclusion_rule => true);
end;
/


set lines 120
col streams_name format a12
col streams_type format a12
col table_owner format a11
col table_name format a5
col rule_type format a8
col rule_name format a5

select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES;

STREAMS_NAME STREAMS_TYPE TABLE_OWNER TABLE RULE_TYP RULE
------------ ------------ ----------- ----- -------- ----
SYNC_APPLY APPLY DESTINATION T2 DML T226
Step 4: Create a Capture Process

You can now create the synchronous capture process. You can simply use DBMS_STREAMS_ADM.ADD_TABLE_RULES with streams_type=>'sync_capture' like below:
connect strmadmin/strmadmin

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t2',
streams_type => 'sync_capture',
streams_name => 'sync_capture',
queue_name => 'strmadmin.streams_queue');
END;
/

set lines 120
col streams_name format a12
col streams_type format a12
col table_owner format a11
col table_name format a5
col rule_type format a8
col rule_name format a4

select STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES;

STREAMS_NAME STREAMS_TYPE TABLE_OWNER TABLE RULE_TYP RULE
------------ ------------ ----------- ----- -------- ----
SYNC_CAPTURE SYNC_CAPTURE SOURCE T2 DML T228
SYNC_APPLY APPLY SOURCE T2 DML T226
Step 5: Add a Transformation Rule to the Table Apply Rule

Add a transformation to the DML apply rule so that the schema name is changed from SOURCE to DESTINATION. The script below adds the transformation to the apply process; it is the same as the one used with asynchronous capture:
begin
dbms_streams_adm.rename_schema(
rule_name => 'T226' ,
from_schema_name => 'SOURCE',
to_schema_name => 'DESTINATION',
step_number => 0,
operation => 'add');
end;
/

col rule_name format A6
col from_schema_name format a6
col to_schema_name format a12

select rule_name,
transform_type,
from_schema_name,
to_schema_name,
declarative_type
from dba_streams_transformations;

RULE_N TRANSFORM_TYPE FROM_S TO_SCHEMA_NA DECLARATIVE_T
------ -------------------------- ------ ------------ -------------
T226 DECLARATIVE TRANSFORMATION SOURCE DESTINATION RENAME SCHEMA

Step 6: Instantiate the Table in the DESTINATION Schema

Streams is now configured. Before you start the different processes, you must instantiate the table in the DESTINATION schema and store the instantiation SCN so that the apply process knows exactly which changes it can apply. To instantiate the table, you can use the method of your choice: RMAN, Data Pump, exp or Flashback Query. Here is an example with Flashback Query:
connect / as sysdba

create user destination
identified by destination
default tablespace users
temporary tablespace temp;

grant connect,resource to destination;

create table destination.t2(
id number primary key,
text varchar2(80));
col apply_scn format 999999999999 new_value apply_scn

select dbms_flashback.get_system_change_number apply_scn
from dual;

APPLY_SCN
-----------
49042368875

insert into destination.t2
(select *
from source.t2 as of scn &&apply_scn);

commit;
Once the table is instantiated, use the dbms_apply_adm.set_table_instantiation_scn procedure to store the instantiation SCN for the apply process, as below:
begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t2',
source_database_name => 'BLACK',
instantiation_scn => &&apply_scn);
end;
/

col SOURCE_DATABASE format a6
col OBJECT format a10
col INSTANTIATION_SCN format 999999999999

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

SOURCE OBJECT INSTANTIATION_SCN
------ ---------- -----------------
BLACK SOURCE.T2 49042368875

Step 7: Start the Apply Process

You can finally start the apply process (remember, the synchronous capture is always running and cannot be started or stopped):
exec dbms_apply_adm.start_apply('sync_apply');
Step 8: Test/Validate the Replication

To test the replication, change the source table and check that the changes are propagated to the destination table. Below is a simple test that shows the replication is working as it should:
insert into source.t2(id, text)
values (3,'Text 3');

commit;

pause

col id format 99
col text format a6

select id,
text
from destination.t2;

ID TEXT
-- ------
3 Text 3
1 Text 1
2 Text 2

Step 9: Stop and Suppress the Oracle Streams Configuration

In this last step, we'll clean up the environment and leave it as it was when we started. In order to do so, we'll stop and drop the capture and apply processes; we'll drop the Streams queue, the Streams administrator and the associated tablespace. We'll also drop the two schemas:
connect / as sysdba

exec dbms_apply_adm.stop_apply('sync_apply');

exec dbms_capture_adm.drop_capture('sync_capture',true);
exec dbms_apply_adm.drop_apply('sync_apply',true);

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user destination cascade;
drop user source cascade;

begin
for i in (select source_schema name,
source_database
from dba_apply_instantiated_schemas
where source_schema in ('SOURCE','DESTINATION'))
loop
dbms_apply_adm.set_schema_instantiation_scn(
source_schema_name => i.name,
source_database_name => i.source_database,
instantiation_scn => null);
end loop;
for i in (select source_object_owner||'.'||
source_object_name name,
source_database
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE','DESTINATION'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => i.source_database,
instantiation_scn => null);
end loop;
end;
/


Sunday, January 11, 2009

Oracle Streams One Way Table Replication 101

If you're looking to jump-start Oracle Streams, this is probably the right post: a quick setup, a lot of shortcuts, only one database and probably the simplest configuration you will ever see. Obviously, that doesn't address everything; you'll still need to dive deep into the concepts and principles of Streams. But you should be able to get a running Streams configuration in less than 30 minutes and use it as a basis to build more complex test cases. Hopefully it will be as useful to you as it is to me.

In this example, we'll maintain, in the DESTINATION schema of the same database, a copy of a table named T1 located in the SOURCE schema. To get to that result, we'll dig into the details of the steps below:

Note:
We've tested this post on Oracle 11.1.0.7 and 10.2.0.4 Enterprise Edition running on 32-bit Linux.

Step 1: Build The Sample Schema and Table

In order to start, we'll create a schema named SOURCE and a table named T1. You'll find below the script to perform the associated operations:

connect / as sysdba

create user source
identified by source
default tablespace users
temporary tablespace temp;

grant connect,resource to source;

connect source/source

create table t1(
id number primary key,
text varchar2(80));

insert into t1(id, text)
values (1,'Text 1');

insert into t1(id, text)
values (2,'Text 2');

commit;
Step 2: Configure Database Logging and the Multi-Version Data Dictionary
Streams capture processes use the content of the redo logs/archived logs to build Logical Change Records (LCRs) that are propagated to the destination database via buffered queues. Once propagated, the LCRs are applied to the destination database by the apply processes, which transform them into a set of SQL commands. As you know, the redo logs don't contain SQL but binary data. That data is built to be used by a recovery process and, by default, doesn't contain everything needed to apply the changes to a different database. Here are some of the consequences of using the redo logs to capture changes made on a source database:
  • The redo logs don't contain the SQL commands that have been run on the database; instead, they contain the rowids of the changed rows with the before and after images of those changes. So if you update, say, one column for all the rows of a table in a single UPDATE command, Streams will NOT replicate that command. Instead, capture processes generate one LCR per changed row, and each LCR includes the before and after values of the change. Put another way, one update on the source database may lead to millions of LCRs and updates on the destination database.
  • Rowids are used to uniquely identify changed rows when recovering from the archived and redo logs. But rowids differ on a destination database that is not a physical copy of the source database! To uniquely identify changed rows on a destination database, we must add the value of a unique or primary key to the LCR with every change. To get them into the LCRs, we must add them to the redo logs; we'll use supplemental logging for that purpose!
  • The schemas and names of the changed objects are not stored in the redo logs either; instead, what is stored are the object identifiers. Those identifiers end up in the LCRs, and you need a copy of the dictionary from the source database on the destination database to "reverse engineer" LCRs into SQL. That copy of the dictionary is part of the Multi-Version Data Dictionary (MVDD); it needs to be captured from the source database and sent to the destination database.
For the reasons above, we need to prepare the source database to be compatible with Streams Capture.

To uniquely identify rows on the destination database, you need the source database to be in ARCHIVELOG mode and to add supplemental logging so that the unique and primary keys of all the changed rows are logged. That can be done at the granularity of a single object but, as discussed earlier, this post takes some shortcuts and we'll add the supplemental logging at the database level. Below is the script to perform that operation:
connect / as sysdba

alter database add supplemental log
data (primary key, unique index) columns;

select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;

SUPPLEME SUP SUP
-------- --- ---
IMPLICIT YES YES
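As a side note, if you prefer the object-level granularity mentioned above over database-wide supplemental logging, a sketch of the table-level equivalent (shown for illustration only, not used in this post) would be:
alter table source.t1
add supplemental log data (primary key, unique) columns;

col log_group_name format a20
col log_group_type format a20

select log_group_name, log_group_type
from dba_log_groups
where owner='SOURCE'
and table_name='T1';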
To push a copy of the data dictionary from the source to the destination database, we need to run DBMS_CAPTURE_ADM.BUILD as below:
connect / as sysdba

var first_scn number;

set serveroutput on

DECLARE
scn NUMBER;
BEGIN
DBMS_CAPTURE_ADM.BUILD(
first_scn => scn);
DBMS_OUTPUT.PUT_LINE('First SCN Value = ' || scn);
:first_scn := scn;
END;
/

First SCN Value = 49042254018
It is important to keep track of the SCN returned by DBMS_CAPTURE_ADM.BUILD. You'll use it as the first SCN of the capture processes so that the metadata from the source is pushed to, and kept in, the destination MVDD. Note that the last command captures only the minimum to add to the MVDD; you still have to add the metadata of all the objects you want to replicate. In order to do that, you must use one of the prepare_xxxx_instantiation procedures of the dbms_capture_adm package, as below:
exec dbms_capture_adm.prepare_table_instantiation(-
table_name=>'source.t1');
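If you were replicating every table of the schema, you could use the schema-level variant instead; here is a quick sketch (not needed for this single-table example):
exec dbms_capture_adm.prepare_schema_instantiation(-
schema_name=>'source');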

Step 3: Create the Streams Administrator and The Streams Queue

To use Oracle Streams, you must create a Streams Administrator in all the databases that are directly involved with a capture, propagation or apply process; the script below does it. It creates a tablespace to store the queues (it is required to have a separate tablespace for queues), then creates a user STRMADMIN, grants it the required privileges and roles, and creates the buffered queue that will be used by the capture and the apply processes to share the LCRs:
connect / as sysdba

CREATE TABLESPACE streams_tbs
DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs.dbf' size 25M
AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs;

grant dba to strmadmin;

BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
END;
/

Step 4: Create the Capture Process and Define the Capture Rules
Once the source and destination databases are set up, you can create the capture process and add the source table to it. The first_scn parameter must match the one returned by DBMS_CAPTURE_ADM.BUILD; the source_database parameter must match the db_unique_name (or db_name, if db_unique_name is not used) of the source database. Here is the script to create the capture process:
connect strmadmin/strmadmin

accept first_scn prompt "Enter the First SCN of the Capture: "
Enter the First SCN of the Capture: 49042254018

var first_scn number;
exec :first_scn:=&&first_scn

BEGIN
DBMS_CAPTURE_ADM.CREATE_CAPTURE(
queue_name => 'strmadmin.streams_queue',
capture_name => 'streams_capture',
rule_set_name => NULL,
source_database => 'BLACK',
use_database_link => false,
first_scn => :first_scn,
logfile_assignment => 'implicit');
END;
/

col capture_name format a15
col queue_name format a13
col first_scn format 999999999999
col start_scn format 999999999999
col rule_set_name format a11

select capture_name,
queue_name,
first_scn,
start_scn,
rule_set_name
from dba_capture;

CAPTURE_NAME QUEUE_NAME FIRST_SCN START_SCN RULE_SET_N
--------------- ------------- ------------- ------------- ----------
STREAMS_CAPTURE STREAMS_QUEUE 49042254018 49042254018 RULESET$_7

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t1',
streams_type => 'capture',
streams_name => 'streams_capture',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true);
END;
/

set lines 120
col streams_name format a16
col streams_type format a9
col table_owner format a10
col table_name format a15
col rule_type format a8
col rule_name format a15

select rule_owner,
STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES;

STREAMS_NAME STREAMS_T TABLE_OWNE TABLE_NAME RULE_TYP RULE_NAME
--------------- --------- ---------- ---------- -------- ---------
STREAMS_CAPTURE CAPTURE SOURCE T1 DML T16

Step 5: Create an Apply Process

You must create an apply process that will subscribe to the Streams queue and apply the changes captured on the SOURCE.T1 table:
connect strmadmin/strmadmin

BEGIN
DBMS_STREAMS_ADM.ADD_TABLE_RULES(
table_name => 'source.t1',
streams_type => 'apply',
streams_name => 'streams_apply',
queue_name => 'strmadmin.streams_queue',
include_dml => true,
include_ddl => false,
include_tagged_lcr => true,
source_database => 'BLACK',
inclusion_rule => true);
END;
/


col apply_name format a13
col queue_name format a13
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;

APPLY_NAME QUEUE_NAME RULE_SET_NA STATUS MESSAGE_DE
------------- ------------- ----------- -------- ----------
STREAMS_APPLY STREAMS_QUEUE RULESET$_10 DISABLED CAPTURED

Step 6: Transform the SQL Captured so that changes on SOURCE are applied to DESTINATION

In this example the source and the destination databases are the same; as a result, the source and target tables cannot have the same schema and name. What we'll do now is add a transformation rule so that the schema name is changed from SOURCE to DESTINATION. The script below adds the transformation to the apply process:
connect strmadmin/strmadmin

col rule_name format a20 new_value rulename

select rule_owner,
STREAMS_NAME,
STREAMS_TYPE,
TABLE_OWNER,
TABLE_NAME,
RULE_TYPE,
RULE_NAME
from DBA_STREAMS_TABLE_RULES
where streams_name='STREAMS_APPLY'
and streams_type='APPLY'
and rule_type='DML';

prompt &&rulename
T16

begin
dbms_streams_adm.rename_schema(
rule_name => '&&rulename' ,
from_schema_name => 'SOURCE',
to_schema_name => 'DESTINATION',
step_number => 0,
operation => 'add');
end;
/

col rule_name format A6
col from_schema_name format a6
col to_schema_name format a12

select rule_name,
transform_type,
from_schema_name,
to_schema_name,
declarative_type
from dba_streams_transformations;

RULE_N TRANSFORM_TYPE FROM_S TO_SCHEMA_NA DECLARATIVE_T
------ -------------------------- ------ ------------ -------------
T16 DECLARATIVE TRANSFORMATION SOURCE DESTINATION RENAME SCHEMA

Step 7: Instantiate the Table in the DESTINATION Schema

Streams is now configured. Before you start the different processes, you must instantiate the table in the DESTINATION schema and store the instantiation SCN so that the apply process knows exactly which changes it can apply. To instantiate the table, you can use the method of your choice: RMAN, Data Pump, exp or Flashback Query:
connect / as sysdba

create user destination
identified by destination
default tablespace users
temporary tablespace temp;

grant connect,resource to destination;

create table destination.t1(
id number primary key,
text varchar2(80));
col apply_scn format 999999999999 new_value instantiation_scn

select dbms_flashback.get_system_change_number apply_scn
from dual;

APPLY_SCN
-----------
49042261443

prompt Enter the Instantiation: &&instantiation_scn
Enter the Instantiation: 49042261443

insert into destination.t1
(select * from source.t1 as of scn &&instantiation_scn);

commit;
Once the table is instantiated, use the dbms_apply_adm.set_table_instantiation_scn procedure to store the instantiation SCN for the apply process, as below:
begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t1',
source_database_name => 'BLACK',
instantiation_scn => &&instantiation_scn);
end;
/

col SOURCE_DATABASE format a6
col OBJECT format a10
col INSTANTIATION_SCN format 999999999999

select source_database,
source_object_owner||'.'||source_object_name object,
instantiation_scn
from dba_apply_instantiated_objects;

SOURCE OBJECT INSTANTIATION_SCN
------ ---------- -----------------
BLACK SOURCE.T1 49042261443

Step 8: Start the Capture and Apply Processes

You can now start the capture and the apply processes:
exec dbms_capture_adm.start_capture('streams_capture');
exec dbms_apply_adm.start_apply('streams_apply');
Step 9: Test the Table Replication

To test the replication, change the source table and check that the changes are propagated to the destination table. Below is a simple test that shows the replication is working as it should:
insert into source.t1(id, text)
values (3,'Text 3');

commit;

pause

col id format 99
col text format a6

select id,
text
from destination.t1;

ID TEXT
--- ------
1 Text 1
2 Text 2
3 Text 3
Troubleshooting the Streams configuration is out of the scope of this post. However, you can query the fixed views and catalog views if you need details about the configuration and its state.
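For instance, the queries below (a minimal sketch, covering only a small subset of the available views) show the state of the capture and apply processes and list any apply errors:
connect strmadmin/strmadmin

col capture_name format a15
col state format a30
col apply_name format a13

select capture_name, state
from v$streams_capture;

select apply_name, status
from dba_apply;

select apply_name, error_message
from dba_apply_error;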

Step 10: Stop and Suppress the Oracle Streams Configuration

In this last step, we'll clean up the environment and leave it as it was when we started. In order to do so, we'll stop and drop the capture and apply processes; we'll drop the Streams queue, the Streams administrator and the associated tablespace; we'll drop the two schemas and we'll disable supplemental logging:
exec dbms_capture_adm.stop_capture('streams_capture');
exec dbms_apply_adm.stop_apply('streams_apply');

exec dbms_capture_adm.drop_capture('streams_capture',true);
exec dbms_apply_adm.drop_apply('streams_apply',true);

exec dbms_streams_adm.remove_queue('strmadmin.streams_queue',true,true);

drop user strmadmin cascade;

drop tablespace streams_tbs
including contents and datafiles;

drop user destination cascade;
drop user source cascade;

begin
for i in (select source_schema name
from dba_apply_instantiated_schemas
where source_schema in ('SOURCE','DESTINATION'))
loop
dbms_apply_adm.set_schema_instantiation_scn(
source_schema_name => i.name,
source_database_name => 'BLACK',
instantiation_scn => null);
end loop;
for i in (select source_object_owner||'.'||
source_object_name name
from dba_apply_instantiated_objects
where source_object_owner in ('SOURCE','DESTINATION'))
loop
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => i.name,
source_database_name => 'BLACK',
instantiation_scn => null);
end loop;
end;
/
alter database drop supplemental log
data (primary key, unique index) columns;

select SUPPLEMENTAL_LOG_DATA_MIN,
SUPPLEMENTAL_LOG_DATA_PK,
SUPPLEMENTAL_LOG_DATA_UI
from gv$database;

SUPPLEME SUP SUP
-------- --- ---
NO NO NO


Sunday, January 4, 2009

Oracle AQ Buffered Queues 101

This post illustrates some of the features of Oracle Advanced Queuing buffered messages. For more details, check the Oracle® Streams Advanced Queuing User's Guide, DBMS_AQADM, DBMS_AQ, AQ Types, or wait for some of the coming posts of this blog ;-). Obviously, we'll be taking some shortcuts for now, but we'll explore the basics of buffered messages:
  1. Create a user, tablespace and grant the privileges to use Advanced Queuing
  2. Create a queue table, a multiple consumer queue and start it
  3. Create subscribers for the queue
  4. Create a procedure to enqueue buffered messages
  5. Create a procedure to dequeue buffered messages
  6. Enqueue & dequeue messages
  7. Monitor queues and subscribers
  8. Illustrate how you can lose messages by crashing an instance
  9. Illustrate Flow Control and Spilled Messages
  10. Clean up the environment
1. Create a user, tablespace and grant the privileges

First of all, we'll create a user named AQDEMO to own and use the message queues. In real life, you'll probably separate the queue owner from the message publishers and consumers; however, to keep this demo short, we'll use AQDEMO to publish and consume messages too. It's worth noting that a tablespace that contains queue tables doesn't support Tablespace Point-In-Time Recovery (TSPITR). For that reason, and to ease the management of our AQ environment, we'll store the queues in a separate tablespace called AQ. Last but not least, if you want to share queues across multiple databases, you should use a distributed environment and set the global_names parameter of all the databases to true. The set of commands to configure your database looks like the following:
connect / as sysdba

alter system
set global_names=true;

select *
from global_name;

GLOBAL_NAME
-----------
BLACK

create tablespace aq
datafile '/u01/app/oracle/oradata/BLACK/aq01.dbf'
size 25M autoextend on maxsize 266M;

create user aqdemo
identified by aqdemo
default tablespace aq
temporary tablespace temp;

grant connect, resource to aqdemo;
grant execute on dbms_aqadm to aqdemo;
grant execute on dbms_aq to aqdemo;

2. Create a queue table, a multiple consumer queue and start it

To perform these steps, we'll connect as AQDEMO and will use:
  • dbms_aqadm.create_queue_table to create the queue table
  • dbms_aqadm.create_queue to create the queue
  • dbms_aqadm.start_queue to start the queue
To keep it simple, we won't use a user-defined type for the message but just a RAW:
connect aqdemo/aqdemo

begin
dbms_aqadm.create_queue_table(
queue_table =>'myqueue_table',
queue_payload_type =>'RAW',
storage_clause =>'tablespace aq',
multiple_consumers => true,
comment =>'AQ Demo',
compatible=>'11.1');
end;
/

begin
dbms_aqadm.create_queue(
queue_name => 'myqueue',
queue_table => 'myqueue_table');
end;
/

begin
dbms_aqadm.start_queue(
queue_name => 'myqueue',
enqueue => true,
dequeue => true);
end;
/
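For the record, if you wanted a user-defined payload type instead of a RAW (we don't use one in this demo), the creation would look like the sketch below; the DEMO_MSG_T type and TYPED_QUEUE_TABLE name are made up for the example:
create type demo_msg_t as object (
id number,
text varchar2(80));
/

begin
dbms_aqadm.create_queue_table(
queue_table =>'typed_queue_table',
queue_payload_type =>'AQDEMO.DEMO_MSG_T',
storage_clause =>'tablespace aq',
multiple_consumers => true,
comment =>'AQ Demo with object payload',
compatible=>'11.1');
end;
/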
Note:
If you plan to create a queue in another schema, you can prefix the queue table or queue name with the schema name like AQDEMO.MYQUEUE_TABLE.
If you want to mix cases, you can surround the name with double-quotes, like "AQDEMO"."myqueue_table", but it's probably easier to use uppercase.
Once the queue table is created, several tables and views are used to store and query the associated data; here is an example with AQ$[QUEUE_TABLE]:
desc aq$myqueue_table

Name Null? Type
--------------------- -------- ----------------------------
QUEUE VARCHAR2(30)
MSG_ID NOT NULL RAW(16)
CORR_ID VARCHAR2(128)
MSG_PRIORITY NUMBER
MSG_STATE VARCHAR2(16)
DELAY DATE
DELAY_TIMESTAMP TIMESTAMP(6)
EXPIRATION NUMBER
ENQ_TIME DATE
ENQ_TIMESTAMP TIMESTAMP(6)
ENQ_USER_ID VARCHAR2(30)
ENQ_TXN_ID VARCHAR2(30)
DEQ_TIME DATE
DEQ_TIMESTAMP TIMESTAMP(9)
DEQ_USER_ID VARCHAR2(30)
DEQ_TXN_ID VARCHAR2(30)
RETRY_COUNT NUMBER
EXCEPTION_QUEUE_OWNER VARCHAR2(30)
EXCEPTION_QUEUE VARCHAR2(30)
USER_DATA BLOB
PROPAGATED_MSGID RAW(16)
SENDER_NAME VARCHAR2(30)
SENDER_ADDRESS VARCHAR2(1024)
SENDER_PROTOCOL NUMBER
ORIGINAL_MSGID RAW(16)
ORIGINAL_QUEUE_NAME VARCHAR2(30)
ORIGINAL_QUEUE_OWNER VARCHAR2(30)
EXPIRATION_REASON VARCHAR2(31)
CONSUMER_NAME VARCHAR2(30)
ADDRESS VARCHAR2(1024)
PROTOCOL NUMBER
3. Create subscribers for the queue

In the case of Publish/Subscribe, you have to create subscribers to dequeue messages. In our example, we'll register two subscribers called SUBSCRIBER1 and SUBSCRIBER2 for that purpose:
connect aqdemo/aqdemo

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber1',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,
delivery_mode => dbms_aqadm.buffered);
end;
/

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber2',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,
delivery_mode => dbms_aqadm.buffered);
end;
/
You can check queue subscribers by querying the AQ$MYQUEUE_TABLE_S view (or AQ$_MYQUEUE_TABLE_S table) like below:
col name    format a12
col address format a40

select name, address
from aq$myqueue_table_s
where queue='MYQUEUE';

NAME ADDRESS
------------ -------------------------
SUBSCRIBER1
SUBSCRIBER2
4. Create a procedure to enqueue buffered messages

The procedure below can be used to enqueue buffered messages; the options and properties used to enqueue the messages are the following:
  • The message is a RAW(10) built from the procedure parameter, which is passed as a hexadecimal string
  • The list of message recipients is SUBSCRIBER1 and SUBSCRIBER2
  • The message is a buffered message (dbms_aq.buffered)
connect aqdemo/aqdemo

create or replace procedure demo_enqueue(p_hexamsg varchar2) is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
message RAW(10);
begin
message := hextoraw(p_hexamsg);
recipients(1) := sys.aq$_agent('SUBSCRIBER1', 'MYQUEUE', NULL);
recipients(2) := sys.aq$_agent('SUBSCRIBER2', 'MYQUEUE', NULL);
message_properties.recipient_list := recipients;
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.buffered;
dbms_aq.enqueue(
queue_name => 'MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
commit;
end;
/
Once the procedure is created, you can easily enqueue messages as the AQDEMO user:
exec demo_enqueue('00000000000000000001');
5. Create a procedure to dequeue buffered messages

The procedure below dequeues the buffered messages and displays them with DBMS_OUTPUT.PUT_LINE; it takes the subscriber name as a parameter:
connect aqdemo/aqdemo

set serveroutput on

create or replace procedure demo_dequeue(p_subscriber varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message RAW(10);
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.consumer_name := p_subscriber;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.buffered;
loop
begin
dbms_aq.dequeue(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line('Message: '|| rawtohex(message) );
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
You can dequeue messages with demo_dequeue like below:
set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000001
No more messages
6. Enqueue & Dequeue messages

Here is a sequence of enqueue and dequeue of buffered messages:
exec demo_enqueue('00000000000000000002');
exec demo_enqueue('00000000000000000003');

set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000002
Message: 00000000000000000003
No more messages

exec demo_enqueue('00000000000000000004');

exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000004
No more messages

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999

select queue_id,
subscriber_name,
subscriber_type,
startup_time,
total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
73821 SUBSCRIBER1 SUBSCRIBER 03-JAN-09 17:55:22 0
73821 SUBSCRIBER2 SUBSCRIBER 03-JAN-09 17:55:22 0
7. Monitor queues and subscribers

You can query the V$BUFFERED_QUEUES and V$BUFFERED_SUBSCRIBERS fixed views to check the number of messages in the queues and how many messages have been dequeued:
connect / as sysdba

alter session set nls_date_format='DD-MON-YY HH24:MI:SS';

col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100

select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:13:12 04-JAN-09 00:17:17 4 0

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999

select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:13:12 4
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:13:12 0
You can dequeue the messages from the other subscriber:
exec demo_dequeue('SUBSCRIBER2');

Message: 00000000000000000001
Message: 00000000000000000002
Message: 00000000000000000003
Message: 00000000000000000004
No more messages
8. Illustrate how you can lose messages by crashing an instance

You can easily show that buffered messages can be lost. Here is a simple scenario to illustrate that point by bouncing the Oracle instance:
connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000005');

connect / as sysdba
startup force

connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000006');

set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000006
No more messages

exec demo_dequeue('SUBSCRIBER2')
Message: 00000000000000000006
No more messages
As you can see above, the message 00000000000000000005 was never dequeued; it is simply lost. The same scenario also illustrates, as expected, that the queue and subscriber usage statistics are reset when the instance is bounced:
connect / as sysdba

alter session set nls_date_format='DD-MON-YY HH24:MI:SS';

col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100

select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:30:03 04-JAN-09 00:33:55 0 0

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999

select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:30:03 1
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:30:03 1
9. Illustrate Flow Control and Spilled Messages

One of the benefits of using buffered messages is that they remain in memory and are not stored on disk. A direct consequence is that buffered messages are not logged and, as we've seen previously, can be lost. The devil being in the details, the memory allocated to the Streams pool, which stores the messages, is not unlimited and competes with other memory pools. To prevent that memory from growing without limit, Oracle offers several mechanisms: one is publisher flow control, which prevents messages from being enqueued when they are not dequeued fast enough; another is the ability for messages to spill to disk in the queue table. Here is a simple test case that shows these features:
connect aqdemo/aqdemo

begin
for i in 7..100000 loop
demo_enqueue(lpad(to_char(i),20,'0'));
end loop;
end;
/

begin
*
ERROR at line 1:
ORA-25307: Enqueue rate too high, flow control enabled
ORA-06512: at "SYS.DBMS_AQ", line 6
ORA-06512: at "SYS.DBMS_AQ", line 216
ORA-06512: at "AQDEMO.DEMO_ENQUEUE", line 14
ORA-06512: at line 3
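As an aside (not part of the original scenario), the memory available to buffered queues is bounded by the Streams pool; if you keep running into ORA-25307 in a test like this, one option to consider, assuming your memory settings allow it, is to give the pool more room:
connect / as sysdba

show parameter streams_pool_size

alter system set streams_pool_size=100M;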
If you monitor the content of the queues right after you've enqueued them, you'll see that messages are in the memory associated with the queue:
connect / as sysdba

select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 0

select MSG_STATE
, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;

MSG_STATE COUNT(*)
--------- --------
IN MEMORY 8042

col bytes format 999,999,999
col name format a30
set pages 1000

select name, bytes
from v$sgastat
where pool='streams pool'
order by name;

NAME BYTES
-------------------------- ------------
Sender info 8,484
deqtree_kgqmctx 80
fixed allocation callback 260
free memory 5,702,748
image handles 337,452
kgqbt_alloc_block 99,456
kgqmdm_fl_2 241,288
kgqmsub 144
kodpaih3 image 8,130,416
kwqbcqini:spilledovermsgs 1,968
kwqbdaspl:spilledovermsgs 172,896
kwqbsinfy:bms 305,368
kwqbsinfy:bqg 808
kwqbsinfy:mpr 1,928,180
kwqbsinfy:sta 208
msgtree_kgqmctx 80
name_kgqmsub 32
recov_kgqbtctx 8,192
recov_kgqmctx 616
recov_kgqmsub 336
spilled:kwqbl 2,316
spilled:kwqbm 8,624
substree_kgqmctx 80
time manager index 80
And you'll also see the publisher is "IN FLOW CONTROL":
select publisher_state
from v$buffered_publishers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

PUBLISHER_STATE
-----------------------------------------------------------
IN FLOW CONTROL: INSUFFICIENT MEMORY AND UNBROWSED MESSAGES
If you wait for a few minutes, you'll see messages start to spill to disk:
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 4,021

select MSG_STATE, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;

MSG_STATE COUNT(*)
--------- --------
SPILLED 8042
You can dequeue the messages as you've done already:
connect aqdemo/aqdemo

begin
for i in 1..5000 loop
demo_dequeue('SUBSCRIBER1');
demo_dequeue('SUBSCRIBER2');
end loop;
end;
/
10. Clean up the environment

We can conclude this short demonstration of buffered messages with a bit of cleanup. I'll dig into more details in the coming posts:
connect / as sysdba

drop procedure aqdemo.demo_enqueue;
drop procedure aqdemo.demo_dequeue;

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('subscriber1',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
subscriber := sys.aq$_agent('subscriber2',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
end;
/

begin
dbms_aqadm.stop_queue(
queue_name => 'aqdemo.myqueue',
enqueue => true,
dequeue => true);
end;
/

begin
dbms_aqadm.drop_queue(
queue_name => 'aqdemo.myqueue');
end;
/

begin
dbms_aqadm.drop_queue_table(
queue_table =>'aqdemo.myqueue_table');
end;
/

drop user aqdemo;

drop tablespace aq
including contents and datafiles;


Thursday, January 1, 2009

"We Do Streams" is Born

Not sure that's a good idea, by the way...

I've been working with Oracle Streams "almost" since the beginning. To put it another way, I've gotten into a lot of trouble ;-). Now, I feel in the mood to start and maintain a blog dedicated to Oracle Streams, LogMiner, Advanced Queuing and some of their related features. Times have changed and Streams can now do some truly amazing things. I hope this blog can help all of us get to the next level and, why not, widely spread one of the hottest features of the Oracle Server.

Don't miss the point: using Streams can be challenging. How do you build a reliable Streams-based architecture? How do you anticipate and quickly fix any issue that comes up? How do you monitor a Streams environment? How do you make it flexible? And why shouldn't you have used Streams in the first place?

But Streams is also the ultimate Swiss Army knife of the Oracle Database! It allows you to perform near-zero-downtime upgrades of a database or an application. It can be used to provide high availability, to scale out systems, to share events and messages system-wide. It transforms data on the fly to archive changes or to feed an ODS. And those are only a few use cases...

Do you like the idea? Are you interested? Participate! Leave a comment, write a post or send an email to "wedostreams at gmail.com". If you are looking for immediate help, go to Oracle Streams forum, the Documentation, My Oracle Support (i.e. Metalink; check out 267979.1), Oracle Technology Network, Wikipedia or The Oracle Wiki.

So it's time to start... Happy New Year 2009!