latency
parameter on propagation performance. But that's another story...I strongly invite you to read the original post. It shows interesting things like "queue paused in flow control", "spilled messages" or how to loose buffered messages. Nevertheless, the previous example uses only one table. The one below exchanges messages between 2 queues in different databases. You'll see how to setup subscribers and schedule propagation to share your messages. To install it, assuming you have 2 databases 11.1 (or very likely 10.2), follow these steps:
- Set Up the environnement to use SQL*Plus and to connect as SYSDBA
- Create a DEMO user and a User Defined Type in the 2 databases
- Create a database link between the source and the destination databases
- Create and start queues
- Create a subscribers on the source queue and schedule propagation to the destination queue
- Create an enqueue procedure in the source database
- Create a dequeue procedure in the destination database
- Enqueue a message on one end and dequeue it on the other end
- Clean Up the environment
Set Up the environnement to use SQL*Plus and to connect as SYSDBA
To begin, you'll set the SQL*Plus variables that follow:- sourcegdb defines the global_name AND the network alias of the source database. We assume
global_names=true
, even if that's not strictly mandatory. It helps to make things more readable. Make sure you have the aliases setup everywhere and they match the database global names. - destgdb defines the global_name AND the network alias of the destination database.
- source_user and dest_user define names of
SYSDBA
users on the source and on the destination databases. - source_pwd and dest_pwd define the passwords of the corresponding source_user and dest_user.
accept sourcegdb default 'BLACK' -
prompt "Enter the Source Database Global Name [BLACK]: "
accept destgdb default 'WHITE' -
prompt "Enter the Destination Database Global Name [WHITE]: "
accept source_user default 'sys' -
prompt "Enter the Destination SYSDBA user [sys]: "
accept source_pwd default 'change_on_install' -
prompt "Enter the Destination SYSDBA password [change_on_install]: "
accept dest_user default 'sys' -
prompt "Enter the Destination SYSDBA user [sys]: "
accept dest_pwd default 'change_on_install' -
prompt "Enter the Destination SYSDBA password [change_on_install]: "
Create a User DEMO and a User Defined Type in the 2 databases
To go on with the example, you'll need to create theDEMO
user in the 2 databases and create a type you'll use to send and receive messages. The script below creates those users and types.Note:
For this example, we assume there is no user namedDEMO
. We also assume the 2 tablespacesUSERS
andTEMP
exist in the databases. There is no need for any of the database to run in ARCHIVELOG mode.
connect &&source_user/&&source_pwd@&&sourcegdb as sysdba
create user demo
identified by demo
default tablespace users
temporary tablespace temp;
grant connect, resource, dba to demo;
grant execute on dbms_aq to demo;
grant execute on dbms_aqadm to demo;
connect &&dest_user/&&dest_pwd@&&destgdb as sysdba
create user demo
identified by demo
default tablespace users
temporary tablespace temp;
grant connect, resource, dba to demo;
grant execute on dbms_aq to demo;
grant execute on dbms_aqadm to demo;
connect demo/demo@&&sourcegdb
create type mytype as object (
id number
, field1 varchar2(4000)
, field2 varchar2(4000));
/
connect demo/demo@&&destgdb
create type mytype as object (
id number
, field1 varchar2(4000)
, field2 varchar2(4000));
/
Create a database link between the source and the destination databases
To start the propagation job, you'll need the source database to connect to the destination database. Create and test a database link for that purpose:connect demo/demo@&&sourcegdb
create database link &&destgdb
connect to demo
identified by demo
using '&&destgdb';
select * from dual@&&destgdb;
Create and start queues
Then, create and start the queues:connect demo/demo@&&destgdb
begin
dbms_aqadm.create_queue_table(
'myqueue_table'
, 'mytype'
, multiple_consumers => true);
end;
/
begin
dbms_aqadm.create_queue(
'myqueue'
, 'myqueue_table');
end;
/
begin
dbms_aqadm.start_queue('myqueue');
end;
/
connect demo/demo@&&sourcegdb
begin
dbms_aqadm.create_queue_table(
'myqueue_table'
, 'mytype'
, multiple_consumers => true);
end;
/
begin
dbms_aqadm.create_queue(
'myqueue'
, 'myqueue_table');
end;
/
begin
dbms_aqadm.start_queue('myqueue');
end;
/
Create a subscribers on the source queue and schedule propagation to the destination queue
The next step consists in adding the subscribers that match the destination queue, to the source queue. In this example, we add 2 subscribers because we will eventually dequeue the messages from 2 separate programs (or for 2 distinct purposes). Once done, check the queues are compatibles and schedule the QUEUE to QUEUE propagation.dba_queue_schedules
provides detailed informations about what is scheduled:connect demo/demo@&&sourcegdb
begin
dbms_aqadm.add_subscriber(
queue_name => 'myqueue'
, subscriber => sys.aq$_agent('RED','demo.myqueue@&&destgdb',null)
, queue_to_queue => true
, delivery_mode => dbms_aqadm.buffered);
end;
/
begin
dbms_aqadm.add_subscriber(
queue_name => 'myqueue'
, subscriber => sys.aq$_agent('BLUE','demo.myqueue@&&destgdb',null)
, queue_to_queue => true
, delivery_mode => dbms_aqadm.buffered);
end;
/
set serveroutput on
declare
rc binary_integer;
begin
dbms_aqadm.verify_queue_types(
src_queue_name => 'myqueue'
, dest_queue_name => 'demo.myqueue'
, destination => '&&destgdb'
, rc => rc);
dbms_output.put_line('If result is 1, it''s OKAY: '||rc);
end;
/
begin
dbms_aqadm.schedule_propagation(
queue_name => 'myqueue'
, destination => '&&destgdb'
, destination_queue => 'demo.myqueue');
end;
/
set pages 1000
select schema
, qname
, destination
, start_time
, latency
, schedule_disabled
, session_id
, total_number
, failures
, last_error_msg
, message_delivery_mode
from dba_queue_schedules;
Create an enqueue procedure in the source database
Create an enqueue procedure demo_enqueue, that enqueues a message in the buffered part of the queue:connect demo/demo@&&sourcegdb
create or replace procedure demo_enqueue(p_mytype mytype) is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
begin
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.buffered;
dbms_aq.enqueue(
queue_name => 'MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => p_mytype,
msgid => message_handle);
commit;
end;
/
Create a dequeue procedure in the destination database
demo_dequeue
dequeues messages from the destination queue based on the consumer name:connect demo/demo@&&destgdb
select * from aq$myqueue_table_S;
set serveroutput on
create or replace procedure demo_dequeue(p_consumer varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
v_mytype mytype;
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.consumer_name := p_consumer;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.buffered;
loop
begin
dbms_aq.dequeue(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => v_mytype,
msgid => message_handle);
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line('Message for Consumer "'||p_consumer||'": ');
dbms_output.put_line('ID :'||to_char(v_mytype.id));
dbms_output.put_line('FIELD1:'||v_mytype.field1);
dbms_output.put_line('FIELD2:'||v_mytype.field2);
dbms_output.put_line('---------------------------------------------------------');
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
Enqueue a message on one end and dequeue it on the other end
You are ready to test your case. Enqueue a message and check you get the messages for the 2 subscribers:connect demo/demo@&&sourcegdb
set serveroutput on
declare
v_mytype mytype;
begin
v_mytype := mytype(1, 'BLUE AND RED','Red And Blue');
demo_enqueue(v_mytype);
end;
/
select * from aq$myqueue_table;
connect demo/demo@&&destgdb
select * from aq$myqueue_table;
set serveroutput on
exec demo_dequeue('BLUE')
exec demo_dequeue('RED')
Note:aq$myqueue_table
help to monitor the messages. The propagation has been set without any time means that the messages are always sent from the source to the destination; thelatency
(default to 60 in that case), is the only thing that can slightly impact the time needed for the message to be available for the consumers
Clean Up the environment
You are done! Before you leave, suppress the AQ propagation schedule, the queues and theDEMO
users:connect &&source_user/&&source_pwd@&&sourcegdb as sysdba
begin
dbms_aqadm.UNSCHEDULE_PROPAGATION(
queue_name => 'demo.myqueue'
, destination => '&&destgdb'
, destination_queue => 'DEMO.MYQUEUE');
end;
/
exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)
drop user demo cascade;
select * from dba_queue_schedules;
connect &&dest_user/&&dest_pwd@&&destgdb as sysdba
exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)
drop user demo cascade;
Invaluable help! I've been looking for an example like this for months!!!
ReplyDeleteI can't beleive I've finally found an example that really works!!!
Thank you!
Daniela.
And me for a reader ;-). No problem, this blog is intended to share/help
ReplyDeleteI'm still studying this but THANK YOU!
ReplyDeleteA good blog/discussion/post would be to discuss the differences between all the AQ packages & why you chose to use DBMS_AQADM instead of the others.
-JD, Oakland CA
This is excellent script, its simple and ease for execution
ReplyDelete