- Create a user, tablespace and grant the privileges to use Advanced Queuing
- Create a queue table, a multiple consumer queue and start it
- Create subscribers for the queue
- Create a procedure to enqueue buffered messages
- Create a procedure to dequeue buffered messages
- Enqueue & dequeue messages
- Monitor queues and subscribers
- Illustrate how you can loose messages by crashing an instance
- Illustrate Flow Control and Spilled Message
- Clean up the environment
First of all, we'll create a user, named
AQDEMO
, to own and use the message queues. In real life, you'll probably separate the queue owner from message publishers and consumers. However, to keep that demo short, we'll use AQDEMO
to publish and consume messages too. It's worth to notice that a tablespace that contains queue tables doesn't support Tablespace Point In Time Recovery (TSPITR). For that reason and to ease the management of our AQ environment, we'll store the queues in a separate tablespace called AQ
. Last but not least if you want to share queues across multiple databases, you should a use a distributed environment and set the global_names
parameter of all the databases to true. The set of command to configure your database looks like below:connect / as sysdba
alter system
set global_names=true;
select *
from global_name;
GLOBAL_NAME
-----------
BLACK
create tablespace aq
datafile '/u01/app/oracle/oradata/BLACK/aq01.dbf'
size 25M autoextend on maxsize 266M;
create user aqdemo
identified by aqdemo
default tablespace aq
temporary tablespace temp;
grant connect, resource to aqdemo;
grant execute on dbms_aqadm to aqdemo;
grant execute on dbms_aq to aqdemo;
2. Create a queue table, a multiple consumer queue and start it
To perform these steps, we'll connect as
AQDEMO
and will use:dbms_aqadm.create_queue_table
to create the queue tabledbms_aqadm.create_queue
to create the queuedbms_aqadm.start_queue
to start the queue
RAW
:connect aqdemo/aqdemo
begin
dbms_aqadm.create_queue_table(
queue_table =>'myqueue_table',
queue_payload_type =>'RAW',
storage_clause =>'tablespace aq',
multiple_consumers => true,
comment =>'AQ Demo',
compatible=>'11.1');
end;
/
begin
dbms_aqadm.create_queue(
queue_name => 'myqueue',
queue_table => 'myqueue_table');
end;
/
begin
dbms_aqadm.start_queue(
queue_name => 'myqueue',
enqueue => true,
dequeue => true);
end;
/
Note:Once the queue table created, several tables and views are used to store and query the associated data; here is an example with AQ$[QUEUE_TABLE]:
If you plan to create a queue in another schema, you can prefix the queue table or queue name with the schema name likeAQDEMO.MYQUEUE_TABLE
.
If you want to mix cases, you can surround the name with double-quotes, like"AQDEMO"."myqueue_table"
but that's probably easier to use uppercase.
desc aq$myqueue_table3. Create subscribers for the queue
Name Null? Type
--------------------- -------- ----------------------------
QUEUE VARCHAR2(30)
MSG_ID NOT NULL RAW(16)
CORR_ID VARCHAR2(128)
MSG_PRIORITY NUMBER
MSG_STATE VARCHAR2(16)
DELAY DATE
DELAY_TIMESTAMP TIMESTAMP(6)
EXPIRATION NUMBER
ENQ_TIME DATE
ENQ_TIMESTAMP TIMESTAMP(6)
ENQ_USER_ID VARCHAR2(30)
ENQ_TXN_ID VARCHAR2(30)
DEQ_TIME DATE
DEQ_TIMESTAMP TIMESTAMP(9)
DEQ_USER_ID VARCHAR2(30)
DEQ_TXN_ID VARCHAR2(30)
RETRY_COUNT NUMBER
EXCEPTION_QUEUE_OWNER VARCHAR2(30)
EXCEPTION_QUEUE VARCHAR2(30)
USER_DATA BLOB
PROPAGATED_MSGID RAW(16)
SENDER_NAME VARCHAR2(30)
SENDER_ADDRESS VARCHAR2(1024)
SENDER_PROTOCOL NUMBER
ORIGINAL_MSGID RAW(16)
ORIGINAL_QUEUE_NAME VARCHAR2(30)
ORIGINAL_QUEUE_OWNER VARCHAR2(30)
EXPIRATION_REASON VARCHAR2(31)
CONSUMER_NAME VARCHAR2(30)
ADDRESS VARCHAR2(1024)
PROTOCOL NUMBER
In the case of Publish/Subscribe, you have to create subscribers to dequeue messages. In our example, we'll register 2 subscribers called
SUBSCRIBER1
and SUBSCRIBER2
for that purpose:connect aqdemo/aqdemoYou can check queue subscribers by querying the
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber1',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,delivery_mode =>
dbms_aqadm.buffered);
end;
/
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent(
'subscriber2',
'myqueue',
null);
dbms_aqadm.add_subscriber(
queue_name => 'myqueue',
subscriber => subscriber,delivery_mode =>
dbms_aqadm.buffered);
end;
/
AQ$MYQUEUE_TABLE_S
view (or AQ$_MYQUEUE_TABLE_S
table) like below:col name format a124. Create a procedure to enqueue buffered messages
col address format a40
select name, address
from aq$myqueue_table_s
where queue='MYQUEUE';
NAME ADDRESS
------------ -------------------------
SUBSCRIBER1
SUBSCRIBER2
The procedure below can be used to enqueue buffered messages; The options and properties used to enqueue the messages are the following:
- The message is a RAW(10) passed from the procedure parameter as an hexadecimal string
- The list of message recipients are
SUBSCRIBER1
andSUBSCRIBER2
- The message is a buffered message (
dbms_aq.buffered
)
connect aqdemo/aqdemoOnce the procedure created, you can easily enqueue messages from the
create or replace procedure demo_enqueue(p_hexamsg varchar2) is
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
recipients DBMS_AQ.aq$_recipient_list_t;
message_handle RAW(16);
message RAW(10);
begin
message := hextoraw(p_hexamsg);
recipients(1) := sys.aq$_agent('SUBSCRIBER1', 'MYQUEUE', NULL);
recipients(2) := sys.aq$_agent('SUBSCRIBER2', 'MYQUEUE', NULL);
message_properties.recipient_list := recipients;
enqueue_options.visibility := dbms_aq.immediate;
enqueue_options.delivery_mode := dbms_aq.buffered;
dbms_aq.enqueue(
queue_name => 'MYQUEUE',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
commit;
end;
/
AQDEMO
user:exec demo_enqueue('00000000000000000001');5. Create a procedure to dequeue buffered messages
The procedure below dequeues the buffered messages and display it with
DBMS_OUTPUT.PUT_LINE
; it takes the subscriber name as a parameterconnect aqdemo/aqdemoYou can dequeue messages with
set serveroutput on
create or replace procedure demo_dequeue(p_subscriber varchar2)
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message RAW(10);
no_messages exception;
pragma exception_init(no_messages, -25228);
begin
dequeue_options.wait := dbms_aq.no_wait;
dequeue_options.consumer_name := p_subscriber;
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.visibility := dbms_aq.immediate;
dequeue_options.delivery_mode := dbms_aq.buffered;
loop
begin
dbms_aq.dequeue(
queue_name => 'myqueue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line('Message: '|| hextoraw(message) );
dequeue_options.navigation := dbms_aq.next_message;
end;
end loop;
exception
when no_messages then
dbms_output.put_line('No more messages');
commit;
end;
/
demo_dequeue
like below:set serveroutput on6. Enqueue & Dequeue messages
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000001
No more messages
Here is a sequence of enqueue and dequeue of buffered messages:
exec demo_enqueue('00000000000000000002');7. Monitor queues and subscribers
exec demo_enqueue('00000000000000000003');
set serveroutput on
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000002
Message: 00000000000000000003
No more messages
exec demo_enqueue('00000000000000000004');
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000004
No more messages
set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999
select queue_id,
subscriber_name,
subscriber_type,
startup_time,
total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
73821 SUBSCRIBER1 SUBSCRIBER 03-JAN-09 17:55:22 0
73821 SUBSCRIBER2 SUBSCRIBER 03-JAN-09 17:55:22 0
You can query the
V$BUFFERED_QUEUE
and V$BUFFERED_SUBSCRIBERS
fixed views to check the number of messages in the queues and how many messages have been dequeued:connect / as sysdbaYou can dequeue the message from the other subscriber:
alter session set nls_date_format='DD-MON-YY HH24:MI:SS';
col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:13:12 04-JAN-09 00:17:17 4 0
set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999
select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:13:12 4
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:13:12 0
exec demo_dequeue('SUBSCRIBER2');8. Illustrate how you can loose messages by crashing an instance
Message: 00000000000000000001
Message: 00000000000000000002
Message: 00000000000000000003
Message: 00000000000000000004
No more messages
You can easily show that a buffered messages can be lost. Here is a simple scenario to illustrate that point with a bounce of the Oracle instance:
connect aqdemo/aqdemoAs you can see above, the message 00000000000000000005 has never been dequeued but is lost; The same scenario illustrates, as expected, that statistics of queue and subscribers usage are reset when an instance is bounced:
exec demo_enqueue('00000000000000000005');
connect / as sysdba
startup force
connect aqdemo/aqdemo
exec demo_enqueue('00000000000000000006');
set serveroutput on
exec demo_dequeue('SUBSCRIBER1');
Message: 00000000000000000006
No more messages
exec demo_dequeue('SUBSCRIBER2')
Message: 00000000000000000006
No more messages
connect / as sysdba9. Illustrate Flow Control and Spilled Messages
alter session set nls_date_format='DD-MON-YY HH24:MI:SS';
col queue_id format 999999
col startup_time format a18
col num_msgs format 999,999
col spill_msgs format 999,999
set lines 100
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ ------------------ -------- ----------
74026 04-JAN-09 00:30:03 04-JAN-09 00:33:55 0 0
set lines 100
col subscriber_name format a15
col subscriber_type format a15
col startup_time format a18
col total_dequeued_msg format 999,999
select queue_id
, subscriber_name
, subscriber_type
, startup_time
, total_dequeued_msg
from v$buffered_subscribers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID SUBSCRIBER_NAME SUBSCRIBER_TYPE STARTUP_TIME TOTAL_DEQUEUED_MSG
-------- --------------- --------------- ------------------ ------------------
74026 SUBSCRIBER1 SUBSCRIBER 04-JAN-09 00:30:03 1
74026 SUBSCRIBER2 SUBSCRIBER 04-JAN-09 00:30:03 1
One of the benefit of using buffered messages is that they remain in memory and are not stored on disks. A direct consequence of that is that buffered messages are not logged and as we've seen previously can be lost. The devil being in the details, the memory allocated for the Streams Pool, that store the messages, is not unlimited and compete with other memory pools. To prevent that memory to increase without limit Oracle offers several mechanisms; One is Publisher Flow Control that prevents messages from being enqueued when the messages are not dequeued fast enough. Another one is the ability for messages to spill on disks in the queue table. Here is a simple test case shows these features :
connect aqdemo/aqdemoIf you monitor the content of the queues right after you've enqueued them, you'll see that messages are in the memory associated with the queue:
begin
for i in 7..100000 loop
demo_enqueue(lpad(to_char(i),20,'0'));
end loop;
end;
/
begin
*
ERROR at line 1:
ORA-25307: Enqueue rate too high, flow control enabled
ORA-06512: at "SYS.DBMS_AQ", line 6
ORA-06512: at "SYS.DBMS_AQ", line 216
ORA-06512: at "AQDEMO.DEMO_ENQUEUE", line 14
ORA-06512: at line 3
connect / as sysdbaAnd you'll also see the publisher is "IN FLOW CONTROL":
select queue_id
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 0
select MSG_STATE
, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;
MSG_STATE COUNT(*)
--------- --------
IN MEMORY 8042
col bytes format 999,999,999
col name format a30
set pages 1000
select name, bytes
from v$sgastat
where pool='streams pool'
order by name;
NAME BYTES
-------------------------- ------------
Sender info 8,484
deqtree_kgqmctx 80
fixed allocation callback 260
free memory 5,702,748
image handles 337,452
kgqbt_alloc_block 99,456
kgqmdm_fl_2 241,288
kgqmsub 144
kodpaih3 image 8,130,416
kwqbcqini:spilledovermsgs 1,968
kwqbdaspl:spilledovermsgs 172,896
kwqbsinfy:bms 305,368
kwqbsinfy:bqg 808
kwqbsinfy:mpr 1,928,180
kwqbsinfy:sta 208
msgtree_kgqmctx 80
name_kgqmsub 32
recov_kgqbtctx 8,192
recov_kgqmctx 616
recov_kgqmsub 336
spilled:kwqbl 2,316
spilled:kwqbm 8,624
substree_kgqmctx 80
time manager index 80
select publisher_stateIf you wait for a few minutes, you'll see messages start to spill on disks:
from v$buffered_publishers
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
PUBLISHER_STATE
-----------------------------------------------------------
IN FLOW CONTROL: INSUFFICIENT MEMORY AND UNBROWSED MESSAGES
select queue_idYou can dequeue the messages as you've done already
, startup_time
, sysdate
, num_msgs
, spill_msgs
from v$buffered_queues
where queue_schema='AQDEMO'
and queue_name='MYQUEUE';
QUEUE_ID STARTUP_TIME SYSDATE NUM_MSGS SPILL_MSGS
-------- ------------------ --------- -------- ----------
74026 04-JAN-09 04-JAN-09 4,021 4,021
select MSG_STATE, count(*)
from aqdemo.aq$myqueue_table
where queue='MYQUEUE'
group by MSG_STATE;
MSG_STATE COUNT(*)
--------- --------
SPILLED 8042
connect aqdemo/aqdemo10. Clean up the environment
begin
for i in 1..5000 loop
demo_dequeue('SUBSCRIBER1');
demo_dequeue('SUBSCRIBER2');
end loop;
end;
/
We can conclude this short demonstration of buffered messages by a bit of clean up. I'll dig into more details in the coming posts
connect / as sysdba
drop procedure aqdemo.demo_enqueue;
drop procedure aqdemo.demo_dequeue;
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('subscriber1',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
subscriber := sys.aq$_agent('subscriber2',
'aqdemo.myqueue',
null);
dbms_aqadm.remove_subscriber(
queue_name => 'aqdemo.myqueue',
subscriber => subscriber);
end;
/
begin
dbms_aqadm.stop_queue(
queue_name => 'aqdemo.myqueue',
enqueue => true,
dequeue => true);
end;
/
begin
dbms_aqadm.drop_queue(
queue_name => 'aqdemo.myqueue');
end;
/
begin
dbms_aqadm.drop_queue_table(
queue_table =>'aqdemo.myqueue_table');
end;
/
drop user aqdemo;
drop tablespace aq
including contents and datafiles;
Excelent article ! Very usefull for me.
ReplyDeleteTnx !
Good work!! thanks for your efforts
ReplyDeleteThis is a great blog and has been an invaluable resource in getting up to speed with Oracle Streams and Advance Queues.
ReplyDeletePerhaps some tips and tricks in getting Oracle 11g R2 RAC with SCAN working with AQ is in order. I'm following all the documents in regards to local_listener and remote_listener, but I'm not able to make buffered messages reliable. Random failures of
"ORA-25306: Cannot connect to buffered queue's owner instance ORA-06512" still plague my implementation.
Yes, I've been experiencing the same intermittent failures.
ReplyDeleteAny luck fixing this problem?