Bookmark and Share

Monday, March 16, 2009

Adding A Destination To A Streams Configuration

In the first post of this blog, Oracle Streams One Way Table Replication 101, I've described how to setup one of the simplest Streams replication possible: SOURCE.T1 DML changes are captured and applied to DESTINATION.T1; There is just 1 database and no propagation. Now, we'll transform this example so that the changes are also directed into another queue, in the same database, and applied to DESTINATION2.T1. This new example will allow us to test some of the new features of Oracle Streams 11g; I'll show you that in the coming weeks...

It's worth to note that the scenario I've used to add the new destination, can impact the primary destination replication. There is a way to prevent this issue if that's a concern for you: you can created a specific capture for the time of the instantiation and merge it with the existing capture once the instantiation done. In the current example, there is only one table and no real changes. Leaving the existing configuration as much in sync as possible was not my primary goal. For this reason, I've chosen a more direct method made of the steps below: Before we start, we assume we have a Streams replication setup as described in Oracle Streams One Way Table Replication 101, steps 1 to 9!

Create a Streams queue in the destination database

As I've already told you, I use a second queue. For this reason, the first step of the setup consists in creating that second queue; In order to proceed, I've used the dbms_streams_adm.set_up_queue procedure like below:
connect strmadmin/strmadmin

begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table2',
queue_name => 'strmadmin.streams_queue2');
end;
/

Create a propagation from streams_queue to streams_queue2

Once the queue created, I create a queue-to-queue propagation for the DML changes on source.t1; I propagate the messages in the same database so I don't use any database link:
begin
dbms_streams_adm.add_table_propagation_rules(
table_name => 'source.t1',
streams_name => 'streams_propagation',
source_queue_name => 'strmadmin.streams_queue',
destination_queue_name => 'strmadmin.streams_queue2',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => 'BLACK',
inclusion_rule => true,
queue_to_queue => true);
end;
/

col propagation_name format a19
col source_queue_name format a13
col destination_queue_name format a14
col destination_dblink format a9
col status format a10

select propagation_name,
source_queue_name,
destination_queue_name,
destination_dblink,
status
from dba_propagation;

PROPAGATION_NAME SOURCE_QUEUE_ DESTINATION_QU DESTINATI STATUS
------------------- ------------- -------------- --------- ----------
STREAMS_PROPAGATION STREAMS_QUEUE STREAMS_QUEUE2 AQ$_LOCAL ENABLED

Create an apply process for the new destination

Then, I create the apply process that subscribes to the new queue for the changes made to source.t1:
begin
dbms_streams_adm.add_table_rules(
table_name => 'source.t1',
streams_type => 'apply',
streams_name => 'streams_apply2',
queue_name => 'strmadmin.streams_queue2',
include_dml => true,
include_ddl => false,
include_tagged_lcr => false,
source_database => 'BLACK',
inclusion_rule => true);
end;
/

col apply_name format a14
col queue_name format a14
col rule_set_name format a11

select apply_name,
queue_name,
rule_set_name,
status,
message_delivery_mode
from dba_apply;

APPLY_NAME QUEUE_NAME RULE_SET_NA STATUS MESSAGE_DE
-------------- -------------- ----------- ---------- ----------
STREAMS_APPLY STREAMS_QUEUE RULESET$_9 ENABLED CAPTURED
STREAMS_APPLY2 STREAMS_QUEUE2 RULESET$_16 DISABLED CAPTURED

Add a Transformation to rename the schema on the Apply rule

Like in the previous example, I need to rename the schema. I use a declarative transformation to rename source to destination2:
col rule_owner format a9
col streams_name format a14
col table_owner format a6
col streams_type format a5
col rule_type format a3
col table_name format a5
col rule_name format a5 new_value rulename

select rule_owner,
streams_name,
streams_type,
table_owner,
table_name,
rule_type,
rule_name
from dba_streams_table_rules
where streams_name='STREAMS_APPLY2'
and streams_type='APPLY'
and rule_type='DML';

RULE_OWNE STREAMS_NAME STREA TABLE_ TABLE RUL RULE_
--------- -------------- ----- ------ ----- --- -----
STRMADMIN STREAMS_APPLY2 APPLY SOURCE T1 DML T115

prompt &&rulename
T115

begin
dbms_streams_adm.rename_schema(
rule_name => '&&rulename' ,
from_schema_name => 'SOURCE',
to_schema_name => 'DESTINATION2',
step_number => 0,
operation => 'add');
end;
/

col rule_name format A6
col from_schema_name format a6
col to_schema_name format a12

select rule_name,
transform_type,
from_schema_name,
to_schema_name,
declarative_type
from dba_streams_transformations;

RULE_N TRANSFORM_TYPE FROM_S TO_SCHEMA_NA DECLARATIVE_T
------ -------------------------- ------ ------------ -------------
T110 DECLARATIVE TRANSFORMATION SOURCE DESTINATION RENAME SCHEMA
T115 DECLARATIVE TRANSFORMATION SOURCE DESTINATION2 RENAME SCHEMA

Insure there is no pending transaction on the source

At that point, that's important to make sure there is no uncommited changes made on the source database before the apply was created. To check that's the case, we run dbms_capture_adm.prepare_xxxx_instantiation that holds a share look on all the source objects. We need to run that command AFTER we create the apply process! We'll instantiate the objects at a SCN that is after that procedure execution:
connect / as sysdba

exec dbms_capture_adm.prepare_table_instantiation('source.t1');

Instantiate the new destination

I can now create the new schema and table to instantiate them:
create user destination2
identified by destination2
default tablespace users
temporary tablespace temp;

grant connect,resource to destination2;

create table destination2.t1(
id number primary key,
text varchar2(80));

col apply_scn format 999999999999 new_value instantiation_scn

select dbms_flashback.get_system_change_number apply_scn
from dual;

APPLY_SCN
---------
795716


prompt &&instantiation_scn
795716

insert into destination2.t1
(select * from source.t1 as of scn &&instantiation_scn);

commit;
Once the table instantiated, I store the instantiation SCN in the database so that the new apply process only applies changes committed after that instantiation SCN. Before we proceed, we have to make sure the change we'll make won't affect the existing apply process. That's because we use the same database for both apply. To make sure that's the case, we'll verify the applied_message_number from dba_apply_progress is greater than the instantiation SCN we'll set.
Note
If there is no changes on the source database, there is no reason for applied_message_number to change either. For this reason you can run an update on one of the source object and commit the change to make sure that number can increase.
select applied_message_number
from dba_apply_progress;

APPLIED_MESSAGE_NUMBER
----------------------
796912
0
begin
dbms_apply_adm.set_table_instantiation_scn(
source_object_name => 'source.t1',
source_database_name => 'BLACK',
instantiation_scn => &&instantiation_scn);
end;
/

Start the new Apply Process

Once the configuration finished, I could start the new apply process; the propagation is enabled by default:
exec dbms_apply_adm.start_apply('streams_apply2')

col propagation_name format a19
col status format a10

select propagation_name,
status
from dba_propagation;

PROPAGATION_NAME STATUS
------------------- ----------
STREAMS_PROPAGATION ENABLED

Test the replication

And I've tested the replication:
insert into source.t1 values (4,'Text 4');

1 row created.

commit

col id format 99
col text format a6

select id,
text
from destination.t1;

ID TEXT
--- ------
4 Text 4
1 Text 1
2 Text 2
3 Text 3

select id,
text
from destination2.t1;

ID TEXT
--- ------
4 Text 4
1 Text 1
2 Text 2
3 Text 3

delete from source.t1 where id=4;

commit;

Reset the Configuration

Like for the other scenarios, you'll find below the script to reset the configuration to what it was before this post:
exec dbms_propagation_adm.stop_propagation('streams_propagation');
exec dbms_propagation_adm.drop_propagation('streams_propagation', true);

exec dbms_apply_adm.stop_apply('streams_apply2')
exec dbms_apply_adm.drop_apply('streams_apply2',true);

begin
dbms_streams_adm.remove_queue(
queue_name => 'strmadmin.streams_queue2',
cascade => false,
drop_unused_queue_table => true);
end;
/

select queue_table
from dba_queue_tables
where owner='STRMADMIN';

QUEUE_TABLE
-------------------
STREAMS_QUEUE_TABLE

drop user destination2 cascade;

2 comments:

  1. I'm not sure I got it - you create different and separate capture and apply scripts for every table you want to replicate?

    ReplyDelete
  2. No. This example is about one table but they could be several, it could a whole schema, many or even global. Actually it doesn't matter.

    What it shows is how to build a second destination/apply from the same capture.

    ReplyDelete