
Selective dequeue of unrelated messages in Oracle Advanced Queuing


This question is about dequeuing messages in Oracle Streams Advanced Queuing (AQ).

I need to ensure that messages which are related to each other are processed sequentially.

For example, assume the queue is seeded with four messages, each carrying a business-related field called transaction reference (txn_ref), and that two of the messages (1 and 3) belong to the same transaction (000001):

id | txn_ref | 
---+---------+
 1 | 000001  |
 2 | 000002  |
 3 | 000001  |
 4 | 000003  |

Assume also that I am running 4 threads/processes that wish to dequeue from this queue. The following should occur:

  1. thread 1 dequeues message #1
  2. thread 2 dequeues message #2
  3. thread 3 dequeues message #4 (because message #3 is related to #1 and #1 has not yet completed).
  4. thread 4 blocks waiting for a message
  5. thread 1 commits its work for message #1
  6. thread 4 (or perhaps thread 1) dequeues message #3.

My initial thought was that I could achieve this with a dequeue condition requiring that a message's ENQ_TIME (enqueue time) is no later than the ENQ_TIME of any other message with the same TXN_REF. My problem is how to reference the TXN_REF of a message that I have not yet selected, in order to select it. For example:

// Java API
String condition = "ENQ_TIME = (select min(ENQ_TIME) from AQ_TABLE1 where ??";
dequeueOption.setCondition(condition);

Is it possible to achieve what I want here?


Solution

  • To answer your direct question: this can be achieved using the correlation identifier (the CORRID column in the queue table), which is designed for this purpose.

    On enqueue, call AQMessageProperties.setCorrelation() with the TXN_REF value as the parameter. Then, in your dequeue condition, compare each message against the other rows that share its CORRID, something like this:

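    // Java API
    // "tab" is the alias AQ uses for the queue table inside a dequeue condition;
    // "self" is a second alias for the same table, used consistently in the subquery.
    String condition = "tab.ENQ_TIME = (select min(self.ENQ_TIME) from AQ_TABLE1 self where self.CORRID = tab.CORRID)";
    dequeueOption.setCondition(condition);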
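
To see why this condition should give the ordering described in the question, here is a minimal plain-Java simulation of the selection rule. It does not touch Oracle at all: the class and method names (CorrelationOrderDemo, Msg, satisfiesCondition, dequeuableIds) are hypothetical, enqueue times are simple counters, and it assumes that a row locked by one consumer is skipped by the others and that a committed dequeue removes the row from the queue table (that is, no message retention). Ties in ENQ_TIME within one correlation id are not handled.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the queue table; not part of any Oracle API.
final class CorrelationOrderDemo {

    /** One row of the simulated queue table. */
    static final class Msg {
        final int id;
        final String corrId;  // txn_ref stored as the correlation id
        final long enqTime;   // stand-in for ENQ_TIME

        Msg(int id, String corrId, long enqTime) {
            this.id = id;
            this.corrId = corrId;
            this.enqTime = enqTime;
        }
    }

    /** Mirrors the dequeue condition: no row with the same corrId was enqueued earlier. */
    static boolean satisfiesCondition(Msg m, List<Msg> table) {
        for (Msg other : table) {
            if (other.corrId.equals(m.corrId) && other.enqTime < m.enqTime) {
                return false;
            }
        }
        return true;
    }

    /** Ids a new consumer could take: condition holds and no other consumer holds the row. */
    static List<Integer> dequeuableIds(List<Msg> table, Set<Integer> lockedIds) {
        List<Integer> ids = new ArrayList<>();
        for (Msg m : table) {
            if (!lockedIds.contains(m.id) && satisfiesCondition(m, table)) {
                ids.add(m.id);
            }
        }
        return ids;
    }

    /** The four messages from the question. */
    static List<Msg> sampleTable() {
        List<Msg> table = new ArrayList<>();
        table.add(new Msg(1, "000001", 1));
        table.add(new Msg(2, "000002", 2));
        table.add(new Msg(3, "000001", 3));
        table.add(new Msg(4, "000003", 4));
        return table;
    }

    public static void main(String[] args) {
        List<Msg> table = sampleTable();
        Set<Integer> locked = new HashSet<>();
        System.out.println(dequeuableIds(table, locked)); // [1, 2, 4]

        // Threads 1-3 hold messages 1, 2 and 4 without committing.
        locked.add(1);
        locked.add(2);
        locked.add(4);
        System.out.println(dequeuableIds(table, locked)); // [] so thread 4 waits

        // Thread 1 commits: the row for message 1 disappears.
        table.removeIf(m -> m.id == 1);
        locked.remove(1);
        System.out.println(dequeuableIds(table, locked)); // [3]
    }
}
```

Message 3 only becomes eligible once message 1 is gone from the table, which is the behaviour asked for. If the queue is configured to retain processed messages, the real subquery would presumably also need to exclude rows that are no longer ready; that is an assumption worth checking against your queue table.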