This question refers to the dequeueing of messages in Oracle Streams Advanced Queueing.
I need to ensure that the messages which are related to each other are processed sequentially.
For example, assume the queue is seeded with the four messages that have a business-related field called transaction reference (txn_ref) and two of the messages (1,3) belong to the same transaction (000001):
id | txn_ref |
---+---------+
1 | 000001 |
2 | 000002 |
3 | 000001 |
4 | 000003 |
Assume also that I am running 4 threads/processes that wish to dequeue from this queue. The following should occur:
My initial thought was that I could achieve this with a dequeue condition where the ENQ_TIME (enqueue time) is not later than any other ENQ_TIME of all the messages that have the same TXN_REF. But my problem is how to reference the TXN_REF of a message that I have not yet selected, in order to select it. e.g.
// Java API
String condition = "ENQ_TIME = (select min(ENQ_TIME) from AQ_TABLE1 where ??";
dequeueOption.setCondition(condition);
Is it possible to achieve what I want here?
To answer your direct question, this can be achieved using the correlation
field (called CORRID
in the table), which is designed for this purpose.
So, on the enqueue, you'd use the AQMessageProperties.setCorrelation()
method with the TXN_REF value as the parameter. Then, in your condition you would do something like this:
// Java API
String condition = "tab.ENQ_TIME = (select min(AQ_TABLE1.ENQ_TIME) from AQ_TABLE1 self where tab.CORRID=AQ_TABLE1.CORRID)";
dequeueOption.setCondition(condition);