-- Create the object type used as the queue payload.
-- NOTE: text holds at most 5 characters; the enqueue step in this script
-- builds values 'TEST1'..'TEST9', which are exactly 5 characters long.
create or replace type XX_QUEUE_TYPE is object
(
    text varchar2(5)
)
/
-- Queue table: storage for messages whose payload is of type XX_QUEUE_TYPE.
begin
    dbms_aqadm.create_queue_table(
        queue_table        => 'XX_QUEUE_TBL',
        queue_payload_type => 'XX_QUEUE_TYPE'
    );
end;
/
-- Inspect the rows stored in the queue table itself
select qt.*
  from XX_QUEUE_TBL qt;
-- Inspect the automatically created AQ$<queue table> view
select qv.*
  from AQ$XX_QUEUE_TBL qv;
-- The queue itself, stored in the queue table XX_QUEUE_TBL.
begin
    dbms_aqadm.create_queue(
        queue_name  => 'XX_QUEUE_Q',
        queue_table => 'XX_QUEUE_TBL'
    );
end;
/
-- Start the queue so that messages can be enqueued and dequeued.
begin
    dbms_aqadm.start_queue(
        queue_name => 'XX_QUEUE_Q'
    );
end;
/
-- Insert 9 messages ('TEST1' .. 'TEST9') into XX_QUEUE_Q.
declare
    l_msg_id   raw(16);                       -- receives the id of each enqueued message
    l_enq_opt  dbms_aq.enqueue_options_t;     -- left at its default values
    l_msg_prop dbms_aq.message_properties_t;  -- left at its default values
begin
    for i in 1..9 loop
        dbms_aq.enqueue(
            queue_name         => 'XX_QUEUE_Q',
            enqueue_options    => l_enq_opt,
            message_properties => l_msg_prop,
            -- 'TEST' plus one digit is exactly 5 characters, the maximum
            -- that xx_queue_type.text (varchar2(5)) can hold.
            payload            => xx_queue_type('TEST'||to_char(i)),
            msgid              => l_msg_id
        );
    end loop;
    -- Commit the transaction in which the messages were enqueued.
    commit;
end;
/
-- Expect 9 messages in the queue view after the enqueue step
select qv.*
  from AQ$XX_QUEUE_TBL qv;
-- Retrieve 8 messages from XX_QUEUE_Q and print their text.
-- The output is written with dbms_output, so server output must be enabled
-- in the client (e.g. SET SERVEROUTPUT ON in SQL*Plus) to see it.
declare
    l_msg_id   raw(16);                       -- receives the id of each dequeued message
    l_deq_opt  dbms_aq.dequeue_options_t;
    l_msg_prop dbms_aq.message_properties_t;  -- filled in by dequeue
    l_payload  xx_queue_type;                 -- receives the message payload
begin
    -- The default dequeue wait is FOREVER, which would block the session
    -- indefinitely if fewer than 8 messages are available. With no_wait the
    -- call raises ORA-25228 instead, so the script fails fast.
    l_deq_opt.wait := dbms_aq.no_wait;

    for i in 1..8 loop
        dbms_aq.dequeue(
            queue_name         => 'XX_QUEUE_Q',
            dequeue_options    => l_deq_opt,
            message_properties => l_msg_prop,
            payload            => l_payload,
            msgid              => l_msg_id
        );
        dbms_output.put_line('Got a message: '||l_payload.text);
    end loop;
    -- Commit the transaction in which the messages were dequeued.
    commit;
end;
/
-- Expect a single remaining message after the dequeue step
select qv.*
  from AQ$XX_QUEUE_TBL qv;
-- Purge the queue table: remove the remaining messages.
declare
    po dbms_aqadm.aq$_purge_options_t;
begin
    -- block = FALSE is set explicitly; presumably this avoids holding an
    -- exclusive lock on the queues during the purge (confirm in DBMS_AQADM docs).
    po.block := FALSE;
    dbms_aqadm.purge_queue_table(
        queue_table     => 'XX_QUEUE_TBL',
        purge_condition => NULL,  -- no condition: purge every message
        purge_options   => po
    );
end;
/
-- Expect no messages to be left after the purge
select qv.*
  from AQ$XX_QUEUE_TBL qv;
/* Clean up, in dependency order:
   stop the queue,
   delete the queue,
   delete the queue table,
   delete the payload type (the queue table references it, so it goes last) */
begin
    dbms_aqadm.stop_queue(
        queue_name => 'XX_QUEUE_Q'
    );
    dbms_aqadm.drop_queue(
        queue_name => 'XX_QUEUE_Q'
    );
    dbms_aqadm.drop_queue_table(
        queue_table => 'XX_QUEUE_TBL'
    );
    -- DDL cannot be issued directly inside PL/SQL, hence execute immediate.
    execute immediate 'drop type XX_QUEUE_TYPE';
end;
/