Bookmark and Share

Wednesday, August 26, 2009

Oracle AQ Buffered Queues 101 (Part 2)

I would like to get back to the Oracle AQ Buffered Queues 101 post, I've written a few months back. What you'll find below is an enhanced example, that may better suit your needs. As a matter of fact, this is something I've recently used to study the impact of Oracle*NET SDU, buffer size and the latency parameter on propagation performance. But that's another story...

I strongly invite you to read the original post. It shows interesting things like "queue paused in flow control", "spilled messages" or how to loose buffered messages. Nevertheless, the previous example uses only one table. The one below exchanges messages between 2 queues in different databases. You'll see how to setup subscribers and schedule propagation to share your messages. To install it, assuming you have 2 databases 11.1 (or very likely 10.2), follow these steps:You can start right away...

Set Up the environnement to use SQL*Plus and to connect as SYSDBA

To begin, you'll set the SQL*Plus variables that follow:
  • sourcegdb defines the global_name AND the network alias of the source database. We assume global_names=true, even if that's not strictly mandatory. It helps to make things more readable. Make sure you have the aliases setup everywhere and they match the database global names.
  • destgdb defines the global_name AND the network alias of the destination database.
  • source_user and dest_user define names of SYSDBA users on the source and on the destination databases.
  • source_pwd and dest_pwd define the passwords of the corresponding source_user and dest_user.
accept sourcegdb default 'BLACK' -
prompt "Enter the Source Database Global Name [BLACK]: "

accept destgdb default 'WHITE' -
prompt "Enter the Destination Database Global Name [WHITE]: "

accept source_user default 'sys' -
prompt "Enter the Destination SYSDBA user [sys]: "

accept source_pwd default 'change_on_install' -
prompt "Enter the Destination SYSDBA password [change_on_install]: "

accept dest_user default 'sys' -
prompt "Enter the Destination SYSDBA user [sys]: "

accept dest_pwd default 'change_on_install' -
prompt "Enter the Destination SYSDBA password [change_on_install]: "

Create a User DEMO and a User Defined Type in the 2 databases

To go on with the example, you'll need to create the DEMO user in the 2 databases and create a type you'll use to send and receive messages. The script below creates those users and types.
Note:
For this example, we assume there is no user named DEMO. We also assume the 2 tablespaces USERS and TEMP exist in the databases. There is no need for any of the database to run in ARCHIVELOG mode.

connect &&source_user/&&source_pwd@&&sourcegdb as sysdba

create user demo
identified by demo
default tablespace users
temporary tablespace temp;

grant connect, resource, dba to demo;
grant execute on dbms_aq to demo;
grant execute on dbms_aqadm to demo;

connect &&dest_user/&&dest_pwd@&&destgdb as sysdba

create user demo
identified by demo
default tablespace users
temporary tablespace temp;

grant connect, resource, dba to demo;
grant execute on dbms_aq to demo;
grant execute on dbms_aqadm to demo;

connect demo/demo@&&sourcegdb

create type mytype as object (
id number
, field1 varchar2(4000)
, field2 varchar2(4000));
/

connect demo/demo@&&destgdb

create type mytype as object (
id number
, field1 varchar2(4000)
, field2 varchar2(4000));
/

Create a database link between the source and the destination databases

To start the propagation job, you'll need the source database to connect to the destination database. Create and test a database link for that purpose:
connect demo/demo@&&sourcegdb

create database link &&destgdb
connect to demo
identified by demo
using '&&destgdb';

select * from dual@&&destgdb;

Create and start queues

Then, create and start the queues:
connect demo/demo@&&destgdb

begin
dbms_aqadm.create_queue_table(
'myqueue_table'
, 'mytype'
, multiple_consumers => true);
end;
/

begin
dbms_aqadm.create_queue(
'myqueue'
, 'myqueue_table');
end;
/

begin
dbms_aqadm.start_queue('myqueue');
end;
/

connect demo/demo@&&sourcegdb

begin
dbms_aqadm.create_queue_table(
'myqueue_table'
, 'mytype'
, multiple_consumers => true);
end;
/

begin
dbms_aqadm.create_queue(
'myqueue'
, 'myqueue_table');
end;
/

begin
dbms_aqadm.start_queue('myqueue');
end;
/

Create a subscribers on the source queue and schedule propagation to the destination queue

The next step consists in adding the subscribers that match the destination queue, to the source queue. In this example, we add 2 subscribers because we will eventually dequeue the messages from 2 separate programs (or for 2 distinct purposes). Once done, check the queues are compatibles and schedule the QUEUE to QUEUE propagation. dba_queue_schedules provides detailed informations about what is scheduled:
connect demo/demo@&&sourcegdb

begin
dbms_aqadm.add_subscriber(
queue_name => 'myqueue'
, subscriber => sys.aq$_agent('RED','demo.myqueue@&&destgdb',null)
, queue_to_queue => true
, delivery_mode => dbms_aqadm.buffered);
end;
/

begin
dbms_aqadm.add_subscriber(
queue_name => 'myqueue'
, subscriber => sys.aq$_agent('BLUE','demo.myqueue@&&destgdb',null)
, queue_to_queue => true
, delivery_mode => dbms_aqadm.buffered);
end;
/

set serveroutput on

declare
rc binary_integer;
begin
dbms_aqadm.verify_queue_types(
src_queue_name => 'myqueue'
, dest_queue_name => 'demo.myqueue'
, destination => '&&destgdb'
, rc => rc);
dbms_output.put_line('If result is 1, it''s OKAY: '||rc);
end;
/

begin
dbms_aqadm.schedule_propagation(
queue_name => 'myqueue'
, destination => '&&destgdb'
, destination_queue => 'demo.myqueue');
end;
/

set pages 1000
select schema
, qname
, destination
, start_time
, latency
, schedule_disabled
, session_id
, total_number
, failures
, last_error_msg
, message_delivery_mode
from dba_queue_schedules;

Create an enqueue procedure in the source database

Create an enqueue procedure demo_enqueue, that enqueues a message in the buffered part of the queue:
connect demo/demo@&&sourcegdb

create or replace procedure demo_enqueue(p_mytype mytype) is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
begin
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.buffered;
dbms_aq.enqueue(
queue_name => 'MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => p_mytype,
msgid => message_handle);
commit;
end;
/

Create a dequeue procedure in the destination database

demo_dequeue dequeues messages from the destination queue based on the consumer name:
connect demo/demo@&&destgdb

select * from aq$myqueue_table_S;

set serveroutput on

create or replace procedure demo_dequeue(p_consumer varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
v_mytype mytype;
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.consumer_name := p_consumer;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.buffered;
loop
begin
dbms_aq.dequeue(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => v_mytype,
msgid => message_handle);
dbms_output.put_line('---------------------------------------------------------');
dbms_output.put_line('Message for Consumer "'||p_consumer||'": ');
dbms_output.put_line('ID :'||to_char(v_mytype.id));
dbms_output.put_line('FIELD1:'||v_mytype.field1);
dbms_output.put_line('FIELD2:'||v_mytype.field2);
dbms_output.put_line('---------------------------------------------------------');
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/

Enqueue a message on one end and dequeue it on the other end

You are ready to test your case. Enqueue a message and check you get the messages for the 2 subscribers:
connect demo/demo@&&sourcegdb
set serveroutput on
declare
v_mytype mytype;
begin
v_mytype := mytype(1, 'BLUE AND RED','Red And Blue');
demo_enqueue(v_mytype);
end;
/

select * from aq$myqueue_table;


connect demo/demo@&&destgdb

select * from aq$myqueue_table;

set serveroutput on
exec demo_dequeue('BLUE')
exec demo_dequeue('RED')
Note:
aq$myqueue_table help to monitor the messages. The propagation has been set without any time means that the messages are always sent from the source to the destination; the latency (default to 60 in that case), is the only thing that can slightly impact the time needed for the message to be available for the consumers

Clean Up the environment

You are done! Before you leave, suppress the AQ propagation schedule, the queues and the DEMO users:
connect &&source_user/&&source_pwd@&&sourcegdb as sysdba

begin
dbms_aqadm.UNSCHEDULE_PROPAGATION(
queue_name => 'demo.myqueue'
, destination => '&&destgdb'
, destination_queue => 'DEMO.MYQUEUE');
end;
/

exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)

drop user demo cascade;

select * from dba_queue_schedules;

connect &&dest_user/&&dest_pwd@&&destgdb as sysdba

exec dbms_aqadm.drop_queue_table('demo.myqueue_table',TRUE)

drop user demo cascade;

4 comments:

  1. Invaluable help! I've been looking for an example like this for months!!!
    I can't beleive I've finally found an example that really works!!!
    Thank you!
    Daniela.

    ReplyDelete
  2. And me for a reader ;-). No problem, this blog is intended to share/help

    ReplyDelete
  3. I'm still studying this but THANK YOU!
    A good blog/discussion/post would be to discuss the differences between all the AQ packages & why you chose to use DBMS_AQADM instead of the others.
    -JD, Oakland CA

    ReplyDelete
  4. This is excellent script, its simple and ease for execution

    ReplyDelete