
Sunday, January 4, 2009

Oracle AQ Buffered Queues 101

This post illustrates some of the features of Oracle Advanced Queuing (AQ) buffered messages. For more details, check the Oracle® Streams Advanced Queuing User's Guide and the DBMS_AQADM, DBMS_AQ and AQ Types references, or wait for some of the coming posts of this blog ;-). Obviously, we'll be taking some shortcuts for now, but we'll explore the basics of buffered messages, like:
  1. Create a user, tablespace and grant the privileges to use Advanced Queuing
  2. Create a queue table, a multiple consumer queue and start it
  3. Create subscribers for the queue
  4. Create a procedure to enqueue buffered messages
  5. Create a procedure to dequeue buffered messages
  6. Enqueue & dequeue messages
  7. Monitor queues and subscribers
  8. Illustrate how you can lose messages by crashing an instance
  9. Illustrate Flow Control and Spilled Messages
  10. Clean up the environment
1. Create a user, tablespace and grant the privileges

First of all, we'll create a user, named AQDEMO, to own and use the message queues. In real life, you'll probably separate the queue owner from the message publishers and consumers. However, to keep this demo short, we'll use AQDEMO to publish and consume messages too. It's worth noting that a tablespace that contains queue tables doesn't support Tablespace Point-In-Time Recovery (TSPITR). For that reason, and to ease the management of our AQ environment, we'll store the queues in a separate tablespace called AQ. Last but not least, if you want to share queues across multiple databases, you should use a distributed environment and set the global_names parameter of all the databases to true. The set of commands to configure your database looks like this:
connect / as sysdba

alter system
set global_names=true;

select *
from global_name;

GLOBAL_NAME
-----------
BLACK

create tablespace aq
datafile '/u01/app/oracle/oradata/BLACK/aq01.dbf'
size 25M autoextend on maxsize 266M;

create user aqdemo
identified by aqdemo
default tablespace aq
temporary tablespace temp;

grant connect, resource to aqdemo;
grant execute on dbms_aqadm to aqdemo;
grant execute on dbms_aq to aqdemo;

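To double-check the grants before moving on, a query along these lines should list both packages. It's only a sketch that assumes you are still connected as SYSDBA; it relies on the standard DBA_TAB_PRIVS dictionary view and wasn't part of the original run:

```sql
-- List the AQ packages AQDEMO has been granted EXECUTE on
select table_name, privilege
from dba_tab_privs
where grantee = 'AQDEMO'
and table_name in ('DBMS_AQ', 'DBMS_AQADM');
```

You should get one EXECUTE row per package; if a row is missing, rerun the corresponding grant.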
2. Create a queue table, a multiple consumer queue and start it

To perform these steps, we'll connect as AQDEMO and will use:
  • dbms_aqadm.create_queue_table to create the queue table
  • dbms_aqadm.create_queue to create the queue
  • dbms_aqadm.start_queue to start the queue
To keep it simple, we won't use a user-defined type for the message but just a RAW:
connect aqdemo/aqdemo

begin
dbms_aqadm.create_queue_table(
queue_table =>'myqueue_table',
queue_payload_type =>'RAW',
storage_clause =>'tablespace aq',
multiple_consumers => true,
comment =>'AQ Demo',
compatible=>'11.1');
end;
/

begin
dbms_aqadm.create_queue(
queue_name => 'myqueue',
queue_table => 'myqueue_table');
end;
/

begin
dbms_aqadm.start_queue(
queue_name => 'myqueue',
enqueue => true,
dequeue => true);
end;
/
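If you want to confirm that the queue exists and is started, the USER_QUEUES dictionary view can help. The query below is a minimal sketch, assuming you are still connected as AQDEMO; it isn't taken from the original session:

```sql
col name        format a25
col queue_table format a20

-- ENQUEUE_ENABLED and DEQUEUE_ENABLED should both be YES once start_queue has run
select name, queue_table, queue_type,
       enqueue_enabled, dequeue_enabled
from user_queues;
```

You'll probably also see the default exception queue that Oracle creates along with the queue table.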
Note:
If you plan to create a queue in another schema, you can prefix the queue table or queue name with the schema name like AQDEMO.MYQUEUE_TABLE.
If you want to mix cases, you can surround the name with double-quotes, like "AQDEMO"."myqueue_table", but it's probably easier to use uppercase.
Once the queue table is created, several tables and views are used to store and query the associated data; here is an example with AQ$[QUEUE_TABLE]:
desc aq$myqueue_table

Name Null? Type
--------------------- -------- ----------------------------
QUEUE VARCHAR2(30)
MSG_ID NOT NULL RAW(16)
CORR_ID VARCHAR2(128)
MSG_PRIORITY NUMBER
MSG_STATE VARCHAR2(16)
DELAY DATE
DELAY_TIMESTAMP TIMESTAMP(6)
EXPIRATION NUMBER
ENQ_TIME DATE
ENQ_TIMESTAMP TIMESTAMP(6)
ENQ_USER_ID VARCHAR2(30)
ENQ_TXN_ID VARCHAR2(30)
DEQ_TIME DATE
DEQ_TIMESTAMP TIMESTAMP(9)
DEQ_USER_ID VARCHAR2(30)
DEQ_TXN_ID VARCHAR2(30)
RETRY_COUNT NUMBER
EXCEPTION_QUEUE_OWNER VARCHAR2(30)
EXCEPTION_QUEUE VARCHAR2(30)
USER_DATA BLOB
PROPAGATED_MSGID RAW(16)
SENDER_NAME VARCHAR2(30)
SENDER_ADDRESS VARCHAR2(1024)
SENDER_PROTOCOL NUMBER
ORIGINAL_MSGID RAW(16)
ORIGINAL_QUEUE_NAME VARCHAR2(30)
ORIGINAL_QUEUE_OWNER VARCHAR2(30)
EXPIRATION_REASON VARCHAR2(31)
CONSUMER_NAME VARCHAR2(30)
ADDRESS VARCHAR2(1024)
PROTOCOL NUMBER
3. Create subscribers for the queue

In the case of Publish/Subscribe, you have to create subscribers to dequeue messages. In our example, we'll register 2 subscribers called SUBSCRIBER1 and SUBSCRIBER2 for that purpose:
connect aqdemo/aqdemo

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber1',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,
delivery_mode => dbms_aqadm.buffered);
end;
/

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber2',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,
delivery_mode => dbms_aqadm.buffered);
end;
/
You can check queue subscribers by querying the AQ$MYQUEUE_TABLE_S view (or AQ$_MYQUEUE_TABLE_S table) like below:
col name    format a12
col address format a40

select name, address
from aq$myqueue_table_s
where queue='MYQUEUE';

NAME ADDRESS
------------ -------------------------
SUBSCRIBER1
SUBSCRIBER2
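Another way to look at the subscribers is the USER_QUEUE_SUBSCRIBERS dictionary view, which, as far as I know, also reports the delivery mode each subscriber was registered with. Treat this as a sketch; the exact column list may differ between releases:

```sql
col queue_name    format a12
col consumer_name format a15
col delivery_mode format a25

-- One row per subscriber of the queues owned by the current user
select queue_name, consumer_name, delivery_mode
from user_queue_subscribers
where queue_name = 'MYQUEUE';
```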
4. Create a procedure to enqueue buffered messages

The procedure below can be used to enqueue buffered messages; the options and properties used to enqueue the messages are the following:
  • The message is a RAW(10) passed to the procedure as a hexadecimal string
  • The message recipients are SUBSCRIBER1 and SUBSCRIBER2
  • The message is a buffered message (dbms_aq.buffered)
connect aqdemo/aqdemo

create or replace procedure demo_enqueue(p_hexamsg varchar2) is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
message RAW(10);
begin
message := hextoraw(p_hexamsg);
recipients(1) := sys.aq$_agent('SUBSCRIBER1', 'MYQUEUE', NULL);
recipients(2) := sys.aq$_agent('SUBSCRIBER2', 'MYQUEUE', NULL);
message_properties.recipient_list := recipients;
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.buffered;
dbms_aq.enqueue(
queue_name => 'MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
commit;
end;
/
Once the procedure is created, you can easily enqueue messages from the AQDEMO user:
exec demo_enqueue('00000000000000000001');
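Since the payload is a plain RAW, nothing forces you to send numbers only. As an illustration (not part of the original demo), the sketch below uses UTL_RAW to send a short text; keep in mind that running it adds a message that will show up in the dequeue output later on:

```sql
-- 'hello' is 5 bytes, so it fits in the RAW(10) payload of demo_enqueue
exec demo_enqueue(rawtohex(utl_raw.cast_to_raw('hello')));

-- On the consumer side, the hexadecimal string can be turned back into text
select utl_raw.cast_to_varchar2(hextoraw('68656C6C6F')) as text
from dual;
```

Anything longer than 10 bytes would fail on the assignment to the RAW(10) variable inside demo_enqueue.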
5. Create a procedure to dequeue buffered messages

The procedure below dequeues the buffered messages and displays them with DBMS_OUTPUT.PUT_LINE; it takes the subscriber name as a parameter:
connect aqdemo/aqdemo

set serveroutput on

create or replace procedure demo_dequeue(p_subscriber varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message RAW(10);
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.consumer_name := p_subscriber;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.buffered;
loop
begin
dbms_aq.dequeue(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line('Message: '|| rawtohex(message) );
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
You can dequeue messages with demo_dequeue like below:
set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000001
No more messages
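demo_dequeue returns immediately when the queue is empty because of dbms_aq.no_wait. If you'd rather block for a while, the wait option also accepts a number of seconds. The anonymous block below is a sketch of that variant; the 5-second timeout and the hard-coded SUBSCRIBER1 are arbitrary choices for illustration, and it consumes a message if one is available:

```sql
set serveroutput on

declare
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  message_handle     raw(16);
  message            raw(10);
  no_messages        exception;
  pragma exception_init(no_messages, -25228);
begin
  -- Wait up to 5 seconds for a message instead of returning at once
  dequeue_options.wait          := 5;
  dequeue_options.consumer_name := 'SUBSCRIBER1';
  dequeue_options.visibility    := dbms_aq.immediate;
  dequeue_options.delivery_mode := dbms_aq.buffered;
  dbms_aq.dequeue(
    queue_name         => 'myqueue',
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle);
  dbms_output.put_line('Message: ' || rawtohex(message));
exception
  -- ORA-25228 is raised when the wait times out
  when no_messages then
    dbms_output.put_line('Nothing arrived within 5 seconds');
end;
/
```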
6. Enqueue & dequeue messages

Here is a sequence of enqueue and dequeue of buffered messages:
exec demo_enqueue('00000000000000000002');
exec demo_enqueue('00000000000000000003');

set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000002
Message: 00000000000000000003
No more messages

exec demo_enqueue('00000000000000000004');

exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000004
No more messages

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999

select queue_id,
subscriber_name,
subscriber_type,
startup_time,
total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
73821 SUBSCRIBER1 SUBSCRIBER 03-JAN-09 17:55:22 0
73821 SUBSCRIBER2 SUBSCRIBER 03-JAN-09 17:55:22 0
7. Monitor queues and subscribers

You can query the V$BUFFERED_QUEUES and V$BUFFERED_SUBSCRIBERS fixed views to check the number of messages in the queues and how many messages have been dequeued:
connect / as sysdba

alter session set nls_date_format='DD-MON-YY HH24:MI:SS';

col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100

select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:13:12 04-JAN-09 00:17:17 4 0

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999

select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:13:12 4
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:13:12 0
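The AQ$MYQUEUE_TABLE view described in section 2 gives a complementary picture: which messages are still pending, and for which consumer. The following is a sketch based on the CONSUMER_NAME and MSG_STATE columns of that view; I haven't reproduced its output here:

```sql
col consumer_name format a15
col msg_state     format a16

-- Count the messages per consumer and per state
select consumer_name, msg_state, count(*)
from aqdemo.aq$myqueue_table
where queue = 'MYQUEUE'
group by consumer_name, msg_state
order by consumer_name, msg_state;
```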
You can dequeue the messages from the other subscriber:
exec demo_dequeue('SUBSCRIBER2');

Message: 00000000000000000001
Message: 00000000000000000002
Message: 00000000000000000003
Message: 00000000000000000004
No more messages
8. Illustrate how you can lose messages by crashing an instance

You can easily show that buffered messages can be lost. Here is a simple scenario that illustrates the point by bouncing the Oracle instance:
connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000005');

connect / as sysdba
startup force

connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000006');

set serveroutput on
exec demo_dequeue('SUBSCRIBER1');

Message: 00000000000000000006
No more messages

exec demo_dequeue('SUBSCRIBER2')
Message: 00000000000000000006
No more messages
As you can see above, the message 00000000000000000005 has never been dequeued and is lost. The same scenario illustrates, as expected, that the queue and subscriber usage statistics are reset when an instance is bounced:
connect / as sysdba

alter session set nls_date_format='DD-MON-YY HH24:MI:SS';

col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100

select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:30:03 04-JAN-09 00:33:55 0 0

set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999

select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:30:03 1
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:30:03 1
9. Illustrate Flow Control and Spilled Messages

One of the benefits of buffered messages is that they remain in memory and are not stored on disk. A direct consequence is that buffered messages are not logged and, as we've seen previously, can be lost. The devil being in the details, the memory allocated to the Streams Pool, which stores the messages, is not unlimited and competes with the other memory pools. To prevent that memory from growing without limit, Oracle offers several mechanisms. One is Publisher Flow Control, which prevents messages from being enqueued when they are not dequeued fast enough. Another is the ability for messages to spill to disk into the queue table. Here is a simple test case that shows these features:
connect aqdemo/aqdemo

begin
for i in 7..100000 loop
demo_enqueue(lpad(to_char(i),20,'0'));
end loop;
end;
/

begin
*
ERROR at line 1:
ORA-25307: Enqueue rate too high, flow control enabled
ORA-06512: at "SYS.DBMS_AQ", line 6
ORA-06512: at "SYS.DBMS_AQ", line 216
ORA-06512: at "AQDEMO.DEMO_ENQUEUE", line 14
ORA-06512: at line 3
If you monitor the content of the queue right after you've enqueued the messages, you'll see that they are in the memory associated with the queue:
connect / as sysdba

select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 0

select MSG_STATE
, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;

MSG_STATE COUNT(*)
--------- --------
IN MEMORY 8042

col bytes format 999,999,999
col name format a30
set pages 1000

select name, bytes
from v$sgastat
where pool='streams pool'
order by name;

NAME BYTES
-------------------------- ------------
Sender info 8,484
deqtree_kgqmctx 80
fixed allocation callback 260
free memory 5,702,748
image handles 337,452
kgqbt_alloc_block 99,456
kgqmdm_fl_2 241,288
kgqmsub 144
kodpaih3 image 8,130,416
kwqbcqini:spilledovermsgs 1,968
kwqbdaspl:spilledovermsgs 172,896
kwqbsinfy:bms 305,368
kwqbsinfy:bqg 808
kwqbsinfy:mpr 1,928,180
kwqbsinfy:sta 208
msgtree_kgqmctx 80
name_kgqmsub 32
recov_kgqbtctx 8,192
recov_kgqmctx 616
recov_kgqmsub 336
spilled:kwqbl 2,316
spilled:kwqbm 8,624
substree_kgqmctx 80
time manager index 80
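To put those numbers in perspective, it can help to check how big the Streams Pool is allowed to be. The commands below are a sketch using the streams_pool_size parameter and the V$SGA_DYNAMIC_COMPONENTS view; they weren't part of the original test, and the values depend entirely on your memory settings:

```sql
-- Explicit setting, if any (0 usually means the size is managed automatically)
show parameter streams_pool_size

col component format a20

-- Size currently allocated to the pool, with the bounds seen since startup
select component, current_size, min_size, max_size
from v$sga_dynamic_components
where component = 'streams pool';
```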
And you'll also see the publisher is "IN FLOW CONTROL":
select publisher_state
from v$buffered_publishers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

PUBLISHER_STATE
-----------------------------------------------------------
IN FLOW CONTROL: INSUFFICIENT MEMORY AND UNBROWSED MESSAGES
If you wait for a few minutes, you'll see messages start to spill to disk:
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';

QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 4,021

select MSG_STATE, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;

MSG_STATE COUNT(*)
--------- --------
SPILLED 8042
You can dequeue the messages as you've done already:
connect aqdemo/aqdemo

begin
for i in 1..5000 loop
demo_dequeue('SUBSCRIBER1');
demo_dequeue('SUBSCRIBER2');
end loop;
end;
/
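In a real publisher, you would probably not let ORA-25307 abort the whole load. One possible approach, sketched below, is to catch the error, pause, and retry the same message. This is an illustration only: it assumes AQDEMO has been granted execute on DBMS_LOCK (not done in this post), and the 1-second pause and the limit of 10 retries are arbitrary:

```sql
declare
  flow_control exception;
  pragma exception_init(flow_control, -25307);
  i       pls_integer := 7;
  retries pls_integer := 0;
begin
  while i <= 100000 loop
    begin
      demo_enqueue(lpad(to_char(i), 20, '0'));
      i       := i + 1;   -- move on only once the message is enqueued
      retries := 0;
    exception
      when flow_control then
        retries := retries + 1;
        if retries > 10 then
          raise;          -- give up: nobody seems to be dequeuing
        end if;
        dbms_lock.sleep(1);   -- let the subscribers catch up
    end;
  end loop;
end;
/
```

Without a consumer running in another session, the block still ends with ORA-25307 after the retries are exhausted, which is the intended safety net.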
10. Clean up the environment

We can conclude this short demonstration of buffered messages with a bit of cleanup. I'll dig into more details in the coming posts:
connect / as sysdba

drop procedure aqdemo.demo_enqueue;
drop procedure aqdemo.demo_dequeue;

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('subscriber1',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
subscriber := sys.aq$_agent('subscriber2',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
end;
/

begin
dbms_aqadm.stop_queue(
queue_name => 'aqdemo.myqueue',
enqueue => true,
dequeue => true);
end;
/

begin
dbms_aqadm.drop_queue(
queue_name => 'aqdemo.myqueue');
end;
/

begin
dbms_aqadm.drop_queue_table(
queue_table =>'aqdemo.myqueue_table');
end;
/

drop user aqdemo;

drop tablespace aq
including contents and datafiles;

4 comments:

  1. Excelent article ! Very usefull for me.
    Tnx !

  2. Good work!! thanks for your efforts

  3. This is a great blog and has been an invaluable resource in getting up to speed with Oracle Streams and Advance Queues.

    Perhaps some tips and tricks in getting Oracle 11g R2 RAC with SCAN working with AQ is in order. I'm following all the documents in regards to local_listener and remote_listener, but I'm not able to make buffered messages reliable. Random failures of
    "ORA-25306: Cannot connect to buffered queue's owner instance ORA-06512" still plague my implementation.

  4. Yes, I've been experiencing the same intermittent failures.

    Any luck fixing this problem?
