Oracle AQ implementation

Oracle Advanced Queuing (AQ) is Oracle's built-in message queuing mechanism. In the simplest case, you create a payload type, create a queue table based on that type through a dedicated API (the DBMS_AQADM package), and then create and start a queue in that table. By default the queue operates on a FIFO (First-In-First-Out) principle. The listing below walks through these steps.
-- Create the payload data type for the queue messages.
-- NOTE: text is limited to 5 characters, which is exactly enough for the
-- 'TEST1'..'TEST9' payloads enqueued later in this listing.
create or replace type XX_QUEUE_TYPE is object
(
  text varchar2(5)
)
/

-- Queue table: created through the DBMS_AQADM API with XX_QUEUE_TYPE as payload
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'XX_QUEUE_TBL',
    queue_payload_type => 'XX_QUEUE_TYPE'
  );
end;
/

-- Select records from the table
select * from XX_QUEUE_TBL;
-- Automatically created view of type aq$*
select * from AQ$XX_QUEUE_TBL;

-- The queue itself, stored in the queue table XX_QUEUE_TBL
begin
  dbms_aqadm.create_queue(
    queue_name  => 'XX_QUEUE_Q',
    queue_table => 'XX_QUEUE_TBL'
  );
end;
/

-- Start the queue
begin
  dbms_aqadm.start_queue(
    queue_name => 'XX_QUEUE_Q'
  );
end;
/

-- Insert 9 messages ('TEST1' .. 'TEST9')
declare
  -- An AQ message id is a RAW(16); same declaration as in the dequeue block
  l_msg_id   raw(16);
  -- Enqueue options and message properties are left at their defaults
  l_enq_opt  dbms_aq.enqueue_options_t;
  l_msg_prop dbms_aq.message_properties_t;
begin
  for i in 1..9 loop
    dbms_aq.enqueue(
      queue_name         => 'XX_QUEUE_Q',
      enqueue_options    => l_enq_opt,
      message_properties => l_msg_prop,
      -- 'TEST' plus one digit = 5 characters, the maximum XX_QUEUE_TYPE.text holds
      payload            => xx_queue_type('TEST'||to_char(i)),
      msgid              => l_msg_id
    );
  end loop;
  -- The enqueued messages are committed together with this transaction
  commit;
end;
/

-- Check that 9 messages were inserted
select * from AQ$XX_QUEUE_TBL;

-- Retrieve 8 messages
-- (in SQL*Plus / SQLcl run "set serveroutput on" first to see the dbms_output lines)
declare
  l_msg_id   raw(16);
  l_deq_opt  dbms_aq.dequeue_options_t;
  l_msg_prop dbms_aq.message_properties_t;
  l_payload  xx_queue_type;
  -- ORA-25228: timeout or end-of-fetch during message dequeue
  e_no_message exception;
  pragma exception_init(e_no_message, -25228);
begin
  -- The default wait is dbms_aq.forever, which would block this session
  -- indefinitely if fewer than 8 messages are available. Do not wait.
  l_deq_opt.wait := dbms_aq.no_wait;

  for i in 1..8 loop
    begin
      dbms_aq.dequeue(
        queue_name         => 'XX_QUEUE_Q',
        dequeue_options    => l_deq_opt,
        message_properties => l_msg_prop,
        payload            => l_payload,
        msgid              => l_msg_id
      );
    exception
      when e_no_message then
        -- Queue ran dry before 8 messages were read: stop instead of hanging
        dbms_output.put_line('No more messages in the queue');
        exit;
    end;
    dbms_output.put_line('Got a message: '||l_payload.text);
  end loop;
  -- The dequeued messages are committed together with this transaction
  commit;
end;
/

-- Check that 1 message remains
select * from AQ$XX_QUEUE_TBL;

-- Purge the table: remove all remaining messages
declare
  l_purge_opt dbms_aqadm.aq$_purge_options_t;
begin
  l_purge_opt.block := false;
  dbms_aqadm.purge_queue_table(
    queue_table     => 'XX_QUEUE_TBL',
    -- a NULL condition purges every message in the queue table
    purge_condition => null,
    purge_options   => l_purge_opt
  );
end;
/

-- Check that 0 messages remain
select * from AQ$XX_QUEUE_TBL;
 
/* Tear everything down in reverse order of creation:
   stop the queue,
   delete the queue,
   delete the table,
   delete the type */
begin
  dbms_aqadm.stop_queue(
    queue_name => 'XX_QUEUE_Q'
  );
  dbms_aqadm.drop_queue(
    queue_name => 'XX_QUEUE_Q'
  );
  dbms_aqadm.drop_queue_table(
    queue_table => 'XX_QUEUE_TBL'
  );
  -- DDL is not allowed directly in PL/SQL, hence dynamic SQL for the drop
  execute immediate 'drop type XX_QUEUE_TYPE';
end;
/
Creating the queue table and the queue from Java:
/**
 * Creates a single-consumer queue table XX_QUEUE_TBL (RAW payload) in the
 * APPS schema and a queue XX_QUEUE_Q in it, using the Oracle AQ Java API.
 *
 * Any failure after the JDBC driver class is loaded is caught, printed to
 * standard output together with its stack trace, and not rethrown. The JDBC
 * connection is always closed before the method returns.
 *
 * @throws ClassNotFoundException if the Oracle JDBC driver class cannot be loaded
 */
public static void singleCunsumerAQTest() throws ClassNotFoundException {
    Connection db_conn = null;
    AQSession aq_sess;
    AQQueueTableProperty qtable_prop;
    AQQueueProperty queue_prop;
    AQQueueTable q_table;
    AQQueue queue;

    // Load the Oracle JDBC driver; a missing driver propagates to the caller
    Class.forName("oracle.jdbc.driver.OracleDriver");

    try {
        // Connect to the DB
        db_conn = DriverManager.getConnection("jdbc:oracle:thin:@SERV_URL:SERV_PORT:SID", "login", "pass");
        db_conn.setAutoCommit(false);
        System.out.println("Successfully connected.");

        // Load the Oracle AQ driver
        Class.forName("oracle.AQ.AQOracleDriver");
        // Create the AQ session on top of the JDBC connection
        aq_sess = AQDriverManager.createAQSession(db_conn);
        System.out.println("Successfully created AQSession.");

        // Queue table properties: payload type RAW, single consumer
        qtable_prop = new AQQueueTableProperty("RAW");
        qtable_prop.setMultiConsumer(false);

        // Create the queue table XX_QUEUE_TBL in the APPS schema
        q_table = aq_sess.createQueueTable("APPS", "XX_QUEUE_TBL", qtable_prop);
        System.out.println("Successfully created XX_QUEUE_TBL in APPS schema");

        // Queue properties are left at their defaults
        queue_prop = new AQQueueProperty();

        // Create the queue XX_QUEUE_Q in the queue table XX_QUEUE_TBL
        queue = aq_sess.createQueue(q_table, "XX_QUEUE_Q", queue_prop);
        System.out.println("Successfully created XX_QUEUE_Q in XX_QUEUE_TBL");
    } catch (Exception ex) {
        System.out.println("Exception: " + ex);
        ex.printStackTrace();
    } finally {
        // Release the JDBC connection; the original code leaked it
        if (db_conn != null) {
            try {
                db_conn.close();
            } catch (Exception closeEx) {
                System.out.println("Exception: " + closeEx);
                closeEx.printStackTrace();
            }
        }
    }
}
ceo@thomsonslegacy.com

We customize Oracle E-Business Suite R12 modules with Oracle Applications Framework and provide Oracle ADF-based solutions.


© Thomsonslegacy.com, 2025.
If you publish the materials of the site, the reference to the source is obligatory. The site uses cookies.